aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-11-06 14:56:24 -0800
committerGravatar GitHub <noreply@github.com>2018-11-06 14:56:24 -0800
commit193b4b57ec8ec1509a91069f8cdd1ba1917b2d71 (patch)
tree6e346ecbad7f8b579c585b3619123ce17f5f9380
parent385fde64fc2e3b0241229e4a018ea0989a2a00bc (diff)
parentded9434e4c23e6be69feeda9ca05b31bceb5e5f9 (diff)
Merge pull request #17072 from yashykt/interceptor_cancellation
Interceptors should see a Cancellation notification
-rw-r--r--CMakeLists.txt3
-rw-r--r--Makefile9
-rw-r--r--build.yaml5
-rw-r--r--include/grpcpp/impl/codegen/call_op_set.h55
-rw-r--r--include/grpcpp/impl/codegen/client_context.h2
-rw-r--r--include/grpcpp/impl/codegen/interceptor.h9
-rw-r--r--include/grpcpp/impl/codegen/interceptor_common.h157
-rw-r--r--src/cpp/client/channel_cc.cc6
-rw-r--r--src/cpp/client/client_context.cc12
-rw-r--r--src/cpp/server/server_context.cc6
-rw-r--r--test/cpp/end2end/BUILD2
-rw-r--r--test/cpp/end2end/client_interceptors_end2end_test.cc121
-rw-r--r--test/cpp/end2end/end2end_test.cc83
-rw-r--r--test/cpp/end2end/interceptors_util.cc151
-rw-r--r--test/cpp/end2end/interceptors_util.h175
-rw-r--r--test/cpp/end2end/server_interceptors_end2end_test.cc45
-rw-r--r--tools/run_tests/generated/sources_and_headers.json10
17 files changed, 536 insertions, 315 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ff5461b30e..09d625fb02 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12445,6 +12445,7 @@ if (gRPC_BUILD_TESTS)
add_executable(client_interceptors_end2end_test
test/cpp/end2end/client_interceptors_end2end_test.cc
+ test/cpp/end2end/interceptors_util.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
@@ -12898,6 +12899,7 @@ if (gRPC_BUILD_TESTS)
add_executable(end2end_test
test/cpp/end2end/end2end_test.cc
+ test/cpp/end2end/interceptors_util.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
@@ -15355,6 +15357,7 @@ endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(server_interceptors_end2end_test
+ test/cpp/end2end/interceptors_util.cc
test/cpp/end2end/server_interceptors_end2end_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
diff --git a/Makefile b/Makefile
index 92db4cdb71..c4ba5fb83e 100644
--- a/Makefile
+++ b/Makefile
@@ -17327,6 +17327,7 @@ endif
CLIENT_INTERCEPTORS_END2END_TEST_SRC = \
test/cpp/end2end/client_interceptors_end2end_test.cc \
+ test/cpp/end2end/interceptors_util.cc \
CLIENT_INTERCEPTORS_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CLIENT_INTERCEPTORS_END2END_TEST_SRC))))
ifeq ($(NO_SECURE),true)
@@ -17359,6 +17360,8 @@ endif
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/client_interceptors_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+$(OBJDIR)/$(CONFIG)/test/cpp/end2end/interceptors_util.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
deps_client_interceptors_end2end_test: $(CLIENT_INTERCEPTORS_END2END_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
@@ -17762,6 +17765,7 @@ endif
END2END_TEST_SRC = \
test/cpp/end2end/end2end_test.cc \
+ test/cpp/end2end/interceptors_util.cc \
END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(END2END_TEST_SRC))))
ifeq ($(NO_SECURE),true)
@@ -17794,6 +17798,8 @@ endif
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+$(OBJDIR)/$(CONFIG)/test/cpp/end2end/interceptors_util.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
deps_end2end_test: $(END2END_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
@@ -20154,6 +20160,7 @@ endif
SERVER_INTERCEPTORS_END2END_TEST_SRC = \
+ test/cpp/end2end/interceptors_util.cc \
test/cpp/end2end/server_interceptors_end2end_test.cc \
SERVER_INTERCEPTORS_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(SERVER_INTERCEPTORS_END2END_TEST_SRC))))
@@ -20185,6 +20192,8 @@ endif
endif
+$(OBJDIR)/$(CONFIG)/test/cpp/end2end/interceptors_util.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/server_interceptors_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_server_interceptors_end2end_test: $(SERVER_INTERCEPTORS_END2END_TEST_OBJS:.o=.dep)
diff --git a/build.yaml b/build.yaml
index 3fa36791cf..3ee13a7dec 100644
--- a/build.yaml
+++ b/build.yaml
@@ -4540,6 +4540,7 @@ targets:
- test/cpp/end2end/interceptors_util.h
src:
- test/cpp/end2end/client_interceptors_end2end_test.cc
+ - test/cpp/end2end/interceptors_util.cc
deps:
- grpc++_test_util
- grpc_test_util
@@ -4666,8 +4667,11 @@ targets:
cpu_cost: 0.5
build: test
language: c++
+ headers:
+ - test/cpp/end2end/interceptors_util.h
src:
- test/cpp/end2end/end2end_test.cc
+ - test/cpp/end2end/interceptors_util.cc
deps:
- grpc++_test_util
- grpc_test_util
@@ -5467,6 +5471,7 @@ targets:
headers:
- test/cpp/end2end/interceptors_util.h
src:
+ - test/cpp/end2end/interceptors_util.cc
- test/cpp/end2end/server_interceptors_end2end_test.cc
deps:
- grpc++_test_util
diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h
index 5c52b027b2..b4c34a01c9 100644
--- a/include/grpcpp/impl/codegen/call_op_set.h
+++ b/include/grpcpp/impl/codegen/call_op_set.h
@@ -214,11 +214,10 @@ class CallNoOp {
void AddOp(grpc_op* ops, size_t* nops) {}
void FinishOp(bool* status) {}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {}
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
- }
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {}
};
class CallOpSendInitialMetadata {
@@ -265,7 +264,7 @@ class CallOpSendInitialMetadata {
}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA);
@@ -273,9 +272,9 @@ class CallOpSendInitialMetadata {
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {}
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
}
@@ -318,7 +317,7 @@ class CallOpSendMessage {
void FinishOp(bool* status) { send_buf_.Clear(); }
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_buf_.Valid()) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_MESSAGE);
@@ -326,9 +325,9 @@ class CallOpSendMessage {
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {}
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
}
@@ -406,17 +405,17 @@ class CallOpRecvMessage {
}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
interceptor_methods->SetRecvMessage(message_);
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!got_message) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
if (message_ == nullptr) return;
interceptor_methods->AddInterceptionHookPoint(
@@ -501,17 +500,17 @@ class CallOpGenericRecvMessage {
}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
interceptor_methods->SetRecvMessage(message_);
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!got_message) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
if (!deserialize_) return;
interceptor_methods->AddInterceptionHookPoint(
@@ -543,16 +542,16 @@ class CallOpClientSendClose {
void FinishOp(bool* status) { send_ = false; }
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_CLOSE);
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {}
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
}
@@ -600,7 +599,7 @@ class CallOpServerSendStatus {
}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (!send_status_available_) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_STATUS);
@@ -610,9 +609,9 @@ class CallOpServerSendStatus {
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {}
+ InterceptorBatchMethodsImpl* interceptor_methods) {}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
}
@@ -652,19 +651,19 @@ class CallOpRecvInitialMetadata {
}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
interceptor_methods->SetRecvInitialMetadata(metadata_map_);
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (metadata_map_ == nullptr) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
metadata_map_ = nullptr;
}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
if (metadata_map_ == nullptr) return;
interceptor_methods->AddInterceptionHookPoint(
@@ -720,20 +719,20 @@ class CallOpClientRecvStatus {
}
void SetInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
interceptor_methods->SetRecvStatus(recv_status_);
interceptor_methods->SetRecvTrailingMetadata(metadata_map_);
}
void SetFinishInterceptionHookPoint(
- InternalInterceptorBatchMethods* interceptor_methods) {
+ InterceptorBatchMethodsImpl* interceptor_methods) {
if (recv_status_ == nullptr) return;
interceptor_methods->AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_STATUS);
recv_status_ = nullptr;
}
- void SetHijackingState(InternalInterceptorBatchMethods* interceptor_methods) {
+ void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
hijacked_ = true;
if (recv_status_ == nullptr) return;
interceptor_methods->AddInterceptionHookPoint(
diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h
index f53b744dcf..75b955e760 100644
--- a/include/grpcpp/impl/codegen/client_context.h
+++ b/include/grpcpp/impl/codegen/client_context.h
@@ -426,6 +426,8 @@ class ClientContext {
grpc::string authority() { return authority_; }
+ void SendCancelToInterceptors();
+
bool initial_metadata_received_;
bool wait_for_ready_;
bool wait_for_ready_explicitly_set_;
diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h
index 15cab711e5..19f6afcb72 100644
--- a/include/grpcpp/impl/codegen/interceptor.h
+++ b/include/grpcpp/impl/codegen/interceptor.h
@@ -56,6 +56,11 @@ enum class InterceptionHookPoints {
POST_RECV_MESSAGE,
POST_RECV_STATUS /* client only */,
POST_RECV_CLOSE /* server only */,
+ /* This is a special hook point available to both clients and servers when
+ TryCancel() is performed. It is illegal for an interceptor to block/delay
+ this operation. ALL interceptors see this hook point irrespective of
+ whether the RPC was hijacked or not. */
+ PRE_SEND_CANCEL,
NUM_INTERCEPTION_HOOKS
};
@@ -66,7 +71,9 @@ class InterceptorBatchMethods {
// of type \a type
virtual bool QueryInterceptionHookPoint(InterceptionHookPoints type) = 0;
// Calling this will signal that the interceptor is done intercepting the
- // current batch of the RPC
+ // current batch of the RPC.
+ // Proceed is a no-op if the batch contains PRE_SEND_CANCEL. Simply returning
+ // from the Intercept method does the job of continuing the RPC in this case.
virtual void Proceed() = 0;
// Calling this indicates that the interceptor has hijacked the RPC (only
// valid if the batch contains send_initial_metadata on the client side)
diff --git a/include/grpcpp/impl/codegen/interceptor_common.h b/include/grpcpp/impl/codegen/interceptor_common.h
index cf564977f6..d0aa23cb0a 100644
--- a/include/grpcpp/impl/codegen/interceptor_common.h
+++ b/include/grpcpp/impl/codegen/interceptor_common.h
@@ -19,7 +19,13 @@
#ifndef GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H
#define GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H
+#include <array>
+#include <functional>
+
+#include <grpcpp/impl/codegen/call.h>
+#include <grpcpp/impl/codegen/call_op_set_interface.h>
#include <grpcpp/impl/codegen/client_interceptor.h>
+#include <grpcpp/impl/codegen/intercepted_channel.h>
#include <grpcpp/impl/codegen/server_interceptor.h>
#include <grpc/impl/codegen/grpc_types.h>
@@ -27,38 +33,9 @@
namespace grpc {
namespace internal {
-/// Internal methods for setting the state
-class InternalInterceptorBatchMethods
+class InterceptorBatchMethodsImpl
: public experimental::InterceptorBatchMethods {
public:
- virtual ~InternalInterceptorBatchMethods() {}
-
- virtual void AddInterceptionHookPoint(
- experimental::InterceptionHookPoints type) = 0;
-
- virtual void SetSendMessage(ByteBuffer* buf) = 0;
-
- virtual void SetSendInitialMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) = 0;
-
- virtual void SetSendStatus(grpc_status_code* code,
- grpc::string* error_details,
- grpc::string* error_message) = 0;
-
- virtual void SetSendTrailingMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) = 0;
-
- virtual void SetRecvMessage(void* message) = 0;
-
- virtual void SetRecvInitialMetadata(MetadataMap* map) = 0;
-
- virtual void SetRecvStatus(Status* status) = 0;
-
- virtual void SetRecvTrailingMetadata(MetadataMap* map) = 0;
-};
-
-class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods {
- public:
InterceptorBatchMethodsImpl() {
for (auto i = static_cast<experimental::InterceptionHookPoints>(0);
i < experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS;
@@ -75,7 +52,7 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods {
return hooks_[static_cast<size_t>(type)];
}
- void Proceed() override { /* fill this */
+ void Proceed() override {
if (call_->client_rpc_info() != nullptr) {
return ProceedClient();
}
@@ -98,8 +75,7 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods {
rpc_info->RunInterceptor(this, current_interceptor_index_);
}
- void AddInterceptionHookPoint(
- experimental::InterceptionHookPoints type) override {
+ void AddInterceptionHookPoint(experimental::InterceptionHookPoints type) {
hooks_[static_cast<size_t>(type)] = true;
}
@@ -139,34 +115,34 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods {
return recv_trailing_metadata_->map();
}
- void SetSendMessage(ByteBuffer* buf) override { send_message_ = buf; }
+ void SetSendMessage(ByteBuffer* buf) { send_message_ = buf; }
void SetSendInitialMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) override {
+ std::multimap<grpc::string, grpc::string>* metadata) {
send_initial_metadata_ = metadata;
}
void SetSendStatus(grpc_status_code* code, grpc::string* error_details,
- grpc::string* error_message) override {
+ grpc::string* error_message) {
code_ = code;
error_details_ = error_details;
error_message_ = error_message;
}
void SetSendTrailingMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) override {
+ std::multimap<grpc::string, grpc::string>* metadata) {
send_trailing_metadata_ = metadata;
}
- void SetRecvMessage(void* message) override { recv_message_ = message; }
+ void SetRecvMessage(void* message) { recv_message_ = message; }
- void SetRecvInitialMetadata(MetadataMap* map) override {
+ void SetRecvInitialMetadata(MetadataMap* map) {
recv_initial_metadata_ = map;
}
- void SetRecvStatus(Status* status) override { recv_status_ = status; }
+ void SetRecvStatus(Status* status) { recv_status_ = status; }
- void SetRecvTrailingMetadata(MetadataMap* map) override {
+ void SetRecvTrailingMetadata(MetadataMap* map) {
recv_trailing_metadata_ = map;
}
@@ -377,6 +353,105 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods {
MetadataMap* recv_trailing_metadata_ = nullptr;
};
+// A special implementation of InterceptorBatchMethods to send a Cancel
+// notification down the interceptor stack
+class CancelInterceptorBatchMethods
+ : public experimental::InterceptorBatchMethods {
+ public:
+ bool QueryInterceptionHookPoint(
+ experimental::InterceptionHookPoints type) override {
+ if (type == experimental::InterceptionHookPoints::PRE_SEND_CANCEL) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void Proceed() override {
+ // This is a no-op. For actual continuation of the RPC simply needs to
+ // return from the Intercept method
+ }
+
+ void Hijack() override {
+ // Only the client can hijack when sending down initial metadata
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call Hijack on a method which has a "
+ "Cancel notification");
+ }
+
+ ByteBuffer* GetSendMessage() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetSendMessage on a method which "
+ "has a Cancel notification");
+ return nullptr;
+ }
+
+ std::multimap<grpc::string, grpc::string>* GetSendInitialMetadata() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetSendInitialMetadata on a "
+ "method which has a Cancel notification");
+ return nullptr;
+ }
+
+ Status GetSendStatus() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetSendStatus on a method which "
+ "has a Cancel notification");
+ return Status();
+ }
+
+ void ModifySendStatus(const Status& status) override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call ModifySendStatus on a method "
+ "which has a Cancel notification");
+ return;
+ }
+
+ std::multimap<grpc::string, grpc::string>* GetSendTrailingMetadata()
+ override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetSendTrailingMetadata on a "
+ "method which has a Cancel notification");
+ return nullptr;
+ }
+
+ void* GetRecvMessage() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetRecvMessage on a method which "
+ "has a Cancel notification");
+ return nullptr;
+ }
+
+ std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvInitialMetadata()
+ override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetRecvInitialMetadata on a "
+ "method which has a Cancel notification");
+ return nullptr;
+ }
+
+ Status* GetRecvStatus() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetRecvStatus on a method which "
+ "has a Cancel notification");
+ return nullptr;
+ }
+
+ std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvTrailingMetadata()
+ override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetRecvTrailingMetadata on a "
+ "method which has a Cancel notification");
+ return nullptr;
+ }
+
+ std::unique_ptr<ChannelInterface> GetInterceptedChannel() override {
+ GPR_CODEGEN_ASSERT(false &&
+ "It is illegal to call GetInterceptedChannel on a "
+ "method which has a Cancel notification");
+ return std::unique_ptr<ChannelInterface>(nullptr);
+ }
+};
} // namespace internal
} // namespace grpc
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index 6ef7df4ea0..8e1cea0269 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -147,10 +147,14 @@ internal::Call Channel::CreateCallInternal(const internal::RpcMethod& method,
}
}
grpc_census_call_set_context(c_call, context->census_context());
- context->set_call(c_call, shared_from_this());
+ // ClientRpcInfo should be set before call because set_call also checks
+ // whether the call has been cancelled, and if the call was cancelled, we
+ // should notify the interceptors too/
auto* info = context->set_client_rpc_info(
method.name(), this, interceptor_creators_, interceptor_pos);
+ context->set_call(c_call, shared_from_this());
+
return internal::Call(c_call, this, cq, info);
}
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 07a04e4268..50da75f09c 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -24,6 +24,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <grpcpp/impl/codegen/interceptor_common.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/server_context.h>
@@ -86,10 +87,13 @@ void ClientContext::set_call(grpc_call* call,
call_ = call;
channel_ = channel;
if (creds_ && !creds_->ApplyToCall(call_)) {
+ // TODO(yashykt): should interceptors also see this status?
+ SendCancelToInterceptors();
grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED,
"Failed to set credentials to rpc.", nullptr);
}
if (call_canceled_) {
+ SendCancelToInterceptors();
grpc_call_cancel(call_, nullptr);
}
}
@@ -110,12 +114,20 @@ void ClientContext::set_compression_algorithm(
void ClientContext::TryCancel() {
std::unique_lock<std::mutex> lock(mu_);
if (call_) {
+ SendCancelToInterceptors();
grpc_call_cancel(call_, nullptr);
} else {
call_canceled_ = true;
}
}
+void ClientContext::SendCancelToInterceptors() {
+ internal::CancelInterceptorBatchMethods cancel_methods;
+ for (size_t i = 0; i < rpc_info_.interceptors_.size(); i++) {
+ rpc_info_.RunInterceptor(&cancel_methods, i);
+ }
+}
+
grpc::string ClientContext::peer() const {
grpc::string peer;
if (call_) {
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 19e6cfb0b4..396996e5bc 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -294,6 +294,12 @@ void ServerContext::AddTrailingMetadata(const grpc::string& key,
}
void ServerContext::TryCancel() const {
+ internal::CancelInterceptorBatchMethods cancel_methods;
+ if (rpc_info_) {
+ for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
+ rpc_info_->RunInterceptor(&cancel_methods, i);
+ }
+ }
grpc_call_error err = grpc_call_cancel_with_status(
call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr);
if (err != GRPC_CALL_OK) {
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 235249e8bf..4e3d841db0 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -38,6 +38,7 @@ grpc_cc_library(
grpc_cc_library(
name = "interceptors_util",
testonly = True,
+ srcs = ["interceptors_util.cc"],
hdrs = ["interceptors_util.h"],
external_deps = [
"gtest",
@@ -158,6 +159,7 @@ grpc_cc_library(
"gtest",
],
deps = [
+ ":interceptors_util",
":test_service_impl",
"//:gpr",
"//:grpc",
diff --git a/test/cpp/end2end/client_interceptors_end2end_test.cc b/test/cpp/end2end/client_interceptors_end2end_test.cc
index 205f64c0f2..0b34ec93ae 100644
--- a/test/cpp/end2end/client_interceptors_end2end_test.cc
+++ b/test/cpp/end2end/client_interceptors_end2end_test.cc
@@ -43,89 +43,6 @@ namespace grpc {
namespace testing {
namespace {
-class ClientInterceptorsStreamingEnd2endTest : public ::testing::Test {
- protected:
- ClientInterceptorsStreamingEnd2endTest() {
- int port = grpc_pick_unused_port_or_die();
-
- ServerBuilder builder;
- server_address_ = "localhost:" + std::to_string(port);
- builder.AddListeningPort(server_address_, InsecureServerCredentials());
- builder.RegisterService(&service_);
- server_ = builder.BuildAndStart();
- }
-
- ~ClientInterceptorsStreamingEnd2endTest() { server_->Shutdown(); }
-
- std::string server_address_;
- EchoTestServiceStreamingImpl service_;
- std::unique_ptr<Server> server_;
-};
-
-class ClientInterceptorsEnd2endTest : public ::testing::Test {
- protected:
- ClientInterceptorsEnd2endTest() {
- int port = grpc_pick_unused_port_or_die();
-
- ServerBuilder builder;
- server_address_ = "localhost:" + std::to_string(port);
- builder.AddListeningPort(server_address_, InsecureServerCredentials());
- builder.RegisterService(&service_);
- server_ = builder.BuildAndStart();
- }
-
- ~ClientInterceptorsEnd2endTest() { server_->Shutdown(); }
-
- std::string server_address_;
- TestServiceImpl service_;
- std::unique_ptr<Server> server_;
-};
-
-/* This interceptor does nothing. Just keeps a global count on the number of
- * times it was invoked. */
-class DummyInterceptor : public experimental::Interceptor {
- public:
- DummyInterceptor(experimental::ClientRpcInfo* info) {}
-
- virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
- if (methods->QueryInterceptionHookPoint(
- experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
- num_times_run_++;
- } else if (methods->QueryInterceptionHookPoint(
- experimental::InterceptionHookPoints::
- POST_RECV_INITIAL_METADATA)) {
- num_times_run_reverse_++;
- }
- methods->Proceed();
- }
-
- static void Reset() {
- num_times_run_.store(0);
- num_times_run_reverse_.store(0);
- }
-
- static int GetNumTimesRun() {
- EXPECT_EQ(num_times_run_.load(), num_times_run_reverse_.load());
- return num_times_run_.load();
- }
-
- private:
- static std::atomic<int> num_times_run_;
- static std::atomic<int> num_times_run_reverse_;
-};
-
-std::atomic<int> DummyInterceptor::num_times_run_;
-std::atomic<int> DummyInterceptor::num_times_run_reverse_;
-
-class DummyInterceptorFactory
- : public experimental::ClientInterceptorFactoryInterface {
- public:
- virtual experimental::Interceptor* CreateClientInterceptor(
- experimental::ClientRpcInfo* info) override {
- return new DummyInterceptor(info);
- }
-};
-
/* Hijacks Echo RPC and fills in the expected values */
class HijackingInterceptor : public experimental::Interceptor {
public:
@@ -422,6 +339,25 @@ class LoggingInterceptorFactory
}
};
+class ClientInterceptorsEnd2endTest : public ::testing::Test {
+ protected:
+ ClientInterceptorsEnd2endTest() {
+ int port = grpc_pick_unused_port_or_die();
+
+ ServerBuilder builder;
+ server_address_ = "localhost:" + std::to_string(port);
+ builder.AddListeningPort(server_address_, InsecureServerCredentials());
+ builder.RegisterService(&service_);
+ server_ = builder.BuildAndStart();
+ }
+
+ ~ClientInterceptorsEnd2endTest() { server_->Shutdown(); }
+
+ std::string server_address_;
+ TestServiceImpl service_;
+ std::unique_ptr<Server> server_;
+};
+
TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorLoggingTest) {
ChannelArguments args;
DummyInterceptor::Reset();
@@ -538,6 +474,25 @@ TEST_F(ClientInterceptorsEnd2endTest,
EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
}
+class ClientInterceptorsStreamingEnd2endTest : public ::testing::Test {
+ protected:
+ ClientInterceptorsStreamingEnd2endTest() {
+ int port = grpc_pick_unused_port_or_die();
+
+ ServerBuilder builder;
+ server_address_ = "localhost:" + std::to_string(port);
+ builder.AddListeningPort(server_address_, InsecureServerCredentials());
+ builder.RegisterService(&service_);
+ server_ = builder.BuildAndStart();
+ }
+
+ ~ClientInterceptorsStreamingEnd2endTest() { server_->Shutdown(); }
+
+ std::string server_address_;
+ EchoTestServiceStreamingImpl service_;
+ std::unique_ptr<Server> server_;
+};
+
TEST_F(ClientInterceptorsStreamingEnd2endTest, ClientStreamingTest) {
ChannelArguments args;
DummyInterceptor::Reset();
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index fc07681535..4558437102 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -40,6 +40,7 @@
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/interceptors_util.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/string_ref_helper.h"
#include "test/cpp/util/test_credentials_provider.h"
@@ -179,7 +180,7 @@ class Proxy : public ::grpc::testing::EchoTestService::Service {
}
private:
- std::unique_ptr< ::grpc::testing::EchoTestService::Stub> stub_;
+ std::unique_ptr<::grpc::testing::EchoTestService::Stub> stub_;
};
class TestServiceImplDupPkg
@@ -194,9 +195,14 @@ class TestServiceImplDupPkg
class TestScenario {
public:
- TestScenario(bool proxy, bool inproc_stub, const grpc::string& creds_type)
- : use_proxy(proxy), inproc(inproc_stub), credentials_type(creds_type) {}
+ TestScenario(bool interceptors, bool proxy, bool inproc_stub,
+ const grpc::string& creds_type)
+ : use_interceptors(interceptors),
+ use_proxy(proxy),
+ inproc(inproc_stub),
+ credentials_type(creds_type) {}
void Log() const;
+ bool use_interceptors;
bool use_proxy;
bool inproc;
const grpc::string credentials_type;
@@ -204,8 +210,9 @@ class TestScenario {
static std::ostream& operator<<(std::ostream& out,
const TestScenario& scenario) {
- return out << "TestScenario{use_proxy="
- << (scenario.use_proxy ? "true" : "false")
+ return out << "TestScenario{use_interceptors="
+ << (scenario.use_interceptors ? "true" : "false")
+ << ", use_proxy=" << (scenario.use_proxy ? "true" : "false")
<< ", inproc=" << (scenario.inproc ? "true" : "false")
<< ", credentials='" << scenario.credentials_type << "'}";
}
@@ -260,6 +267,17 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
if (GetParam().credentials_type != kInsecureCredentialsType) {
server_creds->SetAuthMetadataProcessor(processor);
}
+ if (GetParam().use_interceptors) {
+ std::vector<
+ std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
+ creators;
+ // Add 20 dummy server interceptors
+ for (auto i = 0; i < 20; i++) {
+ creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
+ new DummyInterceptorFactory()));
+ }
+ builder.experimental().SetInterceptorCreators(std::move(creators));
+ }
builder.AddListeningPort(server_address_.str(), server_creds);
builder.RegisterService(&service_);
builder.RegisterService("foo.test.youtube.com", &special_service_);
@@ -292,10 +310,21 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
if (!GetParam().inproc) {
- channel_ =
- CreateCustomChannel(server_address_.str(), channel_creds, args);
+ if (!GetParam().use_interceptors) {
+ channel_ =
+ CreateCustomChannel(server_address_.str(), channel_creds, args);
+ } else {
+ channel_ = CreateCustomChannelWithInterceptors(
+ server_address_.str(), channel_creds, args,
+ CreateDummyClientInterceptors());
+ }
} else {
- channel_ = server_->InProcessChannel(args);
+ if (!GetParam().use_interceptors) {
+ channel_ = server_->InProcessChannel(args);
+ } else {
+ channel_ = server_->experimental().InProcessChannelWithInterceptors(
+ args, CreateDummyClientInterceptors());
+ }
}
}
@@ -320,6 +349,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
}
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
+ DummyInterceptor::Reset();
}
bool is_server_started_;
@@ -376,6 +406,7 @@ class End2endServerTryCancelTest : public End2endTest {
// NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
void TestRequestStreamServerCancel(
ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) {
+ RestartServer(std::shared_ptr<AuthMetadataProcessor>());
ResetStub();
EchoRequest request;
EchoResponse response;
@@ -432,6 +463,10 @@ class End2endServerTryCancelTest : public End2endTest {
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ // Make sure that the server interceptors were notified
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
}
// Helper for testing server-streaming RPCs which are cancelled on the server.
@@ -449,6 +484,7 @@ class End2endServerTryCancelTest : public End2endTest {
// NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
void TestResponseStreamServerCancel(
ServerTryCancelRequestPhase server_try_cancel) {
+ RestartServer(std::shared_ptr<AuthMetadataProcessor>());
ResetStub();
EchoRequest request;
EchoResponse response;
@@ -508,7 +544,10 @@ class End2endServerTryCancelTest : public End2endTest {
}
EXPECT_FALSE(s.ok());
- EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ // Make sure that the server interceptors were notified
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
}
// Helper for testing bidirectional-streaming RPCs which are cancelled on the
@@ -526,6 +565,7 @@ class End2endServerTryCancelTest : public End2endTest {
// NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,
int num_messages) {
+ RestartServer(std::shared_ptr<AuthMetadataProcessor>());
ResetStub();
EchoRequest request;
EchoResponse response;
@@ -592,6 +632,10 @@ class End2endServerTryCancelTest : public End2endTest {
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ // Make sure that the server interceptors were notified
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
}
};
@@ -989,6 +1033,9 @@ TEST_P(End2endTest, CancelRpcBeforeStart) {
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ("", response.message());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
}
// Client cancels request stream after sending two messages
@@ -1009,6 +1056,9 @@ TEST_P(End2endTest, ClientCancelsRequestStream) {
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
EXPECT_EQ(response.message(), "");
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
}
// Client cancels server stream after sending some messages
@@ -1041,6 +1091,9 @@ TEST_P(End2endTest, ClientCancelsResponseStream) {
// The final status could be either of CANCELLED or OK depending on
// who won the race.
EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code());
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
}
// Client cancels bidi stream after sending some messages
@@ -1074,6 +1127,9 @@ TEST_P(End2endTest, ClientCancelsBidi) {
Status s = stream->Finish();
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
+ }
}
TEST_P(End2endTest, RpcMaxMessageSize) {
@@ -1802,13 +1858,16 @@ std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
}
GPR_ASSERT(!credentials_types.empty());
for (const auto& cred : credentials_types) {
- scenarios.emplace_back(false, false, cred);
+ scenarios.emplace_back(false, false, false, cred);
+ scenarios.emplace_back(true, false, false, cred);
if (use_proxy) {
- scenarios.emplace_back(true, false, cred);
+ scenarios.emplace_back(false, true, false, cred);
+ scenarios.emplace_back(true, true, false, cred);
}
}
if (test_inproc && insec_ok()) {
- scenarios.emplace_back(false, true, kInsecureCredentialsType);
+ scenarios.emplace_back(false, false, true, kInsecureCredentialsType);
+ scenarios.emplace_back(true, false, true, kInsecureCredentialsType);
}
return scenarios;
}
diff --git a/test/cpp/end2end/interceptors_util.cc b/test/cpp/end2end/interceptors_util.cc
new file mode 100644
index 0000000000..602d1695a3
--- /dev/null
+++ b/test/cpp/end2end/interceptors_util.cc
@@ -0,0 +1,151 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "test/cpp/end2end/interceptors_util.h"
+
+namespace grpc {
+namespace testing {
+
+std::atomic<int> DummyInterceptor::num_times_run_;
+std::atomic<int> DummyInterceptor::num_times_run_reverse_;
+std::atomic<int> DummyInterceptor::num_times_cancel_;
+
+void MakeCall(const std::shared_ptr<Channel>& channel) {
+ auto stub = grpc::testing::EchoTestService::NewStub(channel);
+ ClientContext ctx;
+ EchoRequest req;
+ req.mutable_param()->set_echo_metadata(true);
+ ctx.AddMetadata("testkey", "testvalue");
+ req.set_message("Hello");
+ EchoResponse resp;
+ Status s = stub->Echo(&ctx, req, &resp);
+ EXPECT_EQ(s.ok(), true);
+ EXPECT_EQ(resp.message(), "Hello");
+}
+
+void MakeClientStreamingCall(const std::shared_ptr<Channel>& channel) {
+ auto stub = grpc::testing::EchoTestService::NewStub(channel);
+ ClientContext ctx;
+ EchoRequest req;
+ req.mutable_param()->set_echo_metadata(true);
+ ctx.AddMetadata("testkey", "testvalue");
+ req.set_message("Hello");
+ EchoResponse resp;
+ string expected_resp = "";
+ auto writer = stub->RequestStream(&ctx, &resp);
+ for (int i = 0; i < 10; i++) {
+ writer->Write(req);
+ expected_resp += "Hello";
+ }
+ writer->WritesDone();
+ Status s = writer->Finish();
+ EXPECT_EQ(s.ok(), true);
+ EXPECT_EQ(resp.message(), expected_resp);
+}
+
+void MakeServerStreamingCall(const std::shared_ptr<Channel>& channel) {
+ auto stub = grpc::testing::EchoTestService::NewStub(channel);
+ ClientContext ctx;
+ EchoRequest req;
+ req.mutable_param()->set_echo_metadata(true);
+ ctx.AddMetadata("testkey", "testvalue");
+ req.set_message("Hello");
+ EchoResponse resp;
+ string expected_resp = "";
+ auto reader = stub->ResponseStream(&ctx, req);
+ int count = 0;
+ while (reader->Read(&resp)) {
+ EXPECT_EQ(resp.message(), "Hello");
+ count++;
+ }
+ ASSERT_EQ(count, 10);
+ Status s = reader->Finish();
+ EXPECT_EQ(s.ok(), true);
+}
+
+void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel) {
+ auto stub = grpc::testing::EchoTestService::NewStub(channel);
+ ClientContext ctx;
+ EchoRequest req;
+ EchoResponse resp;
+ ctx.AddMetadata("testkey", "testvalue");
+ auto stream = stub->BidiStream(&ctx);
+ for (auto i = 0; i < 10; i++) {
+ req.set_message("Hello" + std::to_string(i));
+ stream->Write(req);
+ stream->Read(&resp);
+ EXPECT_EQ(req.message(), resp.message());
+ }
+ ASSERT_TRUE(stream->WritesDone());
+ Status s = stream->Finish();
+ EXPECT_EQ(s.ok(), true);
+}
+
+void MakeCallbackCall(const std::shared_ptr<Channel>& channel) {
+ auto stub = grpc::testing::EchoTestService::NewStub(channel);
+ ClientContext ctx;
+ EchoRequest req;
+ std::mutex mu;
+ std::condition_variable cv;
+ bool done = false;
+ req.mutable_param()->set_echo_metadata(true);
+ ctx.AddMetadata("testkey", "testvalue");
+ req.set_message("Hello");
+ EchoResponse resp;
+ stub->experimental_async()->Echo(&ctx, &req, &resp,
+ [&resp, &mu, &done, &cv](Status s) {
+ // gpr_log(GPR_ERROR, "got the callback");
+ EXPECT_EQ(s.ok(), true);
+ EXPECT_EQ(resp.message(), "Hello");
+ std::lock_guard<std::mutex> l(mu);
+ done = true;
+ cv.notify_one();
+ });
+ std::unique_lock<std::mutex> l(mu);
+ while (!done) {
+ cv.wait(l);
+ }
+}
+
+bool CheckMetadata(const std::multimap<grpc::string_ref, grpc::string_ref>& map,
+ const string& key, const string& value) {
+ for (const auto& pair : map) {
+ if (pair.first.starts_with(key) && pair.second.starts_with(value)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+std::unique_ptr<std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+CreateDummyClientInterceptors() {
+ auto creators = std::unique_ptr<std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>(
+ new std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>());
+ // Add 20 dummy interceptors before hijacking interceptor
+ for (auto i = 0; i < 20; i++) {
+ creators->push_back(std::unique_ptr<DummyInterceptorFactory>(
+ new DummyInterceptorFactory()));
+ }
+ return creators;
+}
+
+} // namespace testing
+} // namespace grpc
diff --git a/test/cpp/end2end/interceptors_util.h b/test/cpp/end2end/interceptors_util.h
index 5f0aa37dc0..b4c4791fca 100644
--- a/test/cpp/end2end/interceptors_util.h
+++ b/test/cpp/end2end/interceptors_util.h
@@ -16,6 +16,10 @@
*
*/
+#include <condition_variable>
+
+#include <grpcpp/channel.h>
+
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/cpp/util/string_ref_helper.h"
@@ -23,6 +27,61 @@
namespace grpc {
namespace testing {
+/* This interceptor does nothing. Just keeps a global count on the number of
+ * times it was invoked. */
+class DummyInterceptor : public experimental::Interceptor {
+ public:
+ DummyInterceptor() {}
+
+ virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
+ if (methods->QueryInterceptionHookPoint(
+ experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
+ num_times_run_++;
+ } else if (methods->QueryInterceptionHookPoint(
+ experimental::InterceptionHookPoints::
+ POST_RECV_INITIAL_METADATA)) {
+ num_times_run_reverse_++;
+ } else if (methods->QueryInterceptionHookPoint(
+ experimental::InterceptionHookPoints::PRE_SEND_CANCEL)) {
+ num_times_cancel_++;
+ }
+ methods->Proceed();
+ }
+
+ static void Reset() {
+ num_times_run_.store(0);
+ num_times_run_reverse_.store(0);
+ num_times_cancel_.store(0);
+ }
+
+ static int GetNumTimesRun() {
+ EXPECT_EQ(num_times_run_.load(), num_times_run_reverse_.load());
+ return num_times_run_.load();
+ }
+
+ static int GetNumTimesCancel() { return num_times_cancel_.load(); }
+
+ private:
+ static std::atomic<int> num_times_run_;
+ static std::atomic<int> num_times_run_reverse_;
+ static std::atomic<int> num_times_cancel_;
+};
+
+class DummyInterceptorFactory
+ : public experimental::ClientInterceptorFactoryInterface,
+ public experimental::ServerInterceptorFactoryInterface {
+ public:
+ virtual experimental::Interceptor* CreateClientInterceptor(
+ experimental::ClientRpcInfo* info) override {
+ return new DummyInterceptor();
+ }
+
+ virtual experimental::Interceptor* CreateServerInterceptor(
+ experimental::ServerRpcInfo* info) override {
+ return new DummyInterceptor();
+ }
+};
+
class EchoTestServiceStreamingImpl : public EchoTestService::Service {
public:
~EchoTestServiceStreamingImpl() override {}
@@ -77,115 +136,27 @@ class EchoTestServiceStreamingImpl : public EchoTestService::Service {
}
};
-void MakeCall(const std::shared_ptr<Channel>& channel) {
- auto stub = grpc::testing::EchoTestService::NewStub(channel);
- ClientContext ctx;
- EchoRequest req;
- req.mutable_param()->set_echo_metadata(true);
- ctx.AddMetadata("testkey", "testvalue");
- req.set_message("Hello");
- EchoResponse resp;
- Status s = stub->Echo(&ctx, req, &resp);
- EXPECT_EQ(s.ok(), true);
- EXPECT_EQ(resp.message(), "Hello");
-}
+void MakeCall(const std::shared_ptr<Channel>& channel);
-void MakeClientStreamingCall(const std::shared_ptr<Channel>& channel) {
- auto stub = grpc::testing::EchoTestService::NewStub(channel);
- ClientContext ctx;
- EchoRequest req;
- req.mutable_param()->set_echo_metadata(true);
- ctx.AddMetadata("testkey", "testvalue");
- req.set_message("Hello");
- EchoResponse resp;
- string expected_resp = "";
- auto writer = stub->RequestStream(&ctx, &resp);
- for (int i = 0; i < 10; i++) {
- writer->Write(req);
- expected_resp += "Hello";
- }
- writer->WritesDone();
- Status s = writer->Finish();
- EXPECT_EQ(s.ok(), true);
- EXPECT_EQ(resp.message(), expected_resp);
-}
+void MakeClientStreamingCall(const std::shared_ptr<Channel>& channel);
-void MakeServerStreamingCall(const std::shared_ptr<Channel>& channel) {
- auto stub = grpc::testing::EchoTestService::NewStub(channel);
- ClientContext ctx;
- EchoRequest req;
- req.mutable_param()->set_echo_metadata(true);
- ctx.AddMetadata("testkey", "testvalue");
- req.set_message("Hello");
- EchoResponse resp;
- string expected_resp = "";
- auto reader = stub->ResponseStream(&ctx, req);
- int count = 0;
- while (reader->Read(&resp)) {
- EXPECT_EQ(resp.message(), "Hello");
- count++;
- }
- ASSERT_EQ(count, 10);
- Status s = reader->Finish();
- EXPECT_EQ(s.ok(), true);
-}
+void MakeServerStreamingCall(const std::shared_ptr<Channel>& channel);
-void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel) {
- auto stub = grpc::testing::EchoTestService::NewStub(channel);
- ClientContext ctx;
- EchoRequest req;
- EchoResponse resp;
- ctx.AddMetadata("testkey", "testvalue");
- auto stream = stub->BidiStream(&ctx);
- for (auto i = 0; i < 10; i++) {
- req.set_message("Hello" + std::to_string(i));
- stream->Write(req);
- stream->Read(&resp);
- EXPECT_EQ(req.message(), resp.message());
- }
- ASSERT_TRUE(stream->WritesDone());
- Status s = stream->Finish();
- EXPECT_EQ(s.ok(), true);
-}
+void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel);
-void MakeCallbackCall(const std::shared_ptr<Channel>& channel) {
- auto stub = grpc::testing::EchoTestService::NewStub(channel);
- ClientContext ctx;
- EchoRequest req;
- std::mutex mu;
- std::condition_variable cv;
- bool done = false;
- req.mutable_param()->set_echo_metadata(true);
- ctx.AddMetadata("testkey", "testvalue");
- req.set_message("Hello");
- EchoResponse resp;
- stub->experimental_async()->Echo(&ctx, &req, &resp,
- [&resp, &mu, &done, &cv](Status s) {
- // gpr_log(GPR_ERROR, "got the callback");
- EXPECT_EQ(s.ok(), true);
- EXPECT_EQ(resp.message(), "Hello");
- std::lock_guard<std::mutex> l(mu);
- done = true;
- cv.notify_one();
- });
- std::unique_lock<std::mutex> l(mu);
- while (!done) {
- cv.wait(l);
- }
-}
+void MakeCallbackCall(const std::shared_ptr<Channel>& channel);
bool CheckMetadata(const std::multimap<grpc::string_ref, grpc::string_ref>& map,
- const string& key, const string& value) {
- for (const auto& pair : map) {
- if (pair.first.starts_with(key) && pair.second.starts_with(value)) {
- return true;
- }
- }
- return false;
-}
+ const string& key, const string& value);
-void* tag(int i) { return (void*)static_cast<intptr_t>(i); }
-int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
+std::unique_ptr<std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+CreateDummyClientInterceptors();
+
+inline void* tag(int i) { return (void*)static_cast<intptr_t>(i); }
+inline int detag(void* p) {
+ return static_cast<int>(reinterpret_cast<intptr_t>(p));
+}
class Verifier {
public:
diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc
index e08a4493d3..4ae086ea76 100644
--- a/test/cpp/end2end/server_interceptors_end2end_test.cc
+++ b/test/cpp/end2end/server_interceptors_end2end_test.cc
@@ -42,51 +42,6 @@ namespace grpc {
namespace testing {
namespace {
-/* This interceptor does nothing. Just keeps a global count on the number of
- * times it was invoked. */
-class DummyInterceptor : public experimental::Interceptor {
- public:
- DummyInterceptor(experimental::ServerRpcInfo* info) {}
-
- virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
- if (methods->QueryInterceptionHookPoint(
- experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
- num_times_run_++;
- } else if (methods->QueryInterceptionHookPoint(
- experimental::InterceptionHookPoints::
- POST_RECV_INITIAL_METADATA)) {
- num_times_run_reverse_++;
- }
- methods->Proceed();
- }
-
- static void Reset() {
- num_times_run_.store(0);
- num_times_run_reverse_.store(0);
- }
-
- static int GetNumTimesRun() {
- EXPECT_EQ(num_times_run_.load(), num_times_run_reverse_.load());
- return num_times_run_.load();
- }
-
- private:
- static std::atomic<int> num_times_run_;
- static std::atomic<int> num_times_run_reverse_;
-};
-
-std::atomic<int> DummyInterceptor::num_times_run_;
-std::atomic<int> DummyInterceptor::num_times_run_reverse_;
-
-class DummyInterceptorFactory
- : public experimental::ServerInterceptorFactoryInterface {
- public:
- virtual experimental::Interceptor* CreateServerInterceptor(
- experimental::ServerRpcInfo* info) override {
- return new DummyInterceptor(info);
- }
-};
-
class LoggingInterceptor : public experimental::Interceptor {
public:
LoggingInterceptor(experimental::ServerRpcInfo* info) { info_ = info; }
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 67dde8d156..1bbacc41f0 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -3402,6 +3402,7 @@
"name": "client_interceptors_end2end_test",
"src": [
"test/cpp/end2end/client_interceptors_end2end_test.cc",
+ "test/cpp/end2end/interceptors_util.cc",
"test/cpp/end2end/interceptors_util.h"
],
"third_party": false,
@@ -3600,12 +3601,16 @@
"grpc++_test_util",
"grpc_test_util"
],
- "headers": [],
+ "headers": [
+ "test/cpp/end2end/interceptors_util.h"
+ ],
"is_filegroup": false,
"language": "c++",
"name": "end2end_test",
"src": [
- "test/cpp/end2end/end2end_test.cc"
+ "test/cpp/end2end/end2end_test.cc",
+ "test/cpp/end2end/interceptors_util.cc",
+ "test/cpp/end2end/interceptors_util.h"
],
"third_party": false,
"type": "target"
@@ -4736,6 +4741,7 @@
"language": "c++",
"name": "server_interceptors_end2end_test",
"src": [
+ "test/cpp/end2end/interceptors_util.cc",
"test/cpp/end2end/interceptors_util.h",
"test/cpp/end2end/server_interceptors_end2end_test.cc"
],