From 5dd32268be62114e8a7c81d60c0dc2633fb83081 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 14 Nov 2017 19:04:02 -0800 Subject: Switch C++ sync server to use gpr_thd rather than std::thread and provide resource exhaustion mechanism --- include/grpc++/impl/codegen/completion_queue.h | 6 ++++-- include/grpc++/impl/codegen/method_handler_impl.h | 12 +++++++++--- include/grpc++/impl/codegen/server_context.h | 6 ++++-- include/grpc++/server.h | 18 +++++++++++++++++- include/grpc++/server_builder.h | 19 +++++++++++++++++++ 5 files changed, 53 insertions(+), 8 deletions(-) (limited to 'include') diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index b8a7862578..452eac6646 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -78,7 +78,8 @@ template class ServerStreamingHandler; template class BidiStreamingHandler; -class UnknownMethodHandler; +template +class ErrorMethodHandler; template class TemplatedBidiStreamingHandler; template @@ -221,7 +222,8 @@ class CompletionQueue : private GrpcLibraryCodegen { friend class ::grpc::internal::ServerStreamingHandler; template friend class ::grpc::internal::TemplatedBidiStreamingHandler; - friend class ::grpc::internal::UnknownMethodHandler; + template + friend class ::grpc::internal::ErrorMethodHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; friend class ::grpc::ServerInterface; diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index c0af4ca130..d98ab7938c 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -242,12 +242,14 @@ class SplitServerStreamingHandler ServerSplitStreamer, false>(func) {} }; -/// Handle unknown method by returning UNIMPLEMENTED error. -class UnknownMethodHandler : public MethodHandler { +/// General method handler class for errors that prevent real method use +/// e.g., handle unknown method by returning UNIMPLEMENTED error. +template +class ErrorMethodHandler : public MethodHandler { public: template static void FillOps(ServerContext* context, T* ops) { - Status status(StatusCode::UNIMPLEMENTED, ""); + Status status(code, ""); if (!context->sent_initial_metadata_) { ops->SendInitialMetadata(context->initial_metadata_, context->initial_metadata_flags()); @@ -267,6 +269,10 @@ class UnknownMethodHandler : public MethodHandler { } }; +typedef ErrorMethodHandler UnknownMethodHandler; +typedef ErrorMethodHandler + ResourceExhaustedHandler; + } // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index a2d6967bf8..9f20335a2a 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -63,7 +63,8 @@ template class ServerStreamingHandler; template class BidiStreamingHandler; -class UnknownMethodHandler; +template +class ErrorMethodHandler; template class TemplatedBidiStreamingHandler; class Call; @@ -255,7 +256,8 @@ class ServerContext { friend class ::grpc::internal::ServerStreamingHandler; template friend class ::grpc::internal::TemplatedBidiStreamingHandler; - friend class ::grpc::internal::UnknownMethodHandler; + template + friend class ::grpc::internal::ErrorMethodHandler; friend class ::grpc::ClientContext; /// Prevent copying. diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 01c4a60d21..456603e4e7 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -35,6 +35,7 @@ #include #include #include +#include struct grpc_server; @@ -138,10 +139,17 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on /// server completion queues passed via sync_server_cqs param. + /// + /// \param thread_creator The thread creation function for the sync + /// server. Typically gpr_thd_new Server(int max_message_size, ChannelArguments* args, std::shared_ptr>> sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec); + int min_pollers, int max_pollers, int sync_cq_timeout_msec, + std::function + thread_creator, + std::function thread_joiner); /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. @@ -220,6 +228,14 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { std::unique_ptr health_check_service_; bool health_check_service_disabled_; + + std::function + thread_creator_; + std::function thread_joiner_; + + // A special handler for resource exhausted in sync case + std::unique_ptr resource_exhausted_handler_; }; } // namespace grpc diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index e2bae4b41f..25bbacbbc7 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -20,6 +20,7 @@ #define GRPCXX_SERVER_BUILDER_H #include +#include #include #include #include @@ -30,6 +31,7 @@ #include #include #include +#include #include #include @@ -47,6 +49,7 @@ class Service; namespace testing { class ServerBuilderPluginTest; +class ServerBuilderThreadCreatorOverrideTest; } // namespace testing /// A builder class for the creation and startup of \a grpc::Server instances. @@ -213,6 +216,17 @@ class ServerBuilder { private: friend class ::grpc::testing::ServerBuilderPluginTest; + friend class ::grpc::testing::ServerBuilderThreadCreatorOverrideTest; + + ServerBuilder& SetThreadFunctions( + std::function + thread_creator, + std::function thread_joiner) { + thread_creator_ = thread_creator; + thread_joiner_ = thread_joiner; + return *this; + } struct Port { grpc::string addr; @@ -272,6 +286,11 @@ class ServerBuilder { grpc_compression_algorithm algorithm; } maybe_default_compression_algorithm_; uint32_t enabled_compression_algorithms_bitset_; + + std::function + thread_creator_; + std::function thread_joiner_; }; } // namespace grpc -- cgit v1.2.3 From fd4884a768a03b8c459b02e7751c072f6efd147c Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 10 Jan 2018 18:47:09 +0000 Subject: Address review comments --- include/grpc++/server.h | 3 +++ 1 file changed, 3 insertions(+) (limited to 'include') diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 456603e4e7..cf590185d1 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -142,6 +142,9 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// /// \param thread_creator The thread creation function for the sync /// server. Typically gpr_thd_new + /// + /// \param thread_joiner The thread joining function for the sync + /// server. Typically gpr_thd_join Server(int max_message_size, ChannelArguments* args, std::shared_ptr>> sync_server_cqs, -- cgit v1.2.3 From b4b0ac704984be21d128924433cbe9bcd568ef83 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 10 Jan 2018 22:02:37 +0000 Subject: Resolve leak by freeing request payload if resources exhausted --- include/grpc++/impl/codegen/byte_buffer.h | 4 ++++ include/grpc++/impl/codegen/method_handler_impl.h | 5 +++++ src/cpp/server/server_cc.cc | 3 ++- 3 files changed, 11 insertions(+), 1 deletion(-) (limited to 'include') diff --git a/include/grpc++/impl/codegen/byte_buffer.h b/include/grpc++/impl/codegen/byte_buffer.h index fe73ce7a83..9c0246e617 100644 --- a/include/grpc++/impl/codegen/byte_buffer.h +++ b/include/grpc++/impl/codegen/byte_buffer.h @@ -41,6 +41,8 @@ template class RpcMethodHandler; template class ServerStreamingHandler; +template +class ErrorMethodHandler; template class DeserializeFuncType; } // namespace internal @@ -107,6 +109,8 @@ class ByteBuffer final { friend class internal::RpcMethodHandler; template friend class internal::ServerStreamingHandler; + template + friend class internal::ErrorMethodHandler; template friend class internal::DeserializeFuncType; diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index d98ab7938c..93b7826e8f 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -266,6 +266,11 @@ class ErrorMethodHandler : public MethodHandler { FillOps(param.server_context, &ops); param.call->PerformOps(&ops); param.call->cq()->Pluck(&ops); + // We also have to destroy any request payload in the handler parameter + ByteBuffer* payload = param.request.bbuf_ptr(); + if (payload != nullptr) { + payload->Clear(); + } } }; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 6ab76a287e..02a663d660 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -196,7 +196,8 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { call_(mrd->call_, server, &cq_, server->max_receive_message_size()), ctx_(mrd->deadline_, &mrd->request_metadata_), has_request_payload_(mrd->has_request_payload_), - request_payload_(mrd->request_payload_), + request_payload_(has_request_payload_ ? mrd->request_payload_ + : nullptr), method_(mrd->method_), server_(server) { ctx_.set_call(mrd->call_); -- cgit v1.2.3