aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server_cc.cc
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-10-18 16:07:00 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-10-18 16:07:00 -0700
commitadca91f6cfe57cbd4af1e5a8cc8bfe3b506445c5 (patch)
treeac6cf068868d37bb3cfeb7964d603d43a4424932 /src/cpp/server/server_cc.cc
parentd042a5acf1fc83810c5a3b3e7cf2a8340748f1ba (diff)
Server interception for SyncRequest
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r--src/cpp/server/server_cc.cc51
1 files changed, 30 insertions, 21 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 59121ba136..427d5d5abb 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -214,6 +214,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
has_request_payload_(mrd->has_request_payload_),
request_payload_(has_request_payload_ ? mrd->request_payload_
: nullptr),
+ request_(nullptr),
method_(mrd->method_),
call_(mrd->call_, server, &cq_, server->max_receive_message_size(),
ctx_.set_server_rpc_info(experimental::ServerRpcInfo(
@@ -248,11 +249,12 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
/* Set interception point for RECV MESSAGE */
auto* handler = resources_ ? method_->handler()
: server_->resource_exhausted_handler_.get();
- auto* request = handler->Deserialize(request_payload_);
+ request_ = handler->Deserialize(request_payload_, &request_status_);
+
request_payload_ = nullptr;
interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
- interceptor_methods_.SetRecvMessage(request);
+ interceptor_methods_.SetRecvMessage(request_);
}
interceptor_methods_.SetCall(&call_);
interceptor_methods_.SetReverse();
@@ -266,22 +268,26 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
}
void ContinueRunAfterInterception() {
- ctx_.BeginCompletionOp(&call_);
- global_callbacks_->PreSynchronousRequest(&ctx_);
- auto* handler = resources_ ? method_->handler()
- : server_->resource_exhausted_handler_.get();
- handler->RunHandler(
- internal::MethodHandler::HandlerParameter(&call_, &ctx_));
- global_callbacks_->PostSynchronousRequest(&ctx_);
-
- cq_.Shutdown();
-
- internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
- cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
-
- /* Ensure the cq_ is shutdown */
- DummyTag ignored_tag;
- GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
+ {
+ ctx_.BeginCompletionOp(&call_);
+ global_callbacks_->PreSynchronousRequest(&ctx_);
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ handler->RunHandler(internal::MethodHandler::HandlerParameter(
+ &call_, &ctx_, request_, request_status_));
+ request_ = nullptr;
+ global_callbacks_->PostSynchronousRequest(&ctx_);
+
+ cq_.Shutdown();
+
+ internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
+ cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
+
+ /* Ensure the cq_ is shutdown */
+ DummyTag ignored_tag;
+ GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
+ }
+ delete this;
}
private:
@@ -289,6 +295,8 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
ServerContext ctx_;
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
+ void* request_;
+ Status request_status_;
internal::RpcServiceMethod* const method_;
internal::Call call_;
Server* server_;
@@ -359,7 +367,7 @@ class Server::SyncRequestThreadManager : public ThreadManager {
if (ok) {
// Calldata takes ownership of the completion queue and interceptors
// inside sync_req
- SyncRequest::CallData cd(server_, sync_req);
+ auto* cd = new SyncRequest::CallData(server_, sync_req);
// Prepare for the next request
if (!IsShutdown()) {
sync_req->SetupRequest(); // Create new completion queue for sync_req
@@ -367,7 +375,7 @@ class Server::SyncRequestThreadManager : public ThreadManager {
}
GPR_TIMER_SCOPE("cd.Run()", 0);
- cd.Run(global_callbacks_, resources);
+ cd->Run(global_callbacks_, resources);
}
// TODO (sreek) If ok is false here (which it isn't in case of
// grpc_request_registered_call), we should still re-queue the request
@@ -724,7 +732,8 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
call_cq_(call_cq),
tag_(tag),
delete_on_finalize_(delete_on_finalize),
- call_(nullptr) {
+ call_(nullptr),
+ call_wrapper_() {
call_cq_->RegisterAvalanching(); // This op will trigger more ops
}