aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD4
-rw-r--r--CMakeLists.txt2
-rw-r--r--Makefile50
-rw-r--r--build.yaml14
-rw-r--r--include/grpc++/server.h61
-rw-r--r--include/grpc++/server_builder.h30
-rw-r--r--src/cpp/rpcmanager/grpc_rpc_manager.cc188
-rw-r--r--src/cpp/rpcmanager/grpc_rpc_manager.h157
-rw-r--r--src/cpp/server/server_builder.cc130
-rw-r--r--src/cpp/server/server_cc.cc333
-rw-r--r--test/cpp/end2end/end2end_test.cc11
-rw-r--r--test/cpp/rpcmanager/grpc_rpc_manager_test.cc89
-rw-r--r--test/cpp/rpcmanager/grpc_rpc_manager_test.h58
-rw-r--r--tools/doxygen/Doxyfile.c++.internal2
-rw-r--r--tools/run_tests/performance/scenario_config.py503
-rw-r--r--tools/run_tests/sources_and_headers.json23
-rw-r--r--tools/run_tests/tests.json63
-rw-r--r--vsprojects/vcxproj/grpc++/grpc++.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters9
-rw-r--r--vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters9
-rw-r--r--vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj204
-rw-r--r--vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj.filters26
23 files changed, 1688 insertions, 284 deletions
diff --git a/BUILD b/BUILD
index 4763bc0abd..39f0d7b51f 100644
--- a/BUILD
+++ b/BUILD
@@ -1270,6 +1270,7 @@ cc_library(
"src/cpp/server/secure_server_credentials.h",
"src/cpp/client/create_channel_internal.h",
"src/cpp/common/channel_filter.h",
+ "src/cpp/rpcmanager/grpc_rpc_manager.h",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/thread_pool_interface.h",
"src/cpp/client/insecure_credentials.cc",
@@ -1292,6 +1293,7 @@ cc_library(
"src/cpp/common/completion_queue_cc.cc",
"src/cpp/common/core_codegen.cc",
"src/cpp/common/rpc_method.cc",
+ "src/cpp/rpcmanager/grpc_rpc_manager.cc",
"src/cpp/server/async_generic_service.cc",
"src/cpp/server/create_default_thread_pool.cc",
"src/cpp/server/dynamic_thread_pool.cc",
@@ -1497,6 +1499,7 @@ cc_library(
srcs = [
"src/cpp/client/create_channel_internal.h",
"src/cpp/common/channel_filter.h",
+ "src/cpp/rpcmanager/grpc_rpc_manager.h",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/thread_pool_interface.h",
"src/cpp/client/insecure_credentials.cc",
@@ -1514,6 +1517,7 @@ cc_library(
"src/cpp/common/completion_queue_cc.cc",
"src/cpp/common/core_codegen.cc",
"src/cpp/common/rpc_method.cc",
+ "src/cpp/rpcmanager/grpc_rpc_manager.cc",
"src/cpp/server/async_generic_service.cc",
"src/cpp/server/create_default_thread_pool.cc",
"src/cpp/server/dynamic_thread_pool.cc",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 97cedbe449..a6e89cf619 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1025,6 +1025,7 @@ add_library(grpc++
src/cpp/common/completion_queue_cc.cc
src/cpp/common/core_codegen.cc
src/cpp/common/rpc_method.cc
+ src/cpp/rpcmanager/grpc_rpc_manager.cc
src/cpp/server/async_generic_service.cc
src/cpp/server/create_default_thread_pool.cc
src/cpp/server/dynamic_thread_pool.cc
@@ -1279,6 +1280,7 @@ add_library(grpc++_unsecure
src/cpp/common/completion_queue_cc.cc
src/cpp/common/core_codegen.cc
src/cpp/common/rpc_method.cc
+ src/cpp/rpcmanager/grpc_rpc_manager.cc
src/cpp/server/async_generic_service.cc
src/cpp/server/create_default_thread_pool.cc
src/cpp/server/dynamic_thread_pool.cc
diff --git a/Makefile b/Makefile
index b98380be48..2316a91c5e 100644
--- a/Makefile
+++ b/Makefile
@@ -1048,6 +1048,7 @@ grpc_csharp_plugin: $(BINDIR)/$(CONFIG)/grpc_csharp_plugin
grpc_node_plugin: $(BINDIR)/$(CONFIG)/grpc_node_plugin
grpc_objective_c_plugin: $(BINDIR)/$(CONFIG)/grpc_objective_c_plugin
grpc_python_plugin: $(BINDIR)/$(CONFIG)/grpc_python_plugin
+grpc_rpc_manager_test: $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test
grpc_ruby_plugin: $(BINDIR)/$(CONFIG)/grpc_ruby_plugin
grpc_tool_test: $(BINDIR)/$(CONFIG)/grpc_tool_test
grpclb_api_test: $(BINDIR)/$(CONFIG)/grpclb_api_test
@@ -1416,6 +1417,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/generic_end2end_test \
$(BINDIR)/$(CONFIG)/golden_file_test \
$(BINDIR)/$(CONFIG)/grpc_cli \
+ $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test \
$(BINDIR)/$(CONFIG)/grpc_tool_test \
$(BINDIR)/$(CONFIG)/grpclb_api_test \
$(BINDIR)/$(CONFIG)/grpclb_test \
@@ -1503,6 +1505,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/generic_end2end_test \
$(BINDIR)/$(CONFIG)/golden_file_test \
$(BINDIR)/$(CONFIG)/grpc_cli \
+ $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test \
$(BINDIR)/$(CONFIG)/grpc_tool_test \
$(BINDIR)/$(CONFIG)/grpclb_api_test \
$(BINDIR)/$(CONFIG)/grpclb_test \
@@ -1801,6 +1804,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/generic_end2end_test || ( echo test generic_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing golden_file_test"
$(Q) $(BINDIR)/$(CONFIG)/golden_file_test || ( echo test golden_file_test failed ; exit 1 )
+ $(E) "[RUN] Testing grpc_rpc_manager_test"
+ $(Q) $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test || ( echo test grpc_rpc_manager_test failed ; exit 1 )
$(E) "[RUN] Testing grpc_tool_test"
$(Q) $(BINDIR)/$(CONFIG)/grpc_tool_test || ( echo test grpc_tool_test failed ; exit 1 )
$(E) "[RUN] Testing grpclb_api_test"
@@ -3591,6 +3596,7 @@ LIBGRPC++_SRC = \
src/cpp/common/completion_queue_cc.cc \
src/cpp/common/core_codegen.cc \
src/cpp/common/rpc_method.cc \
+ src/cpp/rpcmanager/grpc_rpc_manager.cc \
src/cpp/server/async_generic_service.cc \
src/cpp/server/create_default_thread_pool.cc \
src/cpp/server/dynamic_thread_pool.cc \
@@ -4121,6 +4127,7 @@ LIBGRPC++_UNSECURE_SRC = \
src/cpp/common/completion_queue_cc.cc \
src/cpp/common/core_codegen.cc \
src/cpp/common/rpc_method.cc \
+ src/cpp/rpcmanager/grpc_rpc_manager.cc \
src/cpp/server/async_generic_service.cc \
src/cpp/server/create_default_thread_pool.cc \
src/cpp/server/dynamic_thread_pool.cc \
@@ -11727,6 +11734,49 @@ ifneq ($(NO_DEPS),true)
endif
+GRPC_RPC_MANAGER_TEST_SRC = \
+ test/cpp/rpcmanager/grpc_rpc_manager_test.cc \
+
+GRPC_RPC_MANAGER_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_RPC_MANAGER_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/grpc_rpc_manager_test: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
+
+$(BINDIR)/$(CONFIG)/grpc_rpc_manager_test: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/grpc_rpc_manager_test: $(PROTOBUF_DEP) $(GRPC_RPC_MANAGER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LDXX) $(LDFLAGS) $(GRPC_RPC_MANAGER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test
+
+endif
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/cpp/rpcmanager/grpc_rpc_manager_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
+
+deps_grpc_rpc_manager_test: $(GRPC_RPC_MANAGER_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(GRPC_RPC_MANAGER_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
GRPC_RUBY_PLUGIN_SRC = \
src/compiler/ruby_plugin.cc \
diff --git a/build.yaml b/build.yaml
index 4746cc1a48..87225684fa 100644
--- a/build.yaml
+++ b/build.yaml
@@ -711,6 +711,7 @@ filegroups:
headers:
- src/cpp/client/create_channel_internal.h
- src/cpp/common/channel_filter.h
+ - src/cpp/rpcmanager/grpc_rpc_manager.h
- src/cpp/server/dynamic_thread_pool.h
- src/cpp/server/thread_pool_interface.h
src:
@@ -726,6 +727,7 @@ filegroups:
- src/cpp/common/completion_queue_cc.cc
- src/cpp/common/core_codegen.cc
- src/cpp/common/rpc_method.cc
+ - src/cpp/rpcmanager/grpc_rpc_manager.cc
- src/cpp/server/async_generic_service.cc
- src/cpp/server/create_default_thread_pool.cc
- src/cpp/server/dynamic_thread_pool.cc
@@ -2869,6 +2871,18 @@ targets:
secure: false
vs_config_type: Application
vs_project_guid: '{DF52D501-A6CF-4E6F-BA38-6EBE2E8DAFB2}'
+- name: grpc_rpc_manager_test
+ build: test
+ language: c++
+ headers:
+ - test/cpp/rpcmanager/grpc_rpc_manager_test.h
+ src:
+ - test/cpp/rpcmanager/grpc_rpc_manager_test.cc
+ deps:
+ - grpc++
+ - grpc
+ - gpr
+ - grpc++_test_config
- name: grpc_ruby_plugin
build: protoc
language: c++
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index f51a6c658f..bae83eee3f 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -50,6 +50,8 @@
#include <grpc++/support/status.h>
#include <grpc/compression.h>
+#include "src/cpp/rpcmanager/grpc_rpc_manager.h"
+
struct grpc_server;
namespace grpc {
@@ -105,18 +107,41 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
class AsyncRequest;
class ShutdownRequest;
+ /// SyncRequestManager is an implementation of GrpcRpcManager. This class is
+ /// responsible for polling for incoming RPCs and calling the RPC handlers.
+ /// This is only used in case of a Sync server (i.e a server exposing a sync
+ /// interface)
+ class SyncRequestManager;
+
class UnimplementedAsyncRequestContext;
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;
/// Server constructors. To be used by \a ServerBuilder only.
///
- /// \param thread_pool The threadpool instance to use for call processing.
- /// \param thread_pool_owned Does the server own the \a thread_pool instance?
- /// \param max_receive_message_size Maximum message length that the channel
- /// can receive.
- Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
- int max_receive_message_size, ChannelArguments* args);
+ /// \param sync_server_cqs The completion queues to use if the server is a
+ /// synchronous server (or a hybrid server). The server polls for new RPCs on
+ /// these queues
+ ///
+ /// \param max_message_size Maximum message length that the channel can
+ /// receive.
+ ///
+ /// \param args The channel args
+ ///
+ /// \param min_pollers The minimum number of polling threads per server
+ /// completion queue (in param sync_server_cqs) to use for listening to
+ /// incoming requests (used only in case of sync server)
+ ///
+ /// \param max_pollers The maximum number of polling threads per server
+ /// completion queue (in param sync_server_cqs) to use for listening to
+ /// incoming requests (used only in case of sync server)
+ ///
+ /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on
+ /// server completion queues passed via sync_server_cqs param.
+ Server(std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs,
+ int max_message_size, ChannelArguments* args, int min_pollers,
+ int max_pollers, int sync_cq_timeout_msec);
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
@@ -171,34 +196,36 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
const int max_receive_message_size_;
- // Completion queue.
- CompletionQueue cq_;
+ /// The following completion queues are ONLY used in case of Sync API i.e if
+ /// the server has any services with sync methods. The server uses these
+ /// completion queues to poll for new RPCs
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs_;
+
+ /// List of GrpcRpcManager instances (one for each cq in the sync_server_cqs)
+ std::vector<std::unique_ptr<SyncRequestManager>> sync_req_mgrs_;
// Sever status
grpc::mutex mu_;
bool started_;
bool shutdown_;
bool shutdown_notified_;
+
+ // TODO (sreek) : Remove num_running_cb_ and callback_cv_;
// The number of threads which are running callbacks.
- int num_running_cb_;
- grpc::condition_variable callback_cv_;
+ // int num_running_cb_;
+ // grpc::condition_variable callback_cv_;
grpc::condition_variable shutdown_cv_;
std::shared_ptr<GlobalCallbacks> global_callbacks_;
- std::list<SyncRequest>* sync_methods_;
std::vector<grpc::string> services_;
- std::unique_ptr<RpcServiceMethod> unknown_method_;
bool has_generic_service_;
- // Pointer to the c grpc server.
+ // Pointer to the c core's grpc server.
grpc_server* server_;
- ThreadPoolInterface* thread_pool_;
- // Whether the thread pool is created and owned by the server.
- bool thread_pool_owned_;
-
std::unique_ptr<ServerInitializer> server_initializer_;
};
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 37f1f8cb80..8fac168ff7 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -62,6 +62,22 @@ class ServerBuilder {
public:
ServerBuilder();
+ struct SyncServerSettings {
+ // Number of server completion queues to create to listen to incoming RPCs.
+ int num_cqs;
+
+ // Minimum number of threads per completion queue that should be listening
+ // to incoming RPCs.
+ int min_pollers;
+
+ // Maximum number of threads per completion queue that can be listening to
+ // incoming RPCs.
+ int max_pollers;
+
+ // The timeout for server completion queue's AsyncNext call.
+ int cq_timeout_msec;
+ };
+
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the \a Server instance returned
/// by \a BuildAndStart().
@@ -115,6 +131,9 @@ class ServerBuilder {
ServerBuilder& SetOption(std::unique_ptr<ServerBuilderOption> option);
+ /// Note: Only useful if this is a Synchronous server.
+ void SetSyncServerSettings(SyncServerSettings settings);
+
/// Tries to bind \a server to the given \a addr.
///
/// It can be invoked multiple times.
@@ -170,6 +189,14 @@ class ServerBuilder {
int* selected_port;
};
+ // Sync server settings. If this is not set via SetSyncServerSettings(), the
+ // following default values are used:
+ // sync_server_settings_.num_cqs = Number of CPUs
+ // sync_server_settings_.min_pollers = 1
+ // sync_server_settings_.max_pollers = INT_MAX
+ // sync_server_settings_.cq_timeout_msec = 1000
+ struct SyncServerSettings sync_server_settings_;
+
typedef std::unique_ptr<grpc::string> HostString;
struct NamedService {
explicit NamedService(Service* s) : service(s) {}
@@ -184,7 +211,10 @@ class ServerBuilder {
std::vector<std::unique_ptr<ServerBuilderOption>> options_;
std::vector<std::unique_ptr<NamedService>> services_;
std::vector<Port> ports_;
+
+ /* List of completion queues added via AddCompletionQueue() method */
std::vector<ServerCompletionQueue*> cqs_;
+
std::shared_ptr<ServerCredentials> creds_;
std::vector<std::unique_ptr<ServerBuilderPlugin>> plugins_;
AsyncGenericService* generic_service_;
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/rpcmanager/grpc_rpc_manager.cc
new file mode 100644
index 0000000000..2299dbdcd3
--- /dev/null
+++ b/src/cpp/rpcmanager/grpc_rpc_manager.cc
@@ -0,0 +1,188 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
+#include <grpc/support/log.h>
+#include <climits>
+
+#include "src/cpp/rpcmanager/grpc_rpc_manager.h"
+
+namespace grpc {
+
+GrpcRpcManager::GrpcRpcManagerThread::GrpcRpcManagerThread(
+ GrpcRpcManager* rpc_mgr)
+ : rpc_mgr_(rpc_mgr),
+ thd_(new std::thread(&GrpcRpcManager::GrpcRpcManagerThread::Run, this)) {}
+
+void GrpcRpcManager::GrpcRpcManagerThread::Run() {
+ rpc_mgr_->MainWorkLoop();
+ rpc_mgr_->MarkAsCompleted(this);
+}
+
+GrpcRpcManager::GrpcRpcManagerThread::~GrpcRpcManagerThread() {
+ thd_->join();
+ thd_.reset();
+}
+
+GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers)
+ : shutdown_(false),
+ num_pollers_(0),
+ min_pollers_(min_pollers),
+ max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
+ num_threads_(0) {}
+
+GrpcRpcManager::~GrpcRpcManager() {
+ {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ GPR_ASSERT(num_threads_ == 0);
+ }
+
+ CleanupCompletedThreads();
+}
+
+void GrpcRpcManager::Wait() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ while (num_threads_ != 0) {
+ shutdown_cv_.wait(lock);
+ }
+}
+
+void GrpcRpcManager::ShutdownRpcManager() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ shutdown_ = true;
+}
+
+bool GrpcRpcManager::IsShutdown() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ return shutdown_;
+}
+
+void GrpcRpcManager::MarkAsCompleted(GrpcRpcManagerThread* thd) {
+ {
+ std::unique_lock<grpc::mutex> list_lock(list_mu_);
+ completed_threads_.push_back(thd);
+ }
+
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ num_threads_--;
+ if (num_threads_ == 0) {
+ shutdown_cv_.notify_one();
+ }
+}
+
+void GrpcRpcManager::CleanupCompletedThreads() {
+ std::unique_lock<grpc::mutex> lock(list_mu_);
+ for (auto thd = completed_threads_.begin(); thd != completed_threads_.end();
+ thd = completed_threads_.erase(thd)) {
+ delete *thd;
+ }
+}
+
+void GrpcRpcManager::Initialize() {
+ for (int i = 0; i < min_pollers_; i++) {
+ MaybeCreatePoller();
+ }
+}
+
+// If the number of pollers (i.e threads currently blocked in PollForWork()) is
+// less than max threshold (i.e max_pollers_) and the total number of threads is
+// below the maximum threshold, we can let the current thread continue as poller
+bool GrpcRpcManager::MaybeContinueAsPoller() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+
+ if (shutdown_ || num_pollers_ > max_pollers_) {
+ return false;
+ }
+
+ num_pollers_++;
+ return true;
+}
+
+// Create a new poller if the current number of pollers i.e num_pollers_ (i.e
+// threads currently blocked in PollForWork()) is below the threshold (i.e
+// min_pollers_) and the total number of threads is below the maximum threshold
+void GrpcRpcManager::MaybeCreatePoller() {
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ if (!shutdown_ && num_pollers_ < min_pollers_) {
+ num_pollers_++;
+ num_threads_++;
+
+ // Create a new thread (which ends up calling the MainWorkLoop() function
+ new GrpcRpcManagerThread(this);
+ }
+}
+
+void GrpcRpcManager::MainWorkLoop() {
+ void* tag;
+ bool ok;
+
+ /*
+ 1. Poll for work (i.e PollForWork())
+ 2. After returning from PollForWork, reduce the number of pollers by 1. If
+ PollForWork() returned a TIMEOUT, then it may indicate that we have more
+ polling threads than needed. Check if the number of pollers is greater
+ than min_pollers and if so, terminate the thread.
+ 3. Since we are short of one poller now, see if a new poller has to be
+ created (i.e see MaybeCreatePoller() for more details)
+ 4. Do the actual work (DoWork())
+ 5. After doing the work, see it this thread can resume polling work (i.e
+ see MaybeContinueAsPoller() for more details) */
+ do {
+ WorkStatus work_status = PollForWork(&tag, &ok);
+
+ {
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ num_pollers_--;
+
+ if (work_status == TIMEOUT && num_pollers_ > min_pollers_) {
+ break;
+ }
+ }
+
+ // TODO (sreek) See if we need to check for shutdown here and quit
+ // Note that MaybeCreatePoller does check for shutdown and creates a new
+ // thread only if GrpcRpcManager is not shutdown
+ if (work_status == WORK_FOUND) {
+ MaybeCreatePoller();
+ DoWork(tag, ok);
+ }
+ } while (MaybeContinueAsPoller());
+
+ CleanupCompletedThreads();
+
+ // If we are here, either GrpcRpcManager is shutting down or it already has
+ // enough threads.
+}
+
+} // namespace grpc
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.h b/src/cpp/rpcmanager/grpc_rpc_manager.h
new file mode 100644
index 0000000000..d00771b9a1
--- /dev/null
+++ b/src/cpp/rpcmanager/grpc_rpc_manager.h
@@ -0,0 +1,157 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H
+#define GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H
+
+#include <list>
+#include <memory>
+
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
+
+namespace grpc {
+
+class GrpcRpcManager {
+ public:
+ explicit GrpcRpcManager(int min_pollers, int max_pollers);
+ virtual ~GrpcRpcManager();
+
+ // This function MUST be called before using the object
+ void Initialize();
+
+ enum WorkStatus { WORK_FOUND, SHUTDOWN, TIMEOUT };
+
+ // "Polls" for new work.
+ // If the return value is WORK_FOUND:
+ // - The implementaion of PollForWork() MAY set some opaque identifier to
+ // (identify the work item found) via the '*tag' parameter
+ // - The implementaion MUST set the value of 'ok' to 'true' or 'false'. A
+ // value of 'false' indicates some implemenation specific error (that is
+ // neither SHUTDOWN nor TIMEOUT)
+ // - GrpcRpcManager does not interpret the values of 'tag' and 'ok'
+ // - GrpcRpcManager WILL call DoWork() and pass '*tag' and 'ok' as input to
+ // DoWork()
+ //
+ // If the return value is SHUTDOWN:,
+ // - GrpcManager WILL NOT call DoWork() and terminates the thead
+ //
+ // If the return value is TIMEOUT:,
+ // - GrpcManager WILL NOT call DoWork()
+ // - GrpcManager MAY terminate the thread depending on the current number of
+ // active poller threads and mix_pollers/max_pollers settings
+ // - Also, the value of timeout is specific to the derived class
+ // implementation
+ virtual WorkStatus PollForWork(void** tag, bool* ok) = 0;
+
+ // The implementation of DoWork() is supposed to perform the work found by
+ // PollForWork(). The tag and ok parameters are the same as returned by
+ // PollForWork()
+ //
+ // The implementation of DoWork() should also do any setup needed to ensure
+ // that the next call to PollForWork() (not necessarily by the current thread)
+ // actually finds some work
+ virtual void DoWork(void* tag, bool ok) = 0;
+
+ // Mark the GrpcRpcManager as shutdown and begin draining the work.
+ // This is a non-blocking call and the caller should call Wait(), a blocking
+ // call which returns only once the shutdown is complete
+ void ShutdownRpcManager();
+
+ // Has ShutdownRpcManager() been called
+ bool IsShutdown();
+
+ // A blocking call that returns only after the GrpcRpcManager has shutdown and
+ // all the threads have drained all the outstanding work
+ void Wait();
+
+ private:
+ // Helper wrapper class around std::thread. This takes a GrpcRpcManager object
+ // and starts a new std::thread to calls the Run() function.
+ //
+ // The Run() function calls GrpcManager::MainWorkLoop() function and once that
+ // completes, it marks the GrpcRpcManagerThread completed by calling
+ // GrpcRpcManager::MarkAsCompleted()
+ class GrpcRpcManagerThread {
+ public:
+ GrpcRpcManagerThread(GrpcRpcManager* rpc_mgr);
+ ~GrpcRpcManagerThread();
+
+ private:
+ // Calls rpc_mgr_->MainWorkLoop() and once that completes, calls
+ // rpc_mgr_>MarkAsCompleted(this) to mark the thread as completed
+ void Run();
+
+ GrpcRpcManager* rpc_mgr_;
+ std::unique_ptr<grpc::thread> thd_;
+ };
+
+ // The main funtion in GrpcRpcManager
+ void MainWorkLoop();
+
+ // Create a new poller if the number of current pollers is less than the
+ // minimum number of pollers needed (i.e min_pollers).
+ void MaybeCreatePoller();
+
+ // Returns true if the current thread can resume as a poller. i.e if the
+ // current number of pollers is less than the max_pollers.
+ bool MaybeContinueAsPoller();
+
+ void MarkAsCompleted(GrpcRpcManagerThread* thd);
+ void CleanupCompletedThreads();
+
+ // Protects shutdown_, num_pollers_ and num_threads_
+ // TODO: sreek - Change num_pollers and num_threads_ to atomics
+ grpc::mutex mu_;
+
+ bool shutdown_;
+ grpc::condition_variable shutdown_cv_;
+
+ // Number of threads doing polling
+ int num_pollers_;
+
+ // The minimum and maximum number of threads that should be doing polling
+ int min_pollers_;
+ int max_pollers_;
+
+ // The total number of threads (includes threads includes the threads that are
+ // currently polling i.e num_pollers_)
+ int num_threads_;
+
+ grpc::mutex list_mu_;
+ std::list<GrpcRpcManagerThread*> completed_threads_;
+};
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 2980b16c56..59c40dedaf 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -35,6 +35,7 @@
#include <grpc++/impl/service_type.h>
#include <grpc++/server.h>
+#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
@@ -61,6 +62,7 @@ ServerBuilder::ServerBuilder()
auto& factory = *it;
plugins_.emplace_back(factory());
}
+
// all compression algorithms enabled by default.
enabled_compression_algorithms_bitset_ =
(1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
@@ -68,6 +70,16 @@ ServerBuilder::ServerBuilder()
sizeof(maybe_default_compression_level_));
memset(&maybe_default_compression_algorithm_, 0,
sizeof(maybe_default_compression_algorithm_));
+
+ // Sync server setting defaults
+ sync_server_settings_.min_pollers = 1;
+ sync_server_settings_.max_pollers = INT_MAX;
+
+ int num_cpus = gpr_cpu_num_cores();
+ num_cpus = GPR_MAX(num_cpus, 4);
+ sync_server_settings_.num_cqs = num_cpus;
+
+ sync_server_settings_.cq_timeout_msec = 1000;
}
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
@@ -94,7 +106,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
gpr_log(GPR_ERROR,
"Adding multiple AsyncGenericService is unsupported for now. "
"Dropping the service %p",
- service);
+ (void*)service);
} else {
generic_service_ = service;
}
@@ -130,6 +142,10 @@ ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm(
return *this;
}
+void ServerBuilder::SetSyncServerSettings(SyncServerSettings settings) {
+ sync_server_settings_ = settings; // copy the settings
+}
+
ServerBuilder& ServerBuilder::AddListeningPort(
const grpc::string& addr, std::shared_ptr<ServerCredentials> creds,
int* selected_port) {
@@ -139,35 +155,24 @@ ServerBuilder& ServerBuilder::AddListeningPort(
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
- std::unique_ptr<ThreadPoolInterface> thread_pool;
- bool has_sync_methods = false;
- for (auto it = services_.begin(); it != services_.end(); ++it) {
- if ((*it)->service->has_synchronous_methods()) {
- if (!thread_pool) {
- thread_pool.reset(CreateDefaultThreadPool());
- has_sync_methods = true;
- break;
- }
- }
- }
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
(*option)->UpdateArguments(&args);
(*option)->UpdatePlugins(&plugins_);
}
+
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
- if (!thread_pool && (*plugin)->has_sync_methods()) {
- thread_pool.reset(CreateDefaultThreadPool());
- has_sync_methods = true;
- }
(*plugin)->UpdateChannelArguments(&args);
}
+
if (max_receive_message_size_ >= 0) {
args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
}
+
if (max_send_message_size_ >= 0) {
args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_);
}
+
args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
enabled_compression_algorithms_bitset_);
if (maybe_default_compression_level_.is_set) {
@@ -178,27 +183,84 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
maybe_default_compression_algorithm_.algorithm);
}
- std::unique_ptr<Server> server(new Server(thread_pool.release(), true,
- max_receive_message_size_, &args));
+
+ // == Determine if the server has any syncrhonous methods ==
+ bool has_sync_methods = false;
+ for (auto it = services_.begin(); it != services_.end(); ++it) {
+ if ((*it)->service->has_synchronous_methods()) {
+ has_sync_methods = true;
+ break;
+ }
+ }
+
+ if (!has_sync_methods) {
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ if ((*plugin)->has_sync_methods()) {
+ has_sync_methods = true;
+ break;
+ }
+ }
+ }
+
+ // If this is a Sync server, i.e a server expositing sync API, then the server
+ // needs to create some completion queues to listen for incoming requests.
+ // 'sync_server_cqs' are those internal completion queues.
+ //
+ // This is different from the completion queues added to the server via
+ // ServerBuilder's AddCompletionQueue() method (those completion queues
+ // are in 'cqs_' member variable of ServerBuilder object)
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs(
+ new std::vector<std::unique_ptr<ServerCompletionQueue>>());
+
+ if (has_sync_methods) {
+ // This is a Sync server
+ gpr_log(GPR_INFO,
+ "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
+ "%d, CQ timeout (msec): %d",
+ sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
+ sync_server_settings_.max_pollers,
+ sync_server_settings_.cq_timeout_msec);
+
+ // Create completion queues to listen to incoming rpc requests
+ for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
+ sync_server_cqs->emplace_back(new ServerCompletionQueue());
+ }
+ }
+
+ std::unique_ptr<Server> server(new Server(
+ sync_server_cqs, max_receive_message_size_, &args,
+ sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
+ sync_server_settings_.cq_timeout_msec));
+
ServerInitializer* initializer = server->initializer();
- // If the server has atleast one sync methods, we know that this is a Sync
- // server or a Hybrid server and the completion queue (server->cq_) would be
- // frequently polled.
- int num_frequently_polled_cqs = has_sync_methods ? 1 : 0;
-
- for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
- // A completion queue that is not polled frequently (by calling Next() or
- // AsyncNext()) is not safe to use for listening to incoming channels.
- // Register all such completion queues as non-listening completion queues
- // with the GRPC core library.
- if ((*cq)->IsFrequentlyPolled()) {
- grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+ // Register all the completion queues with the server. i.e
+ // 1. sync_server_cqs: internal completion queues created IF this is a sync
+ // server
+ // 2. cqs_: Completion queues added via AddCompletionQueue() call
+
+ // All sync cqs (if any) are frequently polled by the GrpcRpcManager
+ int num_frequently_polled_cqs = sync_server_cqs->size();
+
+ for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
+ nullptr);
+ }
+
+ // cqs_ contains the completion queue added by calling the ServerBuilder's
+ // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
+ // calling Next() or AsyncNext()) and hence are not safe to be used for
+ // listening to incoming channels. Such completion queues must be registered
+ // as non-listening queues
+ for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
+ if ((*it)->IsFrequentlyPolled()) {
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
num_frequently_polled_cqs++;
} else {
grpc_server_register_non_listening_completion_queue(server->server_,
- (*cq)->cq(), nullptr);
+ (*it)->cq(), nullptr);
}
}
@@ -214,9 +276,11 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
+
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
(*plugin)->InitServer(initializer);
}
+
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
} else {
@@ -229,6 +293,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}
}
}
+
for (auto port = ports_.begin(); port != ports_.end(); port++) {
int r = server->AddListeningPort(port->addr, port->creds.get());
if (!r) return nullptr;
@@ -236,13 +301,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
*port->selected_port = r;
}
}
+
auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
if (!server->Start(cqs_data, cqs_.size())) {
return nullptr;
}
+
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
(*plugin)->Finish(initializer);
}
+
return server;
}
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 3f89275370..36bc61fdf1 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -118,6 +118,7 @@ class Server::UnimplementedAsyncResponse GRPC_FINAL
UnimplementedAsyncRequest* const request_;
};
+// TODO (sreek) - This might no longer be needed
class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
@@ -126,6 +127,11 @@ class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
}
};
+class ShutdownTag : public CompletionQueueTag {
+ public:
+ bool FinalizeResult(void** tag, bool* status) { return false; }
+};
+
class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
SyncRequest(RpcServiceMethod* method, void* tag)
@@ -147,6 +153,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_metadata_array_destroy(&request_metadata_);
}
+ // TODO (Sreek) This function is probably no longer needed
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
*ok = false;
@@ -158,6 +165,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
+ // TODO (sreek) - This function is probably no longer needed
static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
gpr_timespec deadline) {
void* tag = nullptr;
@@ -177,6 +185,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
GPR_UNREACHABLE_CODE(return false);
}
+ // TODO (sreek) - Refactor this SetupRequest/TeardownRequest and ResetRequest
+ // functions
void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
@@ -184,6 +194,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
cq_ = nullptr;
}
+ void ResetRequest() { in_flight_ = false; }
+
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
@@ -275,53 +287,168 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
+class Server::SyncRequestManager : public GrpcRpcManager {
+ public:
+ SyncRequestManager(Server* server, CompletionQueue* server_cq,
+ std::shared_ptr<GlobalCallbacks> global_callbacks,
+ int min_pollers, int max_pollers, int cq_timeout_msec)
+ : GrpcRpcManager(min_pollers, max_pollers),
+ server_(server),
+ server_cq_(server_cq),
+ cq_timeout_msec_(cq_timeout_msec),
+ global_callbacks_(global_callbacks) {}
+
+ WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE {
+ *tag = nullptr;
+ gpr_timespec deadline =
+ gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN);
+
+ switch (server_cq_->AsyncNext(tag, ok, deadline)) {
+ case CompletionQueue::TIMEOUT:
+ return TIMEOUT;
+ case CompletionQueue::SHUTDOWN:
+ return SHUTDOWN;
+ case CompletionQueue::GOT_EVENT:
+ return WORK_FOUND;
+ }
+
+ GPR_UNREACHABLE_CODE(return TIMEOUT);
+ }
+
+ void DoWork(void* tag, bool ok) GRPC_OVERRIDE {
+ SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
+
+ if (!sync_req) {
+ // No tag. Nothing to work on
+ // TODO (sreek) - Log a warning here since this is an unlikely case
+ return;
+ }
+
+ if (ok) {
+ SyncRequest::CallData cd(server_, sync_req);
+ {
+ sync_req->SetupRequest();
+ if (!IsShutdown()) {
+ sync_req->Request(server_->c_server(), server_cq_->cq());
+ } else {
+ sync_req->TeardownRequest();
+ }
+ }
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd.Run(global_callbacks_);
+ } else {
+ sync_req->ResetRequest();
+ // ok is false. For some reason, the tag was returned but event was not
+ // successful. In this case, request again unless we are shutting down
+ if (!IsShutdown()) {
+ // TODO (sreek) Remove this
+ // sync_req->Request(server_->c_server(), server_cq_->cq());
+ }
+ }
+ }
+
+ void AddSyncMethod(RpcServiceMethod* method, void* tag) {
+ sync_methods_.emplace_back(method, tag);
+ }
+
+ void AddUnknownSyncMethod() {
+ // TODO (sreek) - Check if !sync_methods_.empty() is really needed here
+ if (!sync_methods_.empty()) {
+ unknown_method_.reset(new RpcServiceMethod(
+ "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ // Use of emplace_back with just constructor arguments is not accepted
+ // here by gcc-4.4 because it can't match the anonymous nullptr with a
+ // proper constructor implicitly. Construct the object and use push_back.
+ sync_methods_.push_back(SyncRequest(unknown_method_.get(), nullptr));
+ }
+ }
+
+ void ShutdownAndDrainCompletionQueue() {
+ server_cq_->Shutdown();
+
+ // Drain any pending items from the queue
+ void* tag;
+ bool ok;
+ while (server_cq_->Next(&tag, &ok)) {
+ // Nothing to be done here
+ }
+ }
+
+ void Start() {
+ if (!sync_methods_.empty()) {
+ for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) {
+ m->SetupRequest();
+ m->Request(server_->c_server(), server_cq_->cq());
+ }
+
+ GrpcRpcManager::Initialize();
+ }
+ }
+
+ private:
+ Server* server_;
+ CompletionQueue* server_cq_;
+ int cq_timeout_msec_;
+ std::vector<SyncRequest> sync_methods_;
+ std::unique_ptr<RpcServiceMethod> unknown_method_;
+ std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
+};
+
static internal::GrpcLibraryInitializer g_gli_initializer;
-Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
- int max_receive_message_size, ChannelArguments* args)
+Server::Server(
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs,
+ int max_receive_message_size, ChannelArguments* args, int min_pollers,
+ int max_pollers, int sync_cq_timeout_msec)
: max_receive_message_size_(max_receive_message_size),
+ sync_server_cqs_(sync_server_cqs),
started_(false),
shutdown_(false),
shutdown_notified_(false),
- num_running_cb_(0),
- sync_methods_(new std::list<SyncRequest>),
has_generic_service_(false),
server_(nullptr),
- thread_pool_(thread_pool),
- thread_pool_owned_(thread_pool_owned),
server_initializer_(new ServerInitializer(this)) {
g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
global_callbacks_->UpdateArguments(args);
+
+ for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
+ it++) {
+ sync_req_mgrs_.emplace_back(
+ new SyncRequestManager(this, (*it).get(), global_callbacks_,
+ min_pollers, max_pollers, sync_cq_timeout_msec));
+ }
+
grpc_channel_args channel_args;
args->SetChannelArgs(&channel_args);
+
server_ = grpc_server_create(&channel_args, nullptr);
- if (thread_pool_ == nullptr) {
- grpc_server_register_non_listening_completion_queue(server_, cq_.cq(),
- nullptr);
- } else {
- grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
- }
}
Server::~Server() {
{
+ // TODO (sreek) Check if we can just call Shutdown() even in case where
+ // started_ == false. This will make things much simpler
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
lock.unlock();
Shutdown();
} else if (!started_) {
- cq_.Shutdown();
+ // Shutdown the completion queues
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->ShutdownAndDrainCompletionQueue();
+ }
}
}
+
+ // TODO(sreek) Do thisfor all cqs ?
+ /*
void* got_tag;
bool ok;
GPR_ASSERT(!cq_.Next(&got_tag, &ok));
+ */
grpc_server_destroy(server_);
- if (thread_pool_owned_) {
- delete thread_pool_;
- }
- delete sync_methods_;
}
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
@@ -352,6 +479,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
"Can only register an asynchronous service against one server.");
service->server_ = this;
}
+
const char* method_name = nullptr;
for (auto it = service->methods_.begin(); it != service->methods_.end();
++it) {
@@ -370,7 +498,9 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
if (method->handler() == nullptr) {
method->set_server_tag(tag);
} else {
- sync_methods_->emplace_back(method, tag);
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->AddSyncMethod(method, tag);
+ }
}
method_name = method->name();
}
@@ -406,20 +536,23 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
grpc_server_start(server_);
if (!has_generic_service_) {
- if (!sync_methods_->empty()) {
- unknown_method_.reset(new RpcServiceMethod(
- "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
- // Use of emplace_back with just constructor arguments is not accepted
- // here by gcc-4.4 because it can't match the anonymous nullptr with a
- // proper constructor implicitly. Construct the object and use push_back.
- sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->AddUnknownSyncMethod();
}
+
for (size_t i = 0; i < num_cqs; i++) {
if (cqs[i]->IsFrequentlyPolled()) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
}
+
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Start();
+ }
+
+ /* TODO (Sreek) - No longer needed (being done in (*it)->Start above) */
+ /*
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
@@ -427,24 +560,76 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
m->Request(server_, cq_.cq());
}
- ScheduleCallback();
+ GrpcRpcManager::Initialize();
}
+ */
return true;
}
+/* TODO (sreek) check if started_ and shutdown_ are needed anymore */
void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
+
+ /// The completion queue to use for server shutdown completion notification
+ CompletionQueue shutdown_cq;
+ ShutdownTag shutdown_tag; // Dummy shutdown tag
+ grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
+
+ // Shutdown all RpcManagers. This will try to gracefully stop all the
+ // threads in the RpcManagers (once they process any inflight requests)
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->ShutdownRpcManager();
+ }
+
+ shutdown_cq.Shutdown();
+
+ void* tag;
+ bool ok;
+ CompletionQueue::NextStatus status =
+ shutdown_cq.AsyncNext(&tag, &ok, deadline);
+
+ // If this timed out, it means we are done with the grace period for a clean
+ // shutdown. We should force a shutdown now by cancelling all inflight calls
+ if (status == CompletionQueue::NextStatus::TIMEOUT) {
+ grpc_server_cancel_all_calls(server_);
+ }
+ // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
+ // successfully shutdown
+
+ // Wait for threads in all RpcManagers to terminate
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Wait();
+ (*it)->ShutdownAndDrainCompletionQueue();
+ }
+
+ // Drain the shutdown queue (if the previous call to AsyncNext() timed out
+ // and we didn't remove the tag from the queue yet)
+ while (shutdown_cq.Next(&tag, &ok)) {
+ // Nothing to be done here
+ }
+
+ /*
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
lock.unlock();
+ */
+
+ // TODO (sreek) Delete this
+ /*
+ GrpcRpcManager::ShutdownRpcManager();
+ GrpcRpcManager::Wait();
+ */
+
// Spin, eating requests until the completion queue is completely shutdown.
// If the deadline expires then cancel anything that's pending and keep
// spinning forever until the work is actually drained.
// Since nothing else needs to touch state guarded by mu_, holding it
// through this loop is fine.
+ //
+ /*
SyncRequest* request;
bool ok;
while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
@@ -456,11 +641,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
}
}
lock.lock();
+ */
+ /* TODO (sreek) - Remove this block */
// Wait for running callbacks to finish.
- while (num_running_cb_ != 0) {
- callback_cv_.wait(lock);
- }
+ /*
+ while (num_running_cb_ != 0) {
+ callback_cv_.wait(lock);
+ }
+ */
shutdown_notified_ = true;
shutdown_cv_.notify_all();
@@ -585,46 +774,86 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
request_->stream()->call_.PerformOps(this);
}
+// TODO: sreek - Remove this function
void Server::ScheduleCallback() {
+ GPR_ASSERT(false);
+ /*
{
grpc::unique_lock<grpc::mutex> lock(mu_);
num_running_cb_++;
}
thread_pool_->Add(std::bind(&Server::RunRpc, this));
+ */
}
+// TODO: sreek - Remove this function
void Server::RunRpc() {
- // Wait for one more incoming rpc.
- bool ok;
- GPR_TIMER_SCOPE("Server::RunRpc", 0);
- auto* mrd = SyncRequest::Wait(&cq_, &ok);
- if (mrd) {
- ScheduleCallback();
- if (ok) {
- SyncRequest::CallData cd(this, mrd);
- {
- mrd->SetupRequest();
- grpc::unique_lock<grpc::mutex> lock(mu_);
- if (!shutdown_) {
- mrd->Request(server_, cq_.cq());
- } else {
- // destroy the structure that was created
- mrd->TeardownRequest();
+ GPR_ASSERT(false);
+ /*
+ // Wait for one more incoming rpc.
+ bool ok;
+ GPR_TIMER_SCOPE("Server::RunRpc", 0);
+ auto* mrd = SyncRequest::Wait(&cq_, &ok);
+ if (mrd) {
+ ScheduleCallback();
+ if (ok) {
+ SyncRequest::CallData cd(this, mrd);
+ {
+ mrd->SetupRequest();
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ if (!shutdown_) {
+ mrd->Request(server_, cq_.cq());
+ } else {
+ // destroy the structure that was created
+ mrd->TeardownRequest();
+ }
}
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd.Run(global_callbacks_);
+ }
+ }
+
+ {
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ num_running_cb_--;
+ if (shutdown_) {
+ callback_cv_.notify_all();
}
- GPR_TIMER_SCOPE("cd.Run()", 0);
- cd.Run(global_callbacks_);
}
+ */
+}
+
+/* TODO (sreek) Move this to SyncRequestManager */
+/*
+void Server::PollForWork(bool& is_work_found, void** tag) {
+ is_work_found = true;
+ *tag = nullptr;
+ auto* mrd = SyncRequest::Wait(&cq_, &is_work_found);
+ if (is_work_found) {
+ *tag = mrd;
}
+}
- {
- grpc::unique_lock<grpc::mutex> lock(mu_);
- num_running_cb_--;
- if (shutdown_) {
- callback_cv_.notify_all();
+
+void Server::DoWork(void* tag) {
+ auto* mrd = static_cast<SyncRequest*>(tag);
+ if (mrd) {
+ SyncRequest::CallData cd(this, mrd);
+ {
+ mrd->SetupRequest();
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ if (!shutdown_) {
+ mrd->Request(server_, cq_.cq());
+ } else {
+ // destroy the structure that was created
+ mrd->TeardownRequest();
+ }
}
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd.Run(global_callbacks_);
}
}
+*/
ServerInitializer* Server::initializer() { return server_initializer_.get(); }
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index b1d3ce92f6..a46f9f268b 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -226,6 +226,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
kMaxMessageSize_(8192),
special_service_("special") {
GetParam().Log();
+
+ sync_server_settings_.max_pollers = INT_MAX;
+ sync_server_settings_.min_pollers = 1;
+ sync_server_settings_.cq_timeout_msec = 10;
+ sync_server_settings_.num_cqs = 4;
}
void TearDown() GRPC_OVERRIDE {
@@ -250,6 +255,9 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
builder.SetMaxMessageSize(
kMaxMessageSize_); // For testing max message size.
builder.RegisterService(&dup_pkg_service_);
+
+ builder.SetSyncServerSettings(sync_server_settings_);
+
server_ = builder.BuildAndStart();
is_server_started_ = true;
}
@@ -279,6 +287,8 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
ServerBuilder builder;
builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
builder.RegisterService(proxy_service_.get());
+ builder.SetSyncServerSettings(sync_server_settings_);
+
proxy_server_ = builder.BuildAndStart();
channel_ = CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
@@ -299,6 +309,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
TestServiceImpl special_service_;
TestServiceImplDupPkg dup_pkg_service_;
grpc::string user_agent_prefix_;
+ ServerBuilder::SyncServerSettings sync_server_settings_;
};
static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
diff --git a/test/cpp/rpcmanager/grpc_rpc_manager_test.cc b/test/cpp/rpcmanager/grpc_rpc_manager_test.cc
new file mode 100644
index 0000000000..ce43b27856
--- /dev/null
+++ b/test/cpp/rpcmanager/grpc_rpc_manager_test.cc
@@ -0,0 +1,89 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *is % allowed in string
+ */
+
+#include <chrono>
+#include <memory>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <grpc++/grpc++.h>
+
+#include "test/cpp/rpcmanager/grpc_rpc_manager_test.h"
+#include "test/cpp/util/test_config.h"
+
+using grpc::testing::GrpcRpcManagerTest;
+
+// TODO: sreek - Rewrite this test. Find a better test case
+
+grpc::GrpcRpcManager::WorkStatus GrpcRpcManagerTest::PollForWork(void **tag,
+ bool *ok) {
+ {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ std::cout << "Poll: " << std::this_thread::get_id() << std::endl;
+ }
+
+ WorkStatus work_status = WORK_FOUND;
+ *tag = nullptr;
+ *ok = true;
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+ {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ num_calls_++;
+ if (num_calls_ > 50) {
+ std::cout << "poll: False" << std::endl;
+ work_status = SHUTDOWN;
+ ShutdownRpcManager();
+ }
+ }
+
+ return work_status;
+}
+
+void GrpcRpcManagerTest::DoWork(void *tag, bool ok) {
+ {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ std::cout << "Work: " << std::this_thread::get_id() << std::endl;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+}
+
+int main(int argc, char **argv) {
+ grpc::testing::InitTest(&argc, &argv, true);
+ GrpcRpcManagerTest test_rpc_manager(3, 15);
+ test_rpc_manager.Initialize();
+ test_rpc_manager.Wait();
+
+ return 0;
+}
diff --git a/test/cpp/rpcmanager/grpc_rpc_manager_test.h b/test/cpp/rpcmanager/grpc_rpc_manager_test.h
new file mode 100644
index 0000000000..0f1d3b3ed2
--- /dev/null
+++ b/test/cpp/rpcmanager/grpc_rpc_manager_test.h
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *is % allowed in string
+ */
+#ifndef GRPC_TEST_CPP_GRPC_RPC_MANAGER_TEST_H
+#define GRPC_TEST_CPP_GRPC_RPC_MANAGER_TEST_H
+
+#include "src/cpp/rpcmanager/grpc_rpc_manager.h"
+
+namespace grpc {
+namespace testing {
+
+class GrpcRpcManagerTest GRPC_FINAL : public GrpcRpcManager {
+ public:
+ GrpcRpcManagerTest(int min_pollers, int max_pollers)
+ : GrpcRpcManager(min_pollers, max_pollers), num_calls_(0){};
+
+ grpc::GrpcRpcManager::WorkStatus PollForWork(void **tag,
+ bool *ok) GRPC_OVERRIDE;
+ void DoWork(void *tag, bool ok) GRPC_OVERRIDE;
+
+ private:
+ grpc::mutex mu_;
+ int num_calls_;
+};
+
+} // namespace testing
+} // namespace grpc
+
+#endif // GRPC_TEST_CPP_GRPC_RPC_MANAGER_TEST_H
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index e266e15481..dac227d077 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -861,6 +861,7 @@ src/cpp/common/secure_auth_context.h \
src/cpp/server/secure_server_credentials.h \
src/cpp/client/create_channel_internal.h \
src/cpp/common/channel_filter.h \
+src/cpp/rpcmanager/grpc_rpc_manager.h \
src/cpp/server/dynamic_thread_pool.h \
src/cpp/server/thread_pool_interface.h \
src/cpp/client/insecure_credentials.cc \
@@ -883,6 +884,7 @@ src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
src/cpp/common/core_codegen.cc \
src/cpp/common/rpc_method.cc \
+src/cpp/rpcmanager/grpc_rpc_manager.cc \
src/cpp/server/async_generic_service.cc \
src/cpp/server/create_default_thread_pool.cc \
src/cpp/server/dynamic_thread_pool.cc \
diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py
index fa401fdaaf..8166fbd419 100644
--- a/tools/run_tests/performance/scenario_config.py
+++ b/tools/run_tests/performance/scenario_config.py
@@ -31,52 +31,49 @@
import math
-WARMUP_SECONDS=5
-JAVA_WARMUP_SECONDS=15 # Java needs more warmup time for JIT to kick in.
-BENCHMARK_SECONDS=30
+WARMUP_SECONDS = 5
+JAVA_WARMUP_SECONDS = 15 # Java needs more warmup time for JIT to kick in.
+BENCHMARK_SECONDS = 30
-SMOKETEST='smoketest'
-SCALABLE='scalable'
-SWEEP='sweep'
-DEFAULT_CATEGORIES=[SCALABLE, SMOKETEST]
+SMOKETEST = 'smoketest'
+SCALABLE = 'scalable'
+SWEEP = 'sweep'
+DEFAULT_CATEGORIES = [SCALABLE, SMOKETEST]
SECURE_SECARGS = {'use_test_ca': True,
'server_host_override': 'foo.test.google.fr'}
HISTOGRAM_PARAMS = {
- 'resolution': 0.01,
- 'max_possible': 60e9,
+ 'resolution': 0.01,
+ 'max_possible': 60e9,
}
EMPTY_GENERIC_PAYLOAD = {
- 'bytebuf_params': {
- 'req_size': 0,
- 'resp_size': 0,
- }
+ 'bytebuf_params': {
+ 'req_size': 0,
+ 'resp_size': 0,
+ }
}
EMPTY_PROTO_PAYLOAD = {
- 'simple_params': {
- 'req_size': 0,
- 'resp_size': 0,
- }
+ 'simple_params': {
+ 'req_size': 0,
+ 'resp_size': 0,
+ }
}
BIG_GENERIC_PAYLOAD = {
- 'bytebuf_params': {
- 'req_size': 65536,
- 'resp_size': 65536,
- }
+ 'bytebuf_params': {
+ 'req_size': 65536,
+ 'resp_size': 65536,
+ }
}
# target number of RPCs outstanding on across all client channels in
# non-ping-pong tests (since we can only specify per-channel numbers, the
# actual target will be slightly higher)
-OUTSTANDING_REQUESTS={
- 'async': 6400,
- 'sync': 1000
-}
+OUTSTANDING_REQUESTS = {'async': 6400, 'sync': 1000}
# wide is the number of client channels in multi-channel tests (1 otherwise)
-WIDE=64
+WIDE = 64
def _get_secargs(is_secure):
@@ -102,8 +99,10 @@ def geometric_progression(start, stop, step):
n *= step
-def _ping_pong_scenario(name, rpc_type,
- client_type, server_type,
+def _ping_pong_scenario(name,
+ rpc_type,
+ client_type,
+ server_type,
secure=True,
use_generic_payload=False,
unconstrained_client=None,
@@ -117,29 +116,29 @@ def _ping_pong_scenario(name, rpc_type,
outstanding=None):
"""Creates a basic ping pong scenario."""
scenario = {
- 'name': name,
- 'num_servers': 1,
- 'num_clients': 1,
- 'client_config': {
- 'client_type': client_type,
- 'security_params': _get_secargs(secure),
- 'outstanding_rpcs_per_channel': 1,
- 'client_channels': 1,
- 'async_client_threads': 1,
- 'rpc_type': rpc_type,
- 'load_params': {
- 'closed_loop': {}
+ 'name': name,
+ 'num_servers': 1,
+ 'num_clients': 1,
+ 'client_config': {
+ 'client_type': client_type,
+ 'security_params': _get_secargs(secure),
+ 'outstanding_rpcs_per_channel': 1,
+ 'client_channels': 1,
+ 'async_client_threads': 1,
+ 'rpc_type': rpc_type,
+ 'load_params': {
+ 'closed_loop': {}
+ },
+ 'histogram_params': HISTOGRAM_PARAMS,
},
- 'histogram_params': HISTOGRAM_PARAMS,
- },
- 'server_config': {
- 'server_type': server_type,
- 'security_params': _get_secargs(secure),
- 'core_limit': server_core_limit,
- 'async_server_threads': async_server_threads,
- },
- 'warmup_seconds': warmup_seconds,
- 'benchmark_seconds': BENCHMARK_SECONDS
+ 'server_config': {
+ 'server_type': server_type,
+ 'security_params': _get_secargs(secure),
+ 'core_limit': server_core_limit,
+ 'async_server_threads': async_server_threads,
+ },
+ 'warmup_seconds': warmup_seconds,
+ 'benchmark_seconds': BENCHMARK_SECONDS
}
if use_generic_payload:
if server_type != 'ASYNC_GENERIC_SERVER':
@@ -151,7 +150,8 @@ def _ping_pong_scenario(name, rpc_type,
scenario['client_config']['payload_config'] = EMPTY_PROTO_PAYLOAD
if unconstrained_client:
- outstanding_calls = outstanding if outstanding is not None else OUTSTANDING_REQUESTS[unconstrained_client]
+ outstanding_calls = outstanding if outstanding is not None else OUTSTANDING_REQUESTS[
+ unconstrained_client]
wide = channels if channels is not None else WIDE
deep = int(math.ceil(1.0 * outstanding_calls / wide))
@@ -197,7 +197,9 @@ class CXXLanguage:
rpc_type='STREAMING',
client_type='ASYNC_CLIENT',
server_type='ASYNC_GENERIC_SERVER',
- use_generic_payload=True, server_core_limit=1, async_server_threads=1,
+ use_generic_payload=True,
+ server_core_limit=1,
+ async_server_threads=1,
secure=secure,
categories=smoketest_categories)
@@ -206,49 +208,71 @@ class CXXLanguage:
rpc_type='STREAMING',
client_type='ASYNC_CLIENT',
server_type='ASYNC_GENERIC_SERVER',
- unconstrained_client='async', use_generic_payload=True,
+ unconstrained_client='async',
+ use_generic_payload=True,
secure=secure,
- categories=smoketest_categories+[SCALABLE])
+ categories=smoketest_categories + [SCALABLE])
yield _ping_pong_scenario(
'cpp_generic_async_streaming_qps_one_server_core_%s' % secstr,
rpc_type='STREAMING',
client_type='ASYNC_CLIENT',
server_type='ASYNC_GENERIC_SERVER',
- unconstrained_client='async', use_generic_payload=True,
- server_core_limit=1, async_server_threads=1,
+ unconstrained_client='async',
+ use_generic_payload=True,
+ server_core_limit=1,
+ async_server_threads=1,
secure=secure)
+ yield _ping_pong_scenario(
+ 'cpp_protobuf_async_client_sync_server_unary_qps_unconstrained_%s' %
+ (secstr),
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='SYNC_SERVER',
+ unconstrained_client='async',
+ secure=secure,
+ categories=smoketest_categories + [SCALABLE])
+
for rpc_type in ['unary', 'streaming']:
for synchronicity in ['sync', 'async']:
yield _ping_pong_scenario(
- 'cpp_protobuf_%s_%s_ping_pong_%s' % (synchronicity, rpc_type, secstr),
+ 'cpp_protobuf_%s_%s_ping_pong_%s' %
+ (synchronicity, rpc_type, secstr),
rpc_type=rpc_type.upper(),
client_type='%s_CLIENT' % synchronicity.upper(),
server_type='%s_SERVER' % synchronicity.upper(),
- server_core_limit=1, async_server_threads=1,
+ server_core_limit=1,
+ async_server_threads=1,
secure=secure)
yield _ping_pong_scenario(
- 'cpp_protobuf_%s_%s_qps_unconstrained_%s' % (synchronicity, rpc_type, secstr),
+ 'cpp_protobuf_%s_%s_qps_unconstrained_%s' %
+ (synchronicity, rpc_type, secstr),
rpc_type=rpc_type.upper(),
client_type='%s_CLIENT' % synchronicity.upper(),
server_type='%s_SERVER' % synchronicity.upper(),
unconstrained_client=synchronicity,
secure=secure,
- categories=smoketest_categories+[SCALABLE])
+ categories=smoketest_categories + [SCALABLE])
for channels in geometric_progression(1, 20000, math.sqrt(10)):
for outstanding in geometric_progression(1, 200000, math.sqrt(10)):
- if synchronicity == 'sync' and outstanding > 1200: continue
- if outstanding < channels: continue
- yield _ping_pong_scenario(
- 'cpp_protobuf_%s_%s_qps_unconstrained_%s_%d_channels_%d_outstanding' % (synchronicity, rpc_type, secstr, channels, outstanding),
- rpc_type=rpc_type.upper(),
- client_type='%s_CLIENT' % synchronicity.upper(),
- server_type='%s_SERVER' % synchronicity.upper(),
- unconstrained_client=synchronicity, secure=secure,
- categories=[SWEEP], channels=channels, outstanding=outstanding)
+ if synchronicity == 'sync' and outstanding > 1200:
+ continue
+ if outstanding < channels:
+ continue
+ yield _ping_pong_scenario(
+ 'cpp_protobuf_%s_%s_qps_unconstrained_%s_%d_channels_%d_outstanding'
+ % (synchronicity, rpc_type, secstr, channels, outstanding),
+ rpc_type=rpc_type.upper(),
+ client_type='%s_CLIENT' % synchronicity.upper(),
+ server_type='%s_SERVER' % synchronicity.upper(),
+ unconstrained_client=synchronicity,
+ secure=secure,
+ categories=[SWEEP],
+ channels=channels,
+ outstanding=outstanding)
def __str__(self):
return 'c++'
@@ -267,66 +291,94 @@ class CSharpLanguage:
def scenarios(self):
yield _ping_pong_scenario(
- 'csharp_generic_async_streaming_ping_pong', rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
+ 'csharp_generic_async_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
use_generic_payload=True,
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'csharp_protobuf_async_streaming_ping_pong', rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER')
+ 'csharp_protobuf_async_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER')
yield _ping_pong_scenario(
- 'csharp_protobuf_async_unary_ping_pong', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'csharp_protobuf_async_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'csharp_protobuf_sync_to_async_unary_ping_pong', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER')
+ 'csharp_protobuf_sync_to_async_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER')
yield _ping_pong_scenario(
- 'csharp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'csharp_protobuf_async_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='async',
- categories=[SMOKETEST,SCALABLE])
+ categories=[SMOKETEST, SCALABLE])
yield _ping_pong_scenario(
- 'csharp_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'csharp_protobuf_async_streaming_qps_unconstrained',
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='async',
categories=[SCALABLE])
yield _ping_pong_scenario(
- 'csharp_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
- server_language='c++', server_core_limit=1, async_server_threads=1,
+ 'csharp_to_cpp_protobuf_sync_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
+ server_language='c++',
+ server_core_limit=1,
+ async_server_threads=1,
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'csharp_to_cpp_protobuf_async_streaming_ping_pong', rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
- server_language='c++', server_core_limit=1, async_server_threads=1)
+ 'csharp_to_cpp_protobuf_async_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
+ server_language='c++',
+ server_core_limit=1,
+ async_server_threads=1)
yield _ping_pong_scenario(
- 'csharp_to_cpp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
- unconstrained_client='async', server_language='c++',
+ 'csharp_to_cpp_protobuf_async_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
+ unconstrained_client='async',
+ server_language='c++',
categories=[SCALABLE])
yield _ping_pong_scenario(
- 'csharp_to_cpp_protobuf_sync_to_async_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
- unconstrained_client='sync', server_language='c++',
+ 'csharp_to_cpp_protobuf_sync_to_async_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER',
+ unconstrained_client='sync',
+ server_language='c++',
categories=[SCALABLE])
yield _ping_pong_scenario(
- 'cpp_to_csharp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
- unconstrained_client='async', client_language='c++',
+ 'cpp_to_csharp_protobuf_async_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
+ unconstrained_client='async',
+ client_language='c++',
categories=[SCALABLE])
-
def __str__(self):
return 'csharp'
@@ -356,13 +408,17 @@ class NodeLanguage:
# client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER')
yield _ping_pong_scenario(
- 'node_protobuf_unary_ping_pong', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'node_protobuf_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'node_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'node_protobuf_async_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='async',
categories=[SMOKETEST])
@@ -387,6 +443,7 @@ class NodeLanguage:
def __str__(self):
return 'node'
+
class PythonLanguage:
def __init__(self):
@@ -400,48 +457,69 @@ class PythonLanguage:
def scenarios(self):
yield _ping_pong_scenario(
- 'python_generic_sync_streaming_ping_pong', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
+ 'python_generic_sync_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
use_generic_payload=True,
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'python_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER')
+ 'python_protobuf_sync_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER')
yield _ping_pong_scenario(
- 'python_protobuf_async_unary_ping_pong', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER')
+ 'python_protobuf_async_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER')
yield _ping_pong_scenario(
- 'python_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'python_protobuf_sync_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER',
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'python_protobuf_sync_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='sync')
yield _ping_pong_scenario(
- 'python_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'python_protobuf_sync_streaming_qps_unconstrained',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='sync')
yield _ping_pong_scenario(
- 'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
- server_language='c++', server_core_limit=1, async_server_threads=1,
+ 'python_to_cpp_protobuf_sync_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER',
+ server_language='c++',
+ server_core_limit=1,
+ async_server_threads=1,
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'python_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
- server_language='c++', server_core_limit=1, async_server_threads=1)
+ 'python_to_cpp_protobuf_sync_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER',
+ server_language='c++',
+ server_core_limit=1,
+ async_server_threads=1)
def __str__(self):
return 'python'
+
class RubyLanguage:
def __init__(self):
@@ -456,34 +534,50 @@ class RubyLanguage:
def scenarios(self):
yield _ping_pong_scenario(
- 'ruby_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'ruby_protobuf_sync_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'ruby_protobuf_unary_ping_pong', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'ruby_protobuf_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'ruby_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'ruby_protobuf_sync_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
unconstrained_client='sync')
yield _ping_pong_scenario(
- 'ruby_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'ruby_protobuf_sync_streaming_qps_unconstrained',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
unconstrained_client='sync')
yield _ping_pong_scenario(
- 'ruby_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
- server_language='c++', server_core_limit=1, async_server_threads=1)
+ 'ruby_to_cpp_protobuf_sync_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
+ server_language='c++',
+ server_core_limit=1,
+ async_server_threads=1)
yield _ping_pong_scenario(
- 'ruby_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
- server_language='c++', server_core_limit=1, async_server_threads=1)
+ 'ruby_to_cpp_protobuf_sync_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
+ server_language='c++',
+ server_core_limit=1,
+ async_server_threads=1)
def __str__(self):
return 'ruby'
@@ -507,58 +601,85 @@ class JavaLanguage:
smoketest_categories = [SMOKETEST] if secure else []
yield _ping_pong_scenario(
- 'java_generic_async_streaming_ping_pong_%s' % secstr, rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
- use_generic_payload=True, async_server_threads=1,
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS,
+ 'java_generic_async_streaming_ping_pong_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
+ use_generic_payload=True,
+ async_server_threads=1,
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS,
categories=smoketest_categories)
yield _ping_pong_scenario(
- 'java_protobuf_async_streaming_ping_pong_%s' % secstr, rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'java_protobuf_async_streaming_ping_pong_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
async_server_threads=1,
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS)
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS)
yield _ping_pong_scenario(
- 'java_protobuf_async_unary_ping_pong_%s' % secstr, rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'java_protobuf_async_unary_ping_pong_%s' % secstr,
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
async_server_threads=1,
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS,
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS,
categories=smoketest_categories)
yield _ping_pong_scenario(
- 'java_protobuf_unary_ping_pong_%s' % secstr, rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'java_protobuf_unary_ping_pong_%s' % secstr,
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
async_server_threads=1,
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS)
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS)
yield _ping_pong_scenario(
- 'java_protobuf_async_unary_qps_unconstrained_%s' % secstr, rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'java_protobuf_async_unary_qps_unconstrained_%s' % secstr,
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='async',
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS,
- categories=smoketest_categories+[SCALABLE])
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS,
+ categories=smoketest_categories + [SCALABLE])
yield _ping_pong_scenario(
- 'java_protobuf_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'java_protobuf_async_streaming_qps_unconstrained_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='async',
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS,
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS,
categories=[SCALABLE])
yield _ping_pong_scenario(
- 'java_generic_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
- unconstrained_client='async', use_generic_payload=True,
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS,
+ 'java_generic_async_streaming_qps_unconstrained_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
+ unconstrained_client='async',
+ use_generic_payload=True,
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS,
categories=[SCALABLE])
yield _ping_pong_scenario(
- 'java_generic_async_streaming_qps_one_server_core_%s' % secstr, rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
- unconstrained_client='async', use_generic_payload=True,
+ 'java_generic_async_streaming_qps_one_server_core_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
+ unconstrained_client='async',
+ use_generic_payload=True,
async_server_threads=1,
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS)
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS)
# TODO(jtattermusch): add scenarios java vs C++
@@ -586,37 +707,48 @@ class GoLanguage:
# ASYNC_GENERIC_SERVER for Go actually uses a sync streaming server,
# but that's mostly because of lack of better name of the enum value.
yield _ping_pong_scenario(
- 'go_generic_sync_streaming_ping_pong_%s' % secstr, rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
- use_generic_payload=True, async_server_threads=1,
+ 'go_generic_sync_streaming_ping_pong_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
+ use_generic_payload=True,
+ async_server_threads=1,
secure=secure,
categories=smoketest_categories)
yield _ping_pong_scenario(
- 'go_protobuf_sync_streaming_ping_pong_%s' % secstr, rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'go_protobuf_sync_streaming_ping_pong_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
async_server_threads=1,
secure=secure)
yield _ping_pong_scenario(
- 'go_protobuf_sync_unary_ping_pong_%s' % secstr, rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'go_protobuf_sync_unary_ping_pong_%s' % secstr,
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
async_server_threads=1,
secure=secure,
categories=smoketest_categories)
# unconstrained_client='async' is intended (client uses goroutines)
yield _ping_pong_scenario(
- 'go_protobuf_sync_unary_qps_unconstrained_%s' % secstr, rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'go_protobuf_sync_unary_qps_unconstrained_%s' % secstr,
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
unconstrained_client='async',
secure=secure,
- categories=smoketest_categories+[SCALABLE])
+ categories=smoketest_categories + [SCALABLE])
# unconstrained_client='async' is intended (client uses goroutines)
yield _ping_pong_scenario(
- 'go_protobuf_sync_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'go_protobuf_sync_streaming_qps_unconstrained_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
unconstrained_client='async',
secure=secure,
categories=[SCALABLE])
@@ -625,9 +757,12 @@ class GoLanguage:
# ASYNC_GENERIC_SERVER for Go actually uses a sync streaming server,
# but that's mostly because of lack of better name of the enum value.
yield _ping_pong_scenario(
- 'go_generic_sync_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
- unconstrained_client='async', use_generic_payload=True,
+ 'go_generic_sync_streaming_qps_unconstrained_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
+ unconstrained_client='async',
+ use_generic_payload=True,
secure=secure,
categories=[SCALABLE])
@@ -638,11 +773,11 @@ class GoLanguage:
LANGUAGES = {
- 'c++' : CXXLanguage(),
- 'csharp' : CSharpLanguage(),
- 'node' : NodeLanguage(),
- 'ruby' : RubyLanguage(),
- 'java' : JavaLanguage(),
- 'python' : PythonLanguage(),
- 'go' : GoLanguage(),
+ 'c++': CXXLanguage(),
+ 'csharp': CSharpLanguage(),
+ 'node': NodeLanguage(),
+ 'ruby': RubyLanguage(),
+ 'java': JavaLanguage(),
+ 'python': PythonLanguage(),
+ 'go': GoLanguage(),
}
diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json
index 03bdcd4264..936703223e 100644
--- a/tools/run_tests/sources_and_headers.json
+++ b/tools/run_tests/sources_and_headers.json
@@ -2498,6 +2498,26 @@
},
{
"deps": [
+ "gpr",
+ "grpc",
+ "grpc++",
+ "grpc++_test_config"
+ ],
+ "headers": [
+ "test/cpp/rpcmanager/grpc_rpc_manager_test.h"
+ ],
+ "is_filegroup": false,
+ "language": "c++",
+ "name": "grpc_rpc_manager_test",
+ "src": [
+ "test/cpp/rpcmanager/grpc_rpc_manager_test.cc",
+ "test/cpp/rpcmanager/grpc_rpc_manager_test.h"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
"grpc_plugin_support"
],
"headers": [],
@@ -7211,6 +7231,7 @@
"include/grpc++/support/time.h",
"src/cpp/client/create_channel_internal.h",
"src/cpp/common/channel_filter.h",
+ "src/cpp/rpcmanager/grpc_rpc_manager.h",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/thread_pool_interface.h"
],
@@ -7279,6 +7300,8 @@
"src/cpp/common/completion_queue_cc.cc",
"src/cpp/common/core_codegen.cc",
"src/cpp/common/rpc_method.cc",
+ "src/cpp/rpcmanager/grpc_rpc_manager.cc",
+ "src/cpp/rpcmanager/grpc_rpc_manager.h",
"src/cpp/server/async_generic_service.cc",
"src/cpp/server/create_default_thread_pool.cc",
"src/cpp/server/dynamic_thread_pool.cc",
diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json
index 355fea5d5f..bc95f4274f 100644
--- a/tools/run_tests/tests.json
+++ b/tools/run_tests/tests.json
@@ -2406,6 +2406,27 @@
"cpu_cost": 1.0,
"exclude_configs": [],
"flaky": false,
+ "gtest": false,
+ "language": "c++",
+ "name": "grpc_rpc_manager_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ]
+ },
+ {
+ "args": [],
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "flaky": false,
"gtest": true,
"language": "c++",
"name": "grpc_tool_test",
@@ -31503,6 +31524,27 @@
{
"args": [
"--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_client_sync_server_unary_qps_unconstrained_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}"
+ ],
+ "boringssl": true,
+ "ci_platforms": [
+ "linux"
+ ],
+ "cpu_cost": 8,
+ "defaults": "boringssl",
+ "exclude_configs": [],
+ "flaky": false,
+ "language": "c++",
+ "name": "json_run_localhost",
+ "platforms": [
+ "linux"
+ ],
+ "shortname": "json_run_localhost:cpp_protobuf_async_client_sync_server_unary_qps_unconstrained_secure",
+ "timeout_seconds": 180
+ },
+ {
+ "args": [
+ "--scenarios_json",
"{\"scenarios\": [{\"name\": \"cpp_protobuf_sync_unary_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}"
],
"boringssl": true,
@@ -31713,6 +31755,27 @@
{
"args": [
"--scenarios_json",
+ "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_client_sync_server_unary_qps_unconstrained_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}"
+ ],
+ "boringssl": true,
+ "ci_platforms": [
+ "linux"
+ ],
+ "cpu_cost": 8,
+ "defaults": "boringssl",
+ "exclude_configs": [],
+ "flaky": false,
+ "language": "c++",
+ "name": "json_run_localhost",
+ "platforms": [
+ "linux"
+ ],
+ "shortname": "json_run_localhost:cpp_protobuf_async_client_sync_server_unary_qps_unconstrained_insecure",
+ "timeout_seconds": 180
+ },
+ {
+ "args": [
+ "--scenarios_json",
"{\"scenarios\": [{\"name\": \"cpp_protobuf_sync_unary_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}"
],
"boringssl": true,
diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj b/vsprojects/vcxproj/grpc++/grpc++.vcxproj
index 6bb9a6169d..509e66d00b 100644
--- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj
+++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj
@@ -361,6 +361,7 @@
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\secure_server_credentials.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\client\create_channel_internal.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h" />
</ItemGroup>
@@ -405,6 +406,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\rpc_method.cc">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.cc">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\server\async_generic_service.cc">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\server\create_default_thread_pool.cc">
diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
index 761424c3fa..1dd5fd90e5 100644
--- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
@@ -61,6 +61,9 @@
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\rpc_method.cc">
<Filter>src\cpp\common</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.cc">
+ <Filter>src\cpp\rpcmanager</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\server\async_generic_service.cc">
<Filter>src\cpp\server</Filter>
</ClCompile>
@@ -410,6 +413,9 @@
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h">
<Filter>src\cpp\common</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.h">
+ <Filter>src\cpp\rpcmanager</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h">
<Filter>src\cpp\server</Filter>
</ClInclude>
@@ -467,6 +473,9 @@
<Filter Include="src\cpp\common">
<UniqueIdentifier>{2336e396-7e0b-8bf9-3b09-adc6ad1f0e5b}</UniqueIdentifier>
</Filter>
+ <Filter Include="src\cpp\rpcmanager">
+ <UniqueIdentifier>{f142b1a2-5198-040b-9da4-2afc09e9248a}</UniqueIdentifier>
+ </Filter>
<Filter Include="src\cpp\server">
<UniqueIdentifier>{321b0980-74ad-e8ca-f23b-deffa5d6bb8f}</UniqueIdentifier>
</Filter>
diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
index 02e65926fe..5ec59397ec 100644
--- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
@@ -357,6 +357,7 @@
<ItemGroup>
<ClInclude Include="$(SolutionDir)\..\src\cpp\client\create_channel_internal.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h" />
</ItemGroup>
@@ -391,6 +392,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\rpc_method.cc">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.cc">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\server\async_generic_service.cc">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\server\create_default_thread_pool.cc">
diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
index ff1a0e9644..7e5b912f21 100644
--- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
@@ -46,6 +46,9 @@
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\rpc_method.cc">
<Filter>src\cpp\common</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.cc">
+ <Filter>src\cpp\rpcmanager</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\server\async_generic_service.cc">
<Filter>src\cpp\server</Filter>
</ClCompile>
@@ -383,6 +386,9 @@
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h">
<Filter>src\cpp\common</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.h">
+ <Filter>src\cpp\rpcmanager</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h">
<Filter>src\cpp\server</Filter>
</ClInclude>
@@ -440,6 +446,9 @@
<Filter Include="src\cpp\common">
<UniqueIdentifier>{ed8e4daa-825f-fbe5-2a45-846ad9165d3d}</UniqueIdentifier>
</Filter>
+ <Filter Include="src\cpp\rpcmanager">
+ <UniqueIdentifier>{cb26a5cb-4725-6fee-8abc-09d5fcd52f39}</UniqueIdentifier>
+ </Filter>
<Filter Include="src\cpp\server">
<UniqueIdentifier>{8a54a279-d14b-4237-0df3-1ffe1ef5a7af}</UniqueIdentifier>
</Filter>
diff --git a/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj b/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj
new file mode 100644
index 0000000000..4502de8167
--- /dev/null
+++ b/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj
@@ -0,0 +1,204 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\1.0.204.1.props')" />
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Debug|Win32">
+ <Configuration>Debug</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Debug|x64">
+ <Configuration>Debug</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|x64">
+ <Configuration>Release</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <ProjectGuid>{A4F24E89-1766-2FAA-9058-1094EAA018A8}</ProjectGuid>
+ <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected>
+ <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration">
+ <PlatformToolset>v100</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration">
+ <PlatformToolset>v110</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration">
+ <PlatformToolset>v120</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration">
+ <PlatformToolset>v140</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>true</UseDebugLibraries>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ <Import Project="$(SolutionDir)\..\vsprojects\cpptest.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\global.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\openssl.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\protobuf.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\winsock.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\zlib.props" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'">
+ <TargetName>grpc_rpc_manager_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'">
+ <TargetName>grpc_rpc_manager_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemGroup>
+ <ClInclude Include="$(SolutionDir)\..\test\cpp\rpcmanager\grpc_rpc_manager_test.h" />
+ </ItemGroup>
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\cpp\rpcmanager\grpc_rpc_manager_test.cc">
+ </ClCompile>
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc++\grpc++.vcxproj">
+ <Project>{C187A093-A0FE-489D-A40A-6E33DE0F9FEB}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc\grpc.vcxproj">
+ <Project>{29D16885-7228-4C31-81ED-5F9187C7F2A9}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj">
+ <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc++_test_config\grpc++_test_config.vcxproj">
+ <Project>{3F7D093D-11F9-C4BC-BEB7-18EB28E3F290}</Project>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ </ImportGroup>
+ <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
+ <PropertyGroup>
+ <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
+ </PropertyGroup>
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" />
+ </Target>
+</Project>
+
diff --git a/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj.filters b/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj.filters
new file mode 100644
index 0000000000..fedaea08d3
--- /dev/null
+++ b/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj.filters
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\cpp\rpcmanager\grpc_rpc_manager_test.cc">
+ <Filter>test\cpp\rpcmanager</Filter>
+ </ClCompile>
+ </ItemGroup>
+ <ItemGroup>
+ <ClInclude Include="$(SolutionDir)\..\test\cpp\rpcmanager\grpc_rpc_manager_test.h">
+ <Filter>test\cpp\rpcmanager</Filter>
+ </ClInclude>
+ </ItemGroup>
+
+ <ItemGroup>
+ <Filter Include="test">
+ <UniqueIdentifier>{9da529f7-8064-34c0-54da-0fade27184ad}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\cpp">
+ <UniqueIdentifier>{b6e53cff-22ab-1194-866d-57caa3551fd2}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\cpp\rpcmanager">
+ <UniqueIdentifier>{c63d7236-e7c6-d7b7-e3d8-f25853e358e6}</UniqueIdentifier>
+ </Filter>
+ </ItemGroup>
+</Project>
+