diff options
author | 2018-10-18 16:07:00 -0700 | |
---|---|---|
committer | 2018-10-18 16:07:00 -0700 | |
commit | adca91f6cfe57cbd4af1e5a8cc8bfe3b506445c5 (patch) | |
tree | ac6cf068868d37bb3cfeb7964d603d43a4424932 /src/cpp/server/server_cc.cc | |
parent | d042a5acf1fc83810c5a3b3e7cf2a8340748f1ba (diff) |
Server interception for SyncRequest
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r-- | src/cpp/server/server_cc.cc | 51 |
1 files changed, 30 insertions, 21 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 59121ba136..427d5d5abb 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -214,6 +214,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { has_request_payload_(mrd->has_request_payload_), request_payload_(has_request_payload_ ? mrd->request_payload_ : nullptr), + request_(nullptr), method_(mrd->method_), call_(mrd->call_, server, &cq_, server->max_receive_message_size(), ctx_.set_server_rpc_info(experimental::ServerRpcInfo( @@ -248,11 +249,12 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { /* Set interception point for RECV MESSAGE */ auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); - auto* request = handler->Deserialize(request_payload_); + request_ = handler->Deserialize(request_payload_, &request_status_); + request_payload_ = nullptr; interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); - interceptor_methods_.SetRecvMessage(request); + interceptor_methods_.SetRecvMessage(request_); } interceptor_methods_.SetCall(&call_); interceptor_methods_.SetReverse(); @@ -266,22 +268,26 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { } void ContinueRunAfterInterception() { - ctx_.BeginCompletionOp(&call_); - global_callbacks_->PreSynchronousRequest(&ctx_); - auto* handler = resources_ ? method_->handler() - : server_->resource_exhausted_handler_.get(); - handler->RunHandler( - internal::MethodHandler::HandlerParameter(&call_, &ctx_)); - global_callbacks_->PostSynchronousRequest(&ctx_); - - cq_.Shutdown(); - - internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); - cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); - - /* Ensure the cq_ is shutdown */ - DummyTag ignored_tag; - GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); + { + ctx_.BeginCompletionOp(&call_); + global_callbacks_->PreSynchronousRequest(&ctx_); + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); + handler->RunHandler(internal::MethodHandler::HandlerParameter( + &call_, &ctx_, request_, request_status_)); + request_ = nullptr; + global_callbacks_->PostSynchronousRequest(&ctx_); + + cq_.Shutdown(); + + internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); + cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); + + /* Ensure the cq_ is shutdown */ + DummyTag ignored_tag; + GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); + } + delete this; } private: @@ -289,6 +295,8 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; + void* request_; + Status request_status_; internal::RpcServiceMethod* const method_; internal::Call call_; Server* server_; @@ -359,7 +367,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { if (ok) { // Calldata takes ownership of the completion queue and interceptors // inside sync_req - SyncRequest::CallData cd(server_, sync_req); + auto* cd = new SyncRequest::CallData(server_, sync_req); // Prepare for the next request if (!IsShutdown()) { sync_req->SetupRequest(); // Create new completion queue for sync_req @@ -367,7 +375,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { } GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(global_callbacks_, resources); + cd->Run(global_callbacks_, resources); } // TODO (sreek) If ok is false here (which it isn't in case of // grpc_request_registered_call), we should still re-queue the request @@ -724,7 +732,8 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest( call_cq_(call_cq), tag_(tag), delete_on_finalize_(delete_on_finalize), - call_(nullptr) { + call_(nullptr), + call_wrapper_() { call_cq_->RegisterAvalanching(); // This op will trigger more ops } |