diff options
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r-- | src/cpp/server/server_cc.cc | 59 |
1 files changed, 48 insertions, 11 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 807a66baef..59121ba136 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -27,7 +27,9 @@ #include <grpcpp/completion_queue.h> #include <grpcpp/generic/async_generic_service.h> #include <grpcpp/impl/codegen/async_unary_call.h> +#include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> +#include <grpcpp/impl/codegen/server_interceptor.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/impl/method_handler_impl.h> #include <grpcpp/impl/rpc_service_method.h> @@ -208,14 +210,17 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { public: explicit CallData(Server* server, SyncRequest* mrd) : cq_(mrd->cq_), - call_(mrd->call_, server, &cq_, server->max_receive_message_size(), - nullptr), ctx_(mrd->deadline_, &mrd->request_metadata_), has_request_payload_(mrd->has_request_payload_), request_payload_(has_request_payload_ ? mrd->request_payload_ : nullptr), method_(mrd->method_), - server_(server) { + call_(mrd->call_, server, &cq_, server->max_receive_message_size(), + ctx_.set_server_rpc_info(experimental::ServerRpcInfo( + &ctx_, method_->name(), server->interceptor_creators_))), + server_(server), + global_callbacks_(nullptr), + resources_(false) { ctx_.set_call(mrd->call_); ctx_.cq_ = &cq_; GPR_ASSERT(mrd->in_flight_); @@ -231,14 +236,43 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, bool resources) { + global_callbacks_ = global_callbacks; + resources_ = resources; + + /* Set interception point for RECV INITIAL METADATA */ + interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_); + + if (has_request_payload_) { + /* Set interception point for RECV MESSAGE */ + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); + auto* request = handler->Deserialize(request_payload_); + request_payload_ = nullptr; + interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + interceptor_methods_.SetRecvMessage(request); + } + interceptor_methods_.SetCall(&call_); + interceptor_methods_.SetReverse(); + auto f = std::bind(&CallData::ContinueRunAfterInterception, this); + if (interceptor_methods_.RunInterceptors(f)) { + ContinueRunAfterInterception(); + } else { + /* There were interceptors to be run, so ContinueRunAfterInterception + will be run when interceptors are done. */ + } + } + + void ContinueRunAfterInterception() { ctx_.BeginCompletionOp(&call_); - global_callbacks->PreSynchronousRequest(&ctx_); - auto* handler = resources ? method_->handler() - : server_->resource_exhausted_handler_.get(); - handler->RunHandler(internal::MethodHandler::HandlerParameter( - &call_, &ctx_, request_payload_)); - global_callbacks->PostSynchronousRequest(&ctx_); - request_payload_ = nullptr; + global_callbacks_->PreSynchronousRequest(&ctx_); + auto* handler = resources_ ? method_->handler() + : server_->resource_exhausted_handler_.get(); + handler->RunHandler( + internal::MethodHandler::HandlerParameter(&call_, &ctx_)); + global_callbacks_->PostSynchronousRequest(&ctx_); cq_.Shutdown(); @@ -252,12 +286,15 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { private: CompletionQueue cq_; - internal::Call call_; ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; internal::RpcServiceMethod* const method_; + internal::Call call_; Server* server_; + std::shared_ptr<GlobalCallbacks> global_callbacks_; + bool resources_; + internal::InterceptorBatchMethodsImpl interceptor_methods_; }; private: |