diff options
author | 2018-10-18 22:43:49 -0700 | |
---|---|---|
committer | 2018-10-18 22:43:49 -0700 | |
commit | 456231b26d56e13b9a56b93baabede4dd8fc2519 (patch) | |
tree | ac29ea953b7e8209c3e77b3b768991c69f463199 /src/cpp/server/server_cc.cc | |
parent | adca91f6cfe57cbd4af1e5a8cc8bfe3b506445c5 (diff) |
Server side interception for CompletionOp and AsyncRequest
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r-- | src/cpp/server/server_cc.cc | 99 |
1 files changed, 82 insertions, 17 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 427d5d5abb..d53c3534a9 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -24,6 +24,7 @@ #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpcpp/alarm.h> #include <grpcpp/completion_queue.h> #include <grpcpp/generic/async_generic_service.h> #include <grpcpp/impl/codegen/async_unary_call.h> @@ -240,6 +241,8 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { global_callbacks_ = global_callbacks; resources_ = resources; + interceptor_methods_.SetCall(&call_); + interceptor_methods_.SetReverse(); /* Set interception point for RECV INITIAL METADATA */ interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); @@ -256,8 +259,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { experimental::InterceptionHookPoints::POST_RECV_MESSAGE); interceptor_methods_.SetRecvMessage(request_); } - interceptor_methods_.SetCall(&call_); - interceptor_methods_.SetReverse(); + auto f = std::bind(&CallData::ContinueRunAfterInterception, this); if (interceptor_methods_.RunInterceptors(f)) { ContinueRunAfterInterception(); @@ -725,15 +727,21 @@ void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, ServerInterface::BaseAsyncRequest::BaseAsyncRequest( ServerInterface* server, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - void* tag, bool delete_on_finalize) + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) : server_(server), context_(context), stream_(stream), call_cq_(call_cq), + notification_cq_(notification_cq), tag_(tag), delete_on_finalize_(delete_on_finalize), call_(nullptr), - call_wrapper_() { + done_intercepting_(false) { + /* Set up interception state partially for the receive ops. call_wrapper_ is + * not filled at this point, but it will be filled before the interceptors are + * run. */ + interceptor_methods_.SetCall(&call_wrapper_); + interceptor_methods_.SetReverse(); call_cq_->RegisterAvalanching(); // This op will trigger more ops } @@ -743,17 +751,47 @@ ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { + if (done_intercepting_) { + delete static_cast<Alarm*>(dummy_alarm_); + dummy_alarm_ = nullptr; + *tag = tag_; + if (delete_on_finalize_) { + delete this; + } + return true; + } context_->set_call(call_); context_->cq_ = call_cq_; - internal::Call call(call_, server_, call_cq_, - server_->max_receive_message_size(), nullptr); + if (call_wrapper_.call() == nullptr) { + /* Fill it since it is empty. */ + call_wrapper_ = internal::Call( + call_, server_, call_cq_, server_->max_receive_message_size(), nullptr); + } + // just the pointers inside call are copied here + stream_->BindCall(&call_wrapper_); + + if (*status && call_ && call_wrapper_.server_rpc_info()) { + done_intercepting_ = true; + /* Set interception point for RECV INITIAL METADATA */ + interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_); + auto f = std::bind(&ServerInterface::BaseAsyncRequest:: + ContinueFinalizeResultAfterInterception, + this); + if (interceptor_methods_.RunInterceptors(f)) { + /* There are no interceptors to run. Continue */ + } else { + /* There were interceptors to be run, so + ContinueFinalizeResultAfterInterception will be run when interceptors are + done. */ + return false; + } + } if (*status && call_) { - context_->BeginCompletionOp(&call); + context_->BeginCompletionOp(&call_wrapper_); } - // just the pointers inside call are copied here - stream_->BindCall(&call); - *tag = tag_; if (delete_on_finalize_) { delete this; @@ -761,11 +799,23 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, return true; } +void ServerInterface::BaseAsyncRequest:: + ContinueFinalizeResultAfterInterception() { + context_->BeginCompletionOp(&call_wrapper_); + /* Queue a tag which will be returned immediately */ + dummy_alarm_ = new Alarm(); + static_cast<Alarm*>(dummy_alarm_) + ->Set(notification_cq_, + g_core_codegen_interface->gpr_time_0(GPR_CLOCK_MONOTONIC), this); +} + ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( ServerInterface* server, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - void* tag) - : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} + ServerCompletionQueue* notification_cq, void* tag, const char* name) + : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, + true), + name_(name) {} void ServerInterface::RegisteredAsyncRequest::IssueRequest( void* registered_method, grpc_byte_buffer** payload, @@ -781,7 +831,7 @@ ServerInterface::GenericAsyncRequest::GenericAsyncRequest( ServerInterface* server, GenericServerContext* context, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) - : BaseAsyncRequest(server, context, stream, call_cq, tag, + : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, delete_on_finalize) { grpc_call_details_init(&call_details_); GPR_ASSERT(notification_cq); @@ -794,6 +844,10 @@ ServerInterface::GenericAsyncRequest::GenericAsyncRequest( bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) { + /* If we are done intercepting, there is nothing more for us to do */ + if (done_intercepting_) { + return BaseAsyncRequest::FinalizeResult(tag, status); + } // TODO(yangg) remove the copy here. if (*status) { static_cast<GenericServerContext*>(context_)->method_ = @@ -804,16 +858,27 @@ bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, } grpc_slice_unref(call_details_.method); grpc_slice_unref(call_details_.host); + call_wrapper_ = internal::Call( + call_, server_, call_cq_, server_->max_receive_message_size(), + context_->set_server_rpc_info(experimental::ServerRpcInfo( + context_, + static_cast<GenericServerContext*>(context_)->method_.c_str(), + *server_->interceptor_creators()))); return BaseAsyncRequest::FinalizeResult(tag, status); } bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, bool* status) { - if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) { - new UnimplementedAsyncRequest(server_, cq_); - new UnimplementedAsyncResponse(this); + if (GenericAsyncRequest::FinalizeResult(tag, status)) { + /* We either had no interceptors run or we are done interceptinh */ + if (*status) { + new UnimplementedAsyncRequest(server_, cq_); + new UnimplementedAsyncResponse(this); + } else { + delete this; + } } else { - delete this; + /* The tag was swallowed due to interception. We will see it again. */ } return false; } |