aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-11-06 14:56:24 -0800
committerGravatar GitHub <noreply@github.com>2018-11-06 14:56:24 -0800
commit193b4b57ec8ec1509a91069f8cdd1ba1917b2d71 (patch)
tree6e346ecbad7f8b579c585b3619123ce17f5f9380 /include
parent385fde64fc2e3b0241229e4a018ea0989a2a00bc (diff)
parentded9434e4c23e6be69feeda9ca05b31bceb5e5f9 (diff)
Merge pull request #17072 from yashykt/interceptor_cancellation
Interceptors should see a Cancellation notification
Diffstat (limited to 'include')
-rw-r--r--include/grpcpp/impl/codegen/call_op_set.h55
-rw-r--r--include/grpcpp/impl/codegen/client_context.h2
-rw-r--r--include/grpcpp/impl/codegen/interceptor.h9
-rw-r--r--include/grpcpp/impl/codegen/interceptor_common.h157
4 files changed, 153 insertions, 70 deletions
diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h
index 5c52b027b2..b4c34a01c9 100644
--- a/include/grpcpp/impl/codegen/call_op_set.h
+++ b/include/grpcpp/impl/codegen/call_op_set.h
@@ -214,11 +214,10 @@ class CallNoOp {
void AddOp(grpc_op* ops, size_t* nops) {}
void FinishOp(bool* status) {}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {}
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
- }
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {}
};
class CallOpSendInitialMetadata {
@@ -265,7 +264,7 @@ class CallOpSendInitialMetadata {
}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA);
@@ -273,9 +272,9 @@ class CallOpSendInitialMetadata {
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {}
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
}
@@ -318,7 +317,7 @@ class CallOpSendMessage {
void FinishOp(bool* status) { send_buf_.Clear(); }
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_buf_.Valid()) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_MESSAGE);
@@ -326,9 +325,9 @@ class CallOpSendMessage {
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {}
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
}
@@ -406,17 +405,17 @@ class CallOpRecvMessage {
}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
interceptor_methods->SetRecvMessage(message_);
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!got_message) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
if (message_ == nullptr) return;
interceptor_methods->AddInterceptionHookPoint(
@@ -501,17 +500,17 @@ class CallOpGenericRecvMessage {
}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
interceptor_methods->SetRecvMessage(message_);
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!got_message) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
if (!deserialize_) return;
interceptor_methods->AddInterceptionHookPoint(
@@ -543,16 +542,16 @@ class CallOpClientSendClose {
void FinishOp(bool* status) { send_ = false; }
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_CLOSE);
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {}
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
}
@@ -600,7 +599,7 @@ class CallOpServerSendStatus {
}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_status_available_) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_STATUS);
@@ -610,9 +609,9 @@ class CallOpServerSendStatus {
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {}
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
}
@@ -652,19 +651,19 @@ class CallOpRecvInitialMetadata {
}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
interceptor_methods->SetRecvInitialMetadata(metadata_map_);
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (metadata_map_ == nullptr) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
metadata_map_ = nullptr;
}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
if (metadata_map_ == nullptr) return;
interceptor_methods->AddInterceptionHookPoint(
@@ -720,20 +719,20 @@ class CallOpClientRecvStatus {
}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
interceptor_methods->SetRecvStatus(recv_status_);
interceptor_methods->SetRecvTrailingMetadata(metadata_map_);
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (recv_status_ == nullptr) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_STATUS);
recv_status_ = nullptr;
}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
if (recv_status_ == nullptr) return;
interceptor_methods->AddInterceptionHookPoint(
diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h
index f53b744dcf..75b955e760 100644
--- a/include/grpcpp/impl/codegen/client_context.h
+++ b/include/grpcpp/impl/codegen/client_context.h
@@ -426,6 +426,8 @@ class ClientContext {
grpc::string authority() { return authority_; }
+ void SendCancelToInterceptors();
+
bool initial_metadata_received_;
bool wait_for_ready_;
bool wait_for_ready_explicitly_set_;
diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h
index 15cab711e5..19f6afcb72 100644
--- a/include/grpcpp/impl/codegen/interceptor.h
+++ b/include/grpcpp/impl/codegen/interceptor.h
@@ -56,6 +56,11 @@ enum class InterceptionHookPoints {
POST_RECV_MESSAGE,
POST_RECV_STATUS /* client only */,
POST_RECV_CLOSE /* server only */,
+ /* This is a special hook point available to both clients and servers when
+ TryCancel() is performed. It is illegal for an interceptor to block/delay
+ this operation. ALL interceptors see this hook point irrespective of
+ whether the RPC was hijacked or not. */
+ PRE_SEND_CANCEL,
NUM_INTERCEPTION_HOOKS
};
@@ -66,7 +71,9 @@ class InterceptorBatchMethods {
// of type \a type
virtual bool QueryInterceptionHookPoint(InterceptionHookPoints type) = 0;
// Calling this will signal that the interceptor is done intercepting the
- // current batch of the RPC
+ // current batch of the RPC.
+ // Proceed is a no-op if the batch contains PRE_SEND_CANCEL. Simply returning
+ // from the Intercept method does the job of continuing the RPC in this case.
virtual void Proceed() = 0;
// Calling this indicates that the interceptor has hijacked the RPC (only
// valid if the batch contains send_initial_metadata on the client side)
diff --git a/include/grpcpp/impl/codegen/interceptor_common.h b/include/grpcpp/impl/codegen/interceptor_common.h
index cf564977f6..d0aa23cb0a 100644
--- a/include/grpcpp/impl/codegen/interceptor_common.h
+++ b/include/grpcpp/impl/codegen/interceptor_common.h
@@ -19,7 +19,13 @@
#ifndef GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H
#define GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H
+#include <array>
+#include <functional>
+
+#include <grpcpp/impl/codegen/call.h>
+#include <grpcpp/impl/codegen/call_op_set_interface.h>
#include <grpcpp/impl/codegen/client_interceptor.h>
+#include <grpcpp/impl/codegen/intercepted_channel.h>
#include <grpcpp/impl/codegen/server_interceptor.h>
#include <grpc/impl/codegen/grpc_types.h>
@@ -27,38 +33,9 @@
namespace grpc {
namespace internal {
-/// Internal methods for setting the state
-class InternalInterceptorBatchMethods
+class InterceptorBatchMethodsImpl
: public experimental::InterceptorBatchMethods {
public:
- virtual ~InternalInterceptorBatchMethods() {}
-
- virtual void AddInterceptionHookPoint(
- experimental::InterceptionHookPoints type) = 0;
-
- virtual void SetSendMessage(ByteBuffer* buf) = 0;
-
- virtual void SetSendInitialMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) = 0;
-
- virtual void SetSendStatus(grpc_status_code* code,
- grpc::string* error_details,
- grpc::string* error_message) = 0;
-
- virtual void SetSendTrailingMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) = 0;
-
- virtual void SetRecvMessage(void* message) = 0;
-
- virtual void SetRecvInitialMetadata(MetadataMap* map) = 0;
-
- virtual void SetRecvStatus(Status* status) = 0;
-
- virtual void SetRecvTrailingMetadata(MetadataMap* map) = 0;
-};
-
-class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods {
- public:
InterceptorBatchMethodsImpl() {
for (auto i = static_cast<experimental::InterceptionHookPoints>(0);
i < experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS;
@@ -75,7 +52,7 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods {
return hooks_[static_cast<size_t>(type)];
}
- void Proceed() override { /* fill this */
+ void Proceed() override {
if (call_->client_rpc_info() != nullptr) {
return ProceedClient();
}
@@ -98,8 +75,7 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods {
rpc_info->RunInterceptor(this, current_interceptor_index_);
}
- void AddInterceptionHookPoint(
- experimental::InterceptionHookPoints type) override {
+ void AddInterceptionHookPoint(experimental::InterceptionHookPoints type) {
hooks_[static_cast<size_t>(type)] = true;
}
@@ -139,34 +115,34 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods {
return recv_trailing_metadata_->map();
}
- void SetSendMessage(ByteBuffer* buf) override { send_message_ = buf; }
+ void SetSendMessage(ByteBuffer* buf) { send_message_ = buf; }
void SetSendInitialMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) override {
+ std::multimap<grpc::string, grpc::string>* metadata) {
send_initial_metadata_ = metadata;
}
void SetSendStatus(grpc_status_code* code, grpc::string* error_details,
- grpc::string* error_message) override {
+ grpc::string* error_message) {
code_ = code;
error_details_ = error_details;
error_message_ = error_message;
}
void SetSendTrailingMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) override {
+ std::multimap<grpc::string, grpc::string>* metadata) {
send_trailing_metadata_ = metadata;
}
- void SetRecvMessage(void* message) override { recv_message_ = message; }
+ void SetRecvMessage(void* message) { recv_message_ = message; }
- void SetRecvInitialMetadata(MetadataMap* map) override {
+ void SetRecvInitialMetadata(MetadataMap* map) {
recv_initial_metadata_ = map;
}
- void SetRecvStatus(Status* status) override { recv_status_ = status; }
+ void SetRecvStatus(Status* status) { recv_status_ = status; }
- void SetRecvTrailingMetadata(MetadataMap* map) override {
+ void SetRecvTrailingMetadata(MetadataMap* map) {
recv_trailing_metadata_ = map;
}
@@ -377,6 +353,105 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods {
MetadataMap* recv_trailing_metadata_ = nullptr;
};
+// A special implementation of InterceptorBatchMethods to send a Cancel
+// notification down the interceptor stack
+class CancelInterceptorBatchMethods
+ : public experimental::InterceptorBatchMethods {
+ public:
+ bool QueryInterceptionHookPoint(
+ experimental::InterceptionHookPoints type) override {
+ if (type == experimental::InterceptionHookPoints::PRE_SEND_CANCEL) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void Proceed() override {
+ // This is a no-op. For actual continuation of the RPC simply needs to
+ // return from the Intercept method
+ }
+
+ void Hijack() override {
+ // Only the client can hijack when sending down initial metadata
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call Hijack on a method which has a "
+ "Cancel notification");
+ }
+
+ ByteBuffer* GetSendMessage() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetSendMessage on a method which "
+ "has a Cancel notification");
+ return nullptr;
+ }
+
+ std::multimap<grpc::string, grpc::string>* GetSendInitialMetadata() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetSendInitialMetadata on a "
+ "method which has a Cancel notification");
+ return nullptr;
+ }
+
+ Status GetSendStatus() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetSendStatus on a method which "
+ "has a Cancel notification");
+ return Status();
+ }
+
+ void ModifySendStatus(const Status& status) override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call ModifySendStatus on a method "
+ "which has a Cancel notification");
+ return;
+ }
+
+ std::multimap<grpc::string, grpc::string>* GetSendTrailingMetadata()
+ override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetSendTrailingMetadata on a "
+ "method which has a Cancel notification");
+ return nullptr;
+ }
+
+ void* GetRecvMessage() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetRecvMessage on a method which "
+ "has a Cancel notification");
+ return nullptr;
+ }
+
+ std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvInitialMetadata()
+ override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetRecvInitialMetadata on a "
+ "method which has a Cancel notification");
+ return nullptr;
+ }
+
+ Status* GetRecvStatus() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetRecvStatus on a method which "
+ "has a Cancel notification");
+ return nullptr;
+ }
+
+ std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvTrailingMetadata()
+ override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetRecvTrailingMetadata on a "
+ "method which has a Cancel notification");
+ return nullptr;
+ }
+
+ std::unique_ptr<ChannelInterface> GetInterceptedChannel() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetInterceptedChannel on a "
+ "method which has a Cancel notification");
+ return std::unique_ptr<ChannelInterface>(nullptr);
+ }
+};
} // namespace internal
} // namespace grpc