diff options
Diffstat (limited to 'src/cpp/server/server.cc')
-rw-r--r-- | src/cpp/server/server.cc | 44 |
1 files changed, 37 insertions, 7 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index da2bc94639..90f3854a72 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -67,11 +67,17 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || method->method_type() == RpcMethod::SERVER_STREAMING), + call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); } - ~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); } + ~SyncRequest() { + if (call_details_) { + delete call_details_; + } + grpc_metadata_array_destroy(&request_metadata_); + } static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { void* tag = nullptr; @@ -94,17 +100,32 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; - GPR_ASSERT(GRPC_CALL_OK == - grpc_server_request_registered_call( - server, tag_, &call_, &deadline_, &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, cq_, - notify_cq, this)); + if (tag_) { + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_registered_call( + server, tag_, &call_, &deadline_, &request_metadata_, + has_request_payload_ ? &request_payload_ : nullptr, cq_, + notify_cq, this)); + } else { + if (!call_details_) { + call_details_ = new grpc_call_details; + grpc_call_details_init(call_details_); + } + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( + server, &call_, call_details_, + &request_metadata_, cq_, notify_cq, this)); + } } bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { if (!*status) { grpc_completion_queue_destroy(cq_); } + if (call_details_) { + deadline_ = call_details_->deadline; + grpc_call_details_destroy(call_details_); + grpc_call_details_init(call_details_); + } return true; } @@ -157,6 +178,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { bool in_flight_; const bool has_request_payload_; grpc_call* call_; + grpc_call_details* call_details_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; grpc_byte_buffer* request_payload_; @@ -183,6 +205,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, shutdown_(false), num_running_cb_(0), sync_methods_(new std::list<SyncRequest>), + has_generic_service_(false), server_(CreateServer(max_message_size)), thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) { @@ -223,7 +246,8 @@ bool Server::RegisterService(const grpc::string *host, RpcService* service) { return true; } -bool Server::RegisterAsyncService(const grpc::string *host, AsynchronousService* service) { +bool Server::RegisterAsyncService(const grpc::string* host, + AsynchronousService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an asynchronous service against one server."); service->server_ = this; @@ -245,6 +269,7 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an async generic service against one server."); service->server_ = this; + has_generic_service_ = true; } int Server::AddListeningPort(const grpc::string& addr, @@ -258,6 +283,11 @@ bool Server::Start() { started_ = true; grpc_server_start(server_); + if (!has_generic_service_) { + unknown_method_.reset(new RpcServiceMethod( + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + sync_methods_->emplace_back(unknown_method_.get(), nullptr); + } // Start processing rpcs. if (!sync_methods_->empty()) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { |