aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server_cc.cc
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-10-17 22:05:48 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-10-17 22:05:48 -0700
commitd042a5acf1fc83810c5a3b3e7cf2a8340748f1ba (patch)
treeb954e4116d1c0d4fa39a21792c46832f917bba26 /src/cpp/server/server_cc.cc
parent0b785ae8cf3dfef0937d300ce00bff8d4b173a18 (diff)
some tests fail
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r--src/cpp/server/server_cc.cc59
1 files changed, 48 insertions, 11 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 807a66baef..59121ba136 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -27,7 +27,9 @@
#include <grpcpp/completion_queue.h>
#include <grpcpp/generic/async_generic_service.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
+#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/completion_queue_tag.h>
+#include <grpcpp/impl/codegen/server_interceptor.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/impl/method_handler_impl.h>
#include <grpcpp/impl/rpc_service_method.h>
@@ -208,14 +210,17 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
public:
explicit CallData(Server* server, SyncRequest* mrd)
: cq_(mrd->cq_),
- call_(mrd->call_, server, &cq_, server->max_receive_message_size(),
- nullptr),
ctx_(mrd->deadline_, &mrd->request_metadata_),
has_request_payload_(mrd->has_request_payload_),
request_payload_(has_request_payload_ ? mrd->request_payload_
: nullptr),
method_(mrd->method_),
- server_(server) {
+ call_(mrd->call_, server, &cq_, server->max_receive_message_size(),
+ ctx_.set_server_rpc_info(experimental::ServerRpcInfo(
+ &ctx_, method_->name(), server->interceptor_creators_))),
+ server_(server),
+ global_callbacks_(nullptr),
+ resources_(false) {
ctx_.set_call(mrd->call_);
ctx_.cq_ = &cq_;
GPR_ASSERT(mrd->in_flight_);
@@ -231,14 +236,43 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
bool resources) {
+ global_callbacks_ = global_callbacks;
+ resources_ = resources;
+
+ /* Set interception point for RECV INITIAL METADATA */
+ interceptor_methods_.AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
+ interceptor_methods_.SetRecvInitialMetadata(&ctx_.client_metadata_);
+
+ if (has_request_payload_) {
+ /* Set interception point for RECV MESSAGE */
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ auto* request = handler->Deserialize(request_payload_);
+ request_payload_ = nullptr;
+ interceptor_methods_.AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
+ interceptor_methods_.SetRecvMessage(request);
+ }
+ interceptor_methods_.SetCall(&call_);
+ interceptor_methods_.SetReverse();
+ auto f = std::bind(&CallData::ContinueRunAfterInterception, this);
+ if (interceptor_methods_.RunInterceptors(f)) {
+ ContinueRunAfterInterception();
+ } else {
+ /* There were interceptors to be run, so ContinueRunAfterInterception
+ will be run when interceptors are done. */
+ }
+ }
+
+ void ContinueRunAfterInterception() {
ctx_.BeginCompletionOp(&call_);
- global_callbacks->PreSynchronousRequest(&ctx_);
- auto* handler = resources ? method_->handler()
- : server_->resource_exhausted_handler_.get();
- handler->RunHandler(internal::MethodHandler::HandlerParameter(
- &call_, &ctx_, request_payload_));
- global_callbacks->PostSynchronousRequest(&ctx_);
- request_payload_ = nullptr;
+ global_callbacks_->PreSynchronousRequest(&ctx_);
+ auto* handler = resources_ ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ handler->RunHandler(
+ internal::MethodHandler::HandlerParameter(&call_, &ctx_));
+ global_callbacks_->PostSynchronousRequest(&ctx_);
cq_.Shutdown();
@@ -252,12 +286,15 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
private:
CompletionQueue cq_;
- internal::Call call_;
ServerContext ctx_;
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
internal::RpcServiceMethod* const method_;
+ internal::Call call_;
Server* server_;
+ std::shared_ptr<GlobalCallbacks> global_callbacks_;
+ bool resources_;
+ internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
private: