aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server_cc.cc
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-10-18 22:43:49 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-10-18 22:43:49 -0700
commit456231b26d56e13b9a56b93baabede4dd8fc2519 (patch)
treeac29ea953b7e8209c3e77b3b768991c69f463199 /src/cpp/server/server_cc.cc
parentadca91f6cfe57cbd4af1e5a8cc8bfe3b506445c5 (diff)
Server side interception for CompletionOp and AsyncRequest
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r--src/cpp/server/server_cc.cc99
1 files changed, 82 insertions, 17 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 427d5d5abb..d53c3534a9 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -24,6 +24,7 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpcpp/alarm.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/generic/async_generic_service.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
@@ -240,6 +241,8 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
global_callbacks_ = global_callbacks;
resources_ = resources;
+ interceptor_methods_.SetCall(&call_);
+ interceptor_methods_.SetReverse();
/* Set interception point for RECV INITIAL METADATA */
interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
@@ -256,8 +259,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
interceptor_methods_.SetRecvMessage(request_);
}
- interceptor_methods_.SetCall(&call_);
- interceptor_methods_.SetReverse();
+
auto f = std::bind(&CallData::ContinueRunAfterInterception, this);
if (interceptor_methods_.RunInterceptors(f)) {
ContinueRunAfterInterception();
@@ -725,15 +727,21 @@ void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
ServerInterface* server, ServerContext* context,
internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- void* tag, bool delete_on_finalize)
+ ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
: server_(server),
context_(context),
stream_(stream),
call_cq_(call_cq),
+ notification_cq_(notification_cq),
tag_(tag),
delete_on_finalize_(delete_on_finalize),
call_(nullptr),
- call_wrapper_() {
+ done_intercepting_(false) {
+ /* Set up interception state partially for the receive ops. call_wrapper_ is
+ * not filled at this point, but it will be filled before the interceptors are
+ * run. */
+ interceptor_methods_.SetCall(&call_wrapper_);
+ interceptor_methods_.SetReverse();
call_cq_->RegisterAvalanching(); // This op will trigger more ops
}
@@ -743,17 +751,47 @@ ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
bool* status) {
+ if (done_intercepting_) {
+ delete static_cast<Alarm*>(dummy_alarm_);
+ dummy_alarm_ = nullptr;
+ *tag = tag_;
+ if (delete_on_finalize_) {
+ delete this;
+ }
+ return true;
+ }
context_->set_call(call_);
context_->cq_ = call_cq_;
- internal::Call call(call_, server_, call_cq_,
- server_->max_receive_message_size(), nullptr);
+ if (call_wrapper_.call() == nullptr) {
+ /* Fill it since it is empty. */
+ call_wrapper_ = internal::Call(
+ call_, server_, call_cq_, server_->max_receive_message_size(), nullptr);
+ }
+ // just the pointers inside call are copied here
+ stream_->BindCall(&call_wrapper_);
+
+ if (*status && call_ && call_wrapper_.server_rpc_info()) {
+ done_intercepting_ = true;
+ /* Set interception point for RECV INITIAL METADATA */
+ interceptor_methods_.AddInterceptionHookPoint(
+ experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
+ interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_);
+ auto f = std::bind(&ServerInterface::BaseAsyncRequest::
+ ContinueFinalizeResultAfterInterception,
+ this);
+ if (interceptor_methods_.RunInterceptors(f)) {
+ /* There are no interceptors to run. Continue */
+ } else {
+ /* There were interceptors to be run, so
+ ContinueFinalizeResultAfterInterception will be run when interceptors are
+ done. */
+ return false;
+ }
+ }
if (*status && call_) {
- context_->BeginCompletionOp(&call);
+ context_->BeginCompletionOp(&call_wrapper_);
}
- // just the pointers inside call are copied here
- stream_->BindCall(&call);
-
*tag = tag_;
if (delete_on_finalize_) {
delete this;
@@ -761,11 +799,23 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
return true;
}
+void ServerInterface::BaseAsyncRequest::
+ ContinueFinalizeResultAfterInterception() {
+ context_->BeginCompletionOp(&call_wrapper_);
+ /* Queue a tag which will be returned immediately */
+ dummy_alarm_ = new Alarm();
+ static_cast<Alarm*>(dummy_alarm_)
+ ->Set(notification_cq_,
+ g_core_codegen_interface->gpr_time_0(GPR_CLOCK_MONOTONIC), this);
+}
+
ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
ServerInterface* server, ServerContext* context,
internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- void* tag)
- : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
+ ServerCompletionQueue* notification_cq, void* tag, const char* name)
+ : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag,
+ true),
+ name_(name) {}
void ServerInterface::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
@@ -781,7 +831,7 @@ ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
ServerInterface* server, GenericServerContext* context,
internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
- : BaseAsyncRequest(server, context, stream, call_cq, tag,
+ : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag,
delete_on_finalize) {
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
@@ -794,6 +844,10 @@ ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
bool* status) {
+ /* If we are done intercepting, there is nothing more for us to do */
+ if (done_intercepting_) {
+ return BaseAsyncRequest::FinalizeResult(tag, status);
+ }
// TODO(yangg) remove the copy here.
if (*status) {
static_cast<GenericServerContext*>(context_)->method_ =
@@ -804,16 +858,27 @@ bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
}
grpc_slice_unref(call_details_.method);
grpc_slice_unref(call_details_.host);
+ call_wrapper_ = internal::Call(
+ call_, server_, call_cq_, server_->max_receive_message_size(),
+ context_->set_server_rpc_info(experimental::ServerRpcInfo(
+ context_,
+ static_cast<GenericServerContext*>(context_)->method_.c_str(),
+ *server_->interceptor_creators())));
return BaseAsyncRequest::FinalizeResult(tag, status);
}
bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
bool* status) {
- if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
- new UnimplementedAsyncRequest(server_, cq_);
- new UnimplementedAsyncResponse(this);
+ if (GenericAsyncRequest::FinalizeResult(tag, status)) {
+ /* We either had no interceptors run or we are done interceptinh */
+ if (*status) {
+ new UnimplementedAsyncRequest(server_, cq_);
+ new UnimplementedAsyncResponse(this);
+ } else {
+ delete this;
+ }
} else {
- delete this;
+ /* The tag was swallowed due to interception. We will see it again. */
}
return false;
}