diff options
author | Yash Tibrewal <yashkt@google.com> | 2018-11-06 14:56:24 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-11-06 14:56:24 -0800 |
commit | 193b4b57ec8ec1509a91069f8cdd1ba1917b2d71 (patch) | |
tree | 6e346ecbad7f8b579c585b3619123ce17f5f9380 /include | |
parent | 385fde64fc2e3b0241229e4a018ea0989a2a00bc (diff) | |
parent | ded9434e4c23e6be69feeda9ca05b31bceb5e5f9 (diff) |
Merge pull request #17072 from yashykt/interceptor_cancellation
Interceptors should see a Cancellation notification
Diffstat (limited to 'include')
-rw-r--r-- | include/grpcpp/impl/codegen/call_op_set.h | 55 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/client_context.h | 2 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/interceptor.h | 9 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/interceptor_common.h | 157 |
4 files changed, 153 insertions, 70 deletions
diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h index 5c52b027b2..b4c34a01c9 100644 --- a/include/grpcpp/impl/codegen/call_op_set.h +++ b/include/grpcpp/impl/codegen/call_op_set.h @@ -214,11 +214,10 @@ class CallNoOp { void AddOp(grpc_op* ops, size_t* nops) {} void FinishOp(bool* status) {} void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) {} + InterceptorBatchMethodsImpl* interceptor_methods) {} void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) {} - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { - } + InterceptorBatchMethodsImpl* interceptor_methods) {} + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {} }; class CallOpSendInitialMetadata { @@ -265,7 +264,7 @@ class CallOpSendInitialMetadata { } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA); @@ -273,9 +272,9 @@ class CallOpSendInitialMetadata { } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) {} + InterceptorBatchMethodsImpl* interceptor_methods) {} - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; } @@ -318,7 +317,7 @@ class CallOpSendMessage { void FinishOp(bool* status) { send_buf_.Clear(); } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_buf_.Valid()) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_MESSAGE); @@ -326,9 +325,9 @@ class CallOpSendMessage { } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) {} + InterceptorBatchMethodsImpl* interceptor_methods) {} - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; } @@ -406,17 +405,17 @@ class CallOpRecvMessage { } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { interceptor_methods->SetRecvMessage(message_); } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (!got_message) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); } - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; if (message_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( @@ -501,17 +500,17 @@ class CallOpGenericRecvMessage { } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { interceptor_methods->SetRecvMessage(message_); } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (!got_message) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); } - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; if (!deserialize_) return; interceptor_methods->AddInterceptionHookPoint( @@ -543,16 +542,16 @@ class CallOpClientSendClose { void FinishOp(bool* status) { send_ = false; } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_CLOSE); } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) {} + InterceptorBatchMethodsImpl* interceptor_methods) {} - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; } @@ -600,7 +599,7 @@ class CallOpServerSendStatus { } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_status_available_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_STATUS); @@ -610,9 +609,9 @@ class CallOpServerSendStatus { } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) {} + InterceptorBatchMethodsImpl* interceptor_methods) {} - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; } @@ -652,19 +651,19 @@ class CallOpRecvInitialMetadata { } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { interceptor_methods->SetRecvInitialMetadata(metadata_map_); } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (metadata_map_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); metadata_map_ = nullptr; } - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; if (metadata_map_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( @@ -720,20 +719,20 @@ class CallOpClientRecvStatus { } void SetInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { interceptor_methods->SetRecvStatus(recv_status_); interceptor_methods->SetRecvTrailingMetadata(metadata_map_); } void SetFinishInterceptionHookPoint( - InternalInterceptorBatchMethods* interceptor_methods) { + InterceptorBatchMethodsImpl* interceptor_methods) { if (recv_status_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_STATUS); recv_status_ = nullptr; } - void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) { + void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { hijacked_ = true; if (recv_status_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h index f53b744dcf..75b955e760 100644 --- a/include/grpcpp/impl/codegen/client_context.h +++ b/include/grpcpp/impl/codegen/client_context.h @@ -426,6 +426,8 @@ class ClientContext { grpc::string authority() { return authority_; } + void SendCancelToInterceptors(); + bool initial_metadata_received_; bool wait_for_ready_; bool wait_for_ready_explicitly_set_; diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h index 15cab711e5..19f6afcb72 100644 --- a/include/grpcpp/impl/codegen/interceptor.h +++ b/include/grpcpp/impl/codegen/interceptor.h @@ -56,6 +56,11 @@ enum class InterceptionHookPoints { POST_RECV_MESSAGE, POST_RECV_STATUS /* client only */, POST_RECV_CLOSE /* server only */, + /* This is a special hook point available to both clients and servers when + TryCancel() is performed. It is illegal for an interceptor to block/delay + this operation. ALL interceptors see this hook point irrespective of + whether the RPC was hijacked or not. */ + PRE_SEND_CANCEL, NUM_INTERCEPTION_HOOKS }; @@ -66,7 +71,9 @@ class InterceptorBatchMethods { // of type \a type virtual bool QueryInterceptionHookPoint(InterceptionHookPoints type) = 0; // Calling this will signal that the interceptor is done intercepting the - // current batch of the RPC + // current batch of the RPC. + // Proceed is a no-op if the batch contains PRE_SEND_CANCEL. Simply returning + // from the Intercept method does the job of continuing the RPC in this case. virtual void Proceed() = 0; // Calling this indicates that the interceptor has hijacked the RPC (only // valid if the batch contains send_initial_metadata on the client side) diff --git a/include/grpcpp/impl/codegen/interceptor_common.h b/include/grpcpp/impl/codegen/interceptor_common.h index cf564977f6..d0aa23cb0a 100644 --- a/include/grpcpp/impl/codegen/interceptor_common.h +++ b/include/grpcpp/impl/codegen/interceptor_common.h @@ -19,7 +19,13 @@ #ifndef GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H #define GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H +#include <array> +#include <functional> + +#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/call_op_set_interface.h> #include <grpcpp/impl/codegen/client_interceptor.h> +#include <grpcpp/impl/codegen/intercepted_channel.h> #include <grpcpp/impl/codegen/server_interceptor.h> #include <grpc/impl/codegen/grpc_types.h> @@ -27,38 +33,9 @@ namespace grpc { namespace internal { -/// Internal methods for setting the state -class InternalInterceptorBatchMethods +class InterceptorBatchMethodsImpl : public experimental::InterceptorBatchMethods { public: - virtual ~InternalInterceptorBatchMethods() {} - - virtual void AddInterceptionHookPoint( - experimental::InterceptionHookPoints type) = 0; - - virtual void SetSendMessage(ByteBuffer* buf) = 0; - - virtual void SetSendInitialMetadata( - std::multimap<grpc::string, grpc::string>* metadata) = 0; - - virtual void SetSendStatus(grpc_status_code* code, - grpc::string* error_details, - grpc::string* error_message) = 0; - - virtual void SetSendTrailingMetadata( - std::multimap<grpc::string, grpc::string>* metadata) = 0; - - virtual void SetRecvMessage(void* message) = 0; - - virtual void SetRecvInitialMetadata(MetadataMap* map) = 0; - - virtual void SetRecvStatus(Status* status) = 0; - - virtual void SetRecvTrailingMetadata(MetadataMap* map) = 0; -}; - -class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { - public: InterceptorBatchMethodsImpl() { for (auto i = static_cast<experimental::InterceptionHookPoints>(0); i < experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS; @@ -75,7 +52,7 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { return hooks_[static_cast<size_t>(type)]; } - void Proceed() override { /* fill this */ + void Proceed() override { if (call_->client_rpc_info() != nullptr) { return ProceedClient(); } @@ -98,8 +75,7 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { rpc_info->RunInterceptor(this, current_interceptor_index_); } - void AddInterceptionHookPoint( - experimental::InterceptionHookPoints type) override { + void AddInterceptionHookPoint(experimental::InterceptionHookPoints type) { hooks_[static_cast<size_t>(type)] = true; } @@ -139,34 +115,34 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { return recv_trailing_metadata_->map(); } - void SetSendMessage(ByteBuffer* buf) override { send_message_ = buf; } + void SetSendMessage(ByteBuffer* buf) { send_message_ = buf; } void SetSendInitialMetadata( - std::multimap<grpc::string, grpc::string>* metadata) override { + std::multimap<grpc::string, grpc::string>* metadata) { send_initial_metadata_ = metadata; } void SetSendStatus(grpc_status_code* code, grpc::string* error_details, - grpc::string* error_message) override { + grpc::string* error_message) { code_ = code; error_details_ = error_details; error_message_ = error_message; } void SetSendTrailingMetadata( - std::multimap<grpc::string, grpc::string>* metadata) override { + std::multimap<grpc::string, grpc::string>* metadata) { send_trailing_metadata_ = metadata; } - void SetRecvMessage(void* message) override { recv_message_ = message; } + void SetRecvMessage(void* message) { recv_message_ = message; } - void SetRecvInitialMetadata(MetadataMap* map) override { + void SetRecvInitialMetadata(MetadataMap* map) { recv_initial_metadata_ = map; } - void SetRecvStatus(Status* status) override { recv_status_ = status; } + void SetRecvStatus(Status* status) { recv_status_ = status; } - void SetRecvTrailingMetadata(MetadataMap* map) override { + void SetRecvTrailingMetadata(MetadataMap* map) { recv_trailing_metadata_ = map; } @@ -377,6 +353,105 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { MetadataMap* recv_trailing_metadata_ = nullptr; }; +// A special implementation of InterceptorBatchMethods to send a Cancel +// notification down the interceptor stack +class CancelInterceptorBatchMethods + : public experimental::InterceptorBatchMethods { + public: + bool QueryInterceptionHookPoint( + experimental::InterceptionHookPoints type) override { + if (type == experimental::InterceptionHookPoints::PRE_SEND_CANCEL) { + return true; + } else { + return false; + } + } + + void Proceed() override { + // This is a no-op. For actual continuation of the RPC simply needs to + // return from the Intercept method + } + + void Hijack() override { + // Only the client can hijack when sending down initial metadata + GPR_CODEGEN_ASSERT(false && + "It is illegal to call Hijack on a method which has a " + "Cancel notification"); + } + + ByteBuffer* GetSendMessage() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetSendMessage on a method which " + "has a Cancel notification"); + return nullptr; + } + + std::multimap<grpc::string, grpc::string>* GetSendInitialMetadata() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetSendInitialMetadata on a " + "method which has a Cancel notification"); + return nullptr; + } + + Status GetSendStatus() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetSendStatus on a method which " + "has a Cancel notification"); + return Status(); + } + + void ModifySendStatus(const Status& status) override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call ModifySendStatus on a method " + "which has a Cancel notification"); + return; + } + + std::multimap<grpc::string, grpc::string>* GetSendTrailingMetadata() + override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetSendTrailingMetadata on a " + "method which has a Cancel notification"); + return nullptr; + } + + void* GetRecvMessage() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetRecvMessage on a method which " + "has a Cancel notification"); + return nullptr; + } + + std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvInitialMetadata() + override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetRecvInitialMetadata on a " + "method which has a Cancel notification"); + return nullptr; + } + + Status* GetRecvStatus() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetRecvStatus on a method which " + "has a Cancel notification"); + return nullptr; + } + + std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvTrailingMetadata() + override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetRecvTrailingMetadata on a " + "method which has a Cancel notification"); + return nullptr; + } + + std::unique_ptr<ChannelInterface> GetInterceptedChannel() override { + GPR_CODEGEN_ASSERT(false && + "It is illegal to call GetInterceptedChannel on a " + "method which has a Cancel notification"); + return std::unique_ptr<ChannelInterface>(nullptr); + } +}; } // namespace internal } // namespace grpc |