From 9b7757dd356dae2549433d5b2686ce9a902c7bcc Mon Sep 17 00:00:00 2001 From: yang-g Date: Thu, 13 Aug 2015 11:15:53 -0700 Subject: Use a sync service to handle requests to unknown services --- src/cpp/server/server.cc | 44 +++++++++++++++++++++++++++++++++------- src/cpp/server/server_builder.cc | 7 +++++++ 2 files changed, 44 insertions(+), 7 deletions(-) (limited to 'src/cpp') diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index ab87b22f5f..c054eb0a7d 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -67,11 +67,17 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || method->method_type() == RpcMethod::SERVER_STREAMING), + call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); } - ~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); } + ~SyncRequest() { + if (call_details_) { + delete call_details_; + } + grpc_metadata_array_destroy(&request_metadata_); + } static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { void* tag = nullptr; @@ -94,17 +100,32 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; - GPR_ASSERT(GRPC_CALL_OK == - grpc_server_request_registered_call( - server, tag_, &call_, &deadline_, &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, cq_, - notify_cq, this)); + if (tag_) { + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_registered_call( + server, tag_, &call_, &deadline_, &request_metadata_, + has_request_payload_ ? &request_payload_ : nullptr, cq_, + notify_cq, this)); + } else { + if (!call_details_) { + call_details_ = new grpc_call_details; + grpc_call_details_init(call_details_); + } + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( + server, &call_, call_details_, + &request_metadata_, cq_, notify_cq, this)); + } } bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { if (!*status) { grpc_completion_queue_destroy(cq_); } + if (call_details_) { + deadline_ = call_details_->deadline; + grpc_call_details_destroy(call_details_); + grpc_call_details_init(call_details_); + } return true; } @@ -157,6 +178,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { bool in_flight_; const bool has_request_payload_; grpc_call* call_; + grpc_call_details* call_details_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; grpc_byte_buffer* request_payload_; @@ -183,6 +205,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, shutdown_(false), num_running_cb_(0), sync_methods_(new std::list), + has_generic_service_(false), server_(CreateServer(max_message_size)), thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) { @@ -223,7 +246,8 @@ bool Server::RegisterService(const grpc::string *host, RpcService* service) { return true; } -bool Server::RegisterAsyncService(const grpc::string *host, AsynchronousService* service) { +bool Server::RegisterAsyncService(const grpc::string* host, + AsynchronousService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an asynchronous service against one server."); service->server_ = this; @@ -245,6 +269,7 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an async generic service against one server."); service->server_ = this; + has_generic_service_ = true; } int Server::AddListeningPort(const grpc::string& addr, @@ -258,6 +283,11 @@ bool Server::Start() { started_ = true; grpc_server_start(server_); + if (!has_generic_service_) { + unknown_method_.reset(new RpcServiceMethod( + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + sync_methods_->emplace_back(SyncRequest(unknown_method_.get(), nullptr)); + } // Start processing rpcs. if (!sync_methods_->empty()) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index f723d4611a..fd97ad082c 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -38,6 +38,7 @@ #include #include #include +#include namespace grpc { @@ -100,6 +101,12 @@ std::unique_ptr ServerBuilder::BuildAndStart() { thread_pool_ = CreateDefaultThreadPool(); thread_pool_owned = true; } + // Async services only, create a thread pool to handle requests to unknown + // services. + if (!thread_pool_ && !generic_service_ && !async_services_.empty()) { + thread_pool_ = new FixedSizeThreadPool(1); + thread_pool_owned = true; + } std::unique_ptr server( new Server(thread_pool_, thread_pool_owned, max_message_size_)); for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { -- cgit v1.2.3 From 523201cee080457cb1386f729c2af4cff1477c92 Mon Sep 17 00:00:00 2001 From: yang-g Date: Thu, 13 Aug 2015 14:47:42 -0700 Subject: remove redundant ctor --- src/cpp/server/server.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/cpp') diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index c054eb0a7d..600ce834cf 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -286,7 +286,7 @@ bool Server::Start() { if (!has_generic_service_) { unknown_method_.reset(new RpcServiceMethod( "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); - sync_methods_->emplace_back(SyncRequest(unknown_method_.get(), nullptr)); + sync_methods_->emplace_back(unknown_method_.get(), nullptr); } // Start processing rpcs. if (!sync_methods_->empty()) { -- cgit v1.2.3