aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server_cc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r--src/cpp/server/server_cc.cc28
1 files changed, 16 insertions, 12 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 0a51cf5626..13741ce7aa 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -236,9 +236,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
: nullptr),
request_(nullptr),
method_(mrd->method_),
- call_(mrd->call_, server, &cq_, server->max_receive_message_size(),
- ctx_.set_server_rpc_info(method_->name(),
- server->interceptor_creators_)),
+ call_(
+ mrd->call_, server, &cq_, server->max_receive_message_size(),
+ ctx_.set_server_rpc_info(method_->name(), method_->method_type(),
+ server->interceptor_creators_)),
server_(server),
global_callbacks_(nullptr),
resources_(false) {
@@ -277,7 +278,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
request_payload_ = nullptr;
interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
- interceptor_methods_.SetRecvMessage(request_);
+ interceptor_methods_.SetRecvMessage(request_, nullptr);
}
if (interceptor_methods_.RunInterceptors(
@@ -291,7 +292,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
void ContinueRunAfterInterception() {
{
- ctx_.BeginCompletionOp(&call_, false);
+ ctx_.BeginCompletionOp(&call_, nullptr, nullptr);
global_callbacks_->PreSynchronousRequest(&ctx_);
auto* handler = resources_ ? method_->handler()
: server_->resource_exhausted_handler_.get();
@@ -427,7 +428,8 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
req_->call_, req_->server_, req_->cq_,
req_->server_->max_receive_message_size(),
req_->ctx_.set_server_rpc_info(
- req_->method_->name(), req_->server_->interceptor_creators_));
+ req_->method_->name(), req_->method_->method_type(),
+ req_->server_->interceptor_creators_));
req_->interceptor_methods_.SetCall(call_);
req_->interceptor_methods_.SetReverse();
@@ -444,7 +446,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
req_->request_payload_ = nullptr;
req_->interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
- req_->interceptor_methods_.SetRecvMessage(req_->request_);
+ req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr);
}
if (req_->interceptor_methods_.RunInterceptors(
@@ -456,7 +458,6 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
}
}
void ContinueRunAfterInterception() {
- req_->ctx_.BeginCompletionOp(call_, true);
req_->method_->handler()->RunHandler(
internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_,
@@ -1018,7 +1019,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
}
}
if (*status && call_) {
- context_->BeginCompletionOp(&call_wrapper_, false);
+ context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr);
}
*tag = tag_;
if (delete_on_finalize_) {
@@ -1029,7 +1030,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
void ServerInterface::BaseAsyncRequest::
ContinueFinalizeResultAfterInterception() {
- context_->BeginCompletionOp(&call_wrapper_, false);
+ context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr);
// Queue a tag which will be returned immediately
grpc_core::ExecCtx exec_ctx;
grpc_cq_begin_op(notification_cq_->cq(), this);
@@ -1042,10 +1043,12 @@ void ServerInterface::BaseAsyncRequest::
ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
ServerInterface* server, ServerContext* context,
internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag, const char* name)
+ ServerCompletionQueue* notification_cq, void* tag, const char* name,
+ internal::RpcMethod::RpcType type)
: BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag,
true),
- name_(name) {}
+ name_(name),
+ type_(type) {}
void ServerInterface::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
@@ -1092,6 +1095,7 @@ bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
call_, server_, call_cq_, server_->max_receive_message_size(),
context_->set_server_rpc_info(
static_cast<GenericServerContext*>(context_)->method_.c_str(),
+ internal::RpcMethod::BIDI_STREAMING,
*server_->interceptor_creators()));
return BaseAsyncRequest::FinalizeResult(tag, status);
}