diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/grpcpp/impl/codegen/call.h | 223 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/client_interceptor.h | 59 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/interceptor.h | 2 |
3 files changed, 215 insertions, 69 deletions
diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h index 7cadea0055..771fc22d46 100644 --- a/include/grpcpp/impl/codegen/call.h +++ b/include/grpcpp/impl/codegen/call.h @@ -24,10 +24,12 @@ #include <functional> #include <map> #include <memory> +#include <vector> #include <grpcpp/impl/codegen/byte_buffer.h> #include <grpcpp/impl/codegen/call_hook.h> #include <grpcpp/impl/codegen/client_context.h> +#include <grpcpp/impl/codegen/client_interceptor.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> @@ -50,6 +52,58 @@ namespace internal { class Call; class CallHook; +/// Straightforward wrapping of the C call object +class Call final { + public: + /** call is owned by the caller */ + Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq) + : call_hook_(call_hook), + cq_(cq), + call_(call), + max_receive_message_size_(-1), + rpc_info_(nullptr, nullptr, nullptr) {} + + Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, + experimental::ClientRpcInfo rpc_info, + const std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>& + creators) + : call_hook_(call_hook), + cq_(cq), + call_(call), + max_receive_message_size_(-1), + rpc_info_(rpc_info) { + for (const auto& creator : creators) { + interceptors_.push_back(creator->CreateClientInterceptor(&rpc_info_)); + } + } + + Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, + int max_receive_message_size) + : call_hook_(call_hook), + cq_(cq), + call_(call), + max_receive_message_size_(max_receive_message_size), + rpc_info_(nullptr, nullptr, nullptr) {} + + void PerformOps(CallOpSetInterface* ops) { + call_hook_->PerformOpsOnCall(ops, this); + } + + grpc_call* call() const { return call_; } + CompletionQueue* cq() const { return cq_; } + + int max_receive_message_size() const { return max_receive_message_size_; } + + private: + CallHook* call_hook_; + CompletionQueue* cq_; + grpc_call* call_; + int max_receive_message_size_; + experimental::ClientRpcInfo rpc_info_; + std::vector<experimental::ClientInterceptor*> interceptors_; +}; + // TODO(yangg) if the map is changed before we send, the pointers will be a // mess. Make sure it does not happen. inline grpc_metadata* FillMetadataArray( @@ -201,13 +255,45 @@ class WriteOptions { }; namespace internal { + +class InterceptorBatchMethodsImpl + : public experimental::InterceptorBatchMethods { + public: + InterceptorBatchMethodsImpl() {} + + virtual ~InterceptorBatchMethodsImpl() {} + + virtual bool QueryInterceptionHookPoint( + experimental::InterceptionHookPoints type) override { + return hooks_[static_cast<int>(type)]; + } + + virtual void Proceed() override { /* fill this */ + } + + virtual void Hijack() override { /* fill this */ + } + + void AddInterceptionHookPoint(experimental::InterceptionHookPoints type) { + hooks_[static_cast<int>(type)]; + } + + private: + std::array<bool, + static_cast<int>( + experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS)> + hooks_; +}; + /// Default argument for CallOpSet. I is unused by the class, but can be /// used for generating multiple names for the same thing. template <int I> class CallNoOp { protected: - void AddOp(grpc_op* ops, size_t* nops) {} - void FinishOp(bool* status) {} + void AddOp(grpc_op* ops, size_t* nops, + InterceptorBatchMethodsImpl* interceptor_methods) {} + void FinishOp(bool* status, + InterceptorBatchMethodsImpl* interceptor_methods) {} }; class CallOpSendInitialMetadata { @@ -232,7 +318,8 @@ class CallOpSendInitialMetadata { } protected: - void AddOp(grpc_op* ops, size_t* nops) { + void AddOp(grpc_op* ops, size_t* nops, + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_) return; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_INITIAL_METADATA; @@ -246,8 +333,11 @@ class CallOpSendInitialMetadata { op->data.send_initial_metadata.maybe_compression_level.level = maybe_compression_level_.level; } + interceptor_methods->AddInterceptionHookPoint( + experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA); } - void FinishOp(bool* status) { + void FinishOp(bool* status, + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_) return; g_core_codegen_interface->gpr_free(initial_metadata_); send_ = false; @@ -277,7 +367,8 @@ class CallOpSendMessage { Status SendMessage(const M& message) GRPC_MUST_USE_RESULT; protected: - void AddOp(grpc_op* ops, size_t* nops) { + void AddOp(grpc_op* ops, size_t* nops, + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_buf_.Valid()) return; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_MESSAGE; @@ -286,8 +377,13 @@ class CallOpSendMessage { op->data.send_message.send_message = send_buf_.c_buffer(); // Flags are per-message: clear them after use. write_options_.Clear(); + interceptor_methods->AddInterceptionHookPoint( + experimental::InterceptionHookPoints::PRE_SEND_MESSAGE); + } + void FinishOp(bool* status, + InterceptorBatchMethodsImpl* interceptor_methods) { + send_buf_.Clear(); } - void FinishOp(bool* status) { send_buf_.Clear(); } private: ByteBuffer send_buf_; @@ -331,7 +427,8 @@ class CallOpRecvMessage { bool got_message; protected: - void AddOp(grpc_op* ops, size_t* nops) { + void AddOp(grpc_op* ops, size_t* nops, + InterceptorBatchMethodsImpl* interceptor_methods) { if (message_ == nullptr) return; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_RECV_MESSAGE; @@ -340,7 +437,8 @@ class CallOpRecvMessage { op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr(); } - void FinishOp(bool* status) { + void FinishOp(bool* status, + InterceptorBatchMethodsImpl* interceptor_methods) { if (message_ == nullptr) return; if (recv_buf_.Valid()) { if (*status) { @@ -359,6 +457,8 @@ class CallOpRecvMessage { } } message_ = nullptr; + interceptor_methods->AddInterceptionHookPoint( + experimental::InterceptionHookPoints::PRE_RECV_MESSAGE); } private: @@ -406,7 +506,8 @@ class CallOpGenericRecvMessage { bool got_message; protected: - void AddOp(grpc_op* ops, size_t* nops) { + void AddOp(grpc_op* ops, size_t* nops, + InterceptorBatchMethodsImpl* interceptor_methods) { if (!deserialize_) return; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_RECV_MESSAGE; @@ -415,7 +516,8 @@ class CallOpGenericRecvMessage { op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr(); } - void FinishOp(bool* status) { + void FinishOp(bool* status, + InterceptorBatchMethodsImpl* interceptor_methods) { if (!deserialize_) return; if (recv_buf_.Valid()) { if (*status) { @@ -433,6 +535,8 @@ class CallOpGenericRecvMessage { } } deserialize_.reset(); + interceptor_methods->AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_MESSAGE); } private: @@ -448,14 +552,18 @@ class CallOpClientSendClose { void ClientSendClose() { send_ = true; } protected: - void AddOp(grpc_op* ops, size_t* nops) { + void AddOp(grpc_op* ops, size_t* nops, + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_) return; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; op->flags = 0; op->reserved = NULL; } - void FinishOp(bool* status) { send_ = false; } + void FinishOp(bool* status, + InterceptorBatchMethodsImpl* interceptor_methods) { + send_ = false; + } private: bool send_; @@ -477,7 +585,8 @@ class CallOpServerSendStatus { } protected: - void AddOp(grpc_op* ops, size_t* nops) { + void AddOp(grpc_op* ops, size_t* nops, + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_status_available_) return; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; @@ -490,9 +599,12 @@ class CallOpServerSendStatus { send_error_message_.empty() ? nullptr : &error_message_slice_; op->flags = 0; op->reserved = NULL; + interceptor_methods->AddInterceptionHookPoint( + experimental::InterceptionHookPoints::PRE_SEND_STATUS); } - void FinishOp(bool* status) { + void FinishOp(bool* status, + InterceptorBatchMethodsImpl* interceptor_methods) { if (!send_status_available_) return; g_core_codegen_interface->gpr_free(trailing_metadata_); send_status_available_ = false; @@ -518,7 +630,8 @@ class CallOpRecvInitialMetadata { } protected: - void AddOp(grpc_op* ops, size_t* nops) { + void AddOp(grpc_op* ops, size_t* nops, + InterceptorBatchMethodsImpl* interceptor_methods) { if (metadata_map_ == nullptr) return; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_RECV_INITIAL_METADATA; @@ -527,9 +640,12 @@ class CallOpRecvInitialMetadata { op->reserved = NULL; } - void FinishOp(bool* status) { + void FinishOp(bool* status, + InterceptorBatchMethodsImpl* interceptor_methods) { if (metadata_map_ == nullptr) return; metadata_map_ = nullptr; + interceptor_methods->AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); } private: @@ -549,7 +665,8 @@ class CallOpClientRecvStatus { } protected: - void AddOp(grpc_op* ops, size_t* nops) { + void AddOp(grpc_op* ops, size_t* nops, + InterceptorBatchMethodsImpl* interceptor_methods) { if (recv_status_ == nullptr) return; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; @@ -561,7 +678,8 @@ class CallOpClientRecvStatus { op->reserved = NULL; } - void FinishOp(bool* status) { + void FinishOp(bool* status, + InterceptorBatchMethodsImpl* interceptor_methods) { if (recv_status_ == nullptr) return; grpc::string binary_error_details = metadata_map_->GetBinaryErrorDetails(); *recv_status_ = @@ -578,6 +696,8 @@ class CallOpClientRecvStatus { g_core_codegen_interface->gpr_free((void*)debug_error_string_); } recv_status_ = nullptr; + interceptor_methods->AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_STATUS); } private: @@ -598,7 +718,7 @@ class CallOpSetInterface : public CompletionQueueTag { public: /// Fills in grpc_op, starting from ops[*nops] and moving /// upwards. - virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0; + virtual void FillOps(internal::Call* call, grpc_op* ops, size_t* nops) = 0; /// Get the tag to be used at the core completion queue. Generally, the /// value of cq_tag will be "this". However, it can be overridden if we @@ -624,27 +744,27 @@ class CallOpSet : public CallOpSetInterface, public Op6 { public: CallOpSet() : cq_tag_(this), return_tag_(this), call_(nullptr) {} - void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override { - this->Op1::AddOp(ops, nops); - this->Op2::AddOp(ops, nops); - this->Op3::AddOp(ops, nops); - this->Op4::AddOp(ops, nops); - this->Op5::AddOp(ops, nops); - this->Op6::AddOp(ops, nops); - g_core_codegen_interface->grpc_call_ref(call); + void FillOps(Call* call, grpc_op* ops, size_t* nops) override { + this->Op1::AddOp(ops, nops, &interceptor_methods_); + this->Op2::AddOp(ops, nops, &interceptor_methods_); + this->Op3::AddOp(ops, nops, &interceptor_methods_); + this->Op4::AddOp(ops, nops, &interceptor_methods_); + this->Op5::AddOp(ops, nops, &interceptor_methods_); + this->Op6::AddOp(ops, nops, &interceptor_methods_); + g_core_codegen_interface->grpc_call_ref(call->call()); call_ = call; } bool FinalizeResult(void** tag, bool* status) override { - this->Op1::FinishOp(status); - this->Op2::FinishOp(status); - this->Op3::FinishOp(status); - this->Op4::FinishOp(status); - this->Op5::FinishOp(status); - this->Op6::FinishOp(status); + this->Op1::FinishOp(status, &interceptor_methods_); + this->Op2::FinishOp(status, &interceptor_methods_); + this->Op3::FinishOp(status, &interceptor_methods_); + this->Op4::FinishOp(status, &interceptor_methods_); + this->Op5::FinishOp(status, &interceptor_methods_); + this->Op6::FinishOp(status, &interceptor_methods_); *tag = return_tag_; - g_core_codegen_interface->grpc_call_unref(call_); + g_core_codegen_interface->grpc_call_unref(call_->call()); return true; } @@ -661,41 +781,10 @@ class CallOpSet : public CallOpSetInterface, private: void* cq_tag_; void* return_tag_; - grpc_call* call_; + Call* call_; + InterceptorBatchMethodsImpl interceptor_methods_; }; -/// Straightforward wrapping of the C call object -class Call final { - public: - /** call is owned by the caller */ - Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq) - : call_hook_(call_hook), - cq_(cq), - call_(call), - max_receive_message_size_(-1) {} - - Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, - int max_receive_message_size) - : call_hook_(call_hook), - cq_(cq), - call_(call), - max_receive_message_size_(max_receive_message_size) {} - - void PerformOps(CallOpSetInterface* ops) { - call_hook_->PerformOpsOnCall(ops, this); - } - - grpc_call* call() const { return call_; } - CompletionQueue* cq() const { return cq_; } - - int max_receive_message_size() const { return max_receive_message_size_; } - - private: - CallHook* call_hook_; - CompletionQueue* cq_; - grpc_call* call_; - int max_receive_message_size_; -}; } // namespace internal } // namespace grpc diff --git a/include/grpcpp/impl/codegen/client_interceptor.h b/include/grpcpp/impl/codegen/client_interceptor.h index f460c5ac0c..f7963a57d5 100644 --- a/include/grpcpp/impl/codegen/client_interceptor.h +++ b/include/grpcpp/impl/codegen/client_interceptor.h @@ -19,7 +19,9 @@ #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_INTERCEPTOR_H #define GRPCPP_IMPL_CODEGEN_CLIENT_INTERCEPTOR_H +#include <grpcpp/impl/codegen/client_context.h> #include <grpcpp/impl/codegen/interceptor.h> +#include <grpcpp/impl/codegen/string_ref.h> namespace grpc { namespace experimental { @@ -30,7 +32,62 @@ class ClientInterceptor { virtual void Intercept(InterceptorBatchMethods* methods) = 0; }; -class ClientRpcInfo {}; +class ClientRpcInfo { + public: + ClientRpcInfo(grpc::ClientContext* ctx, const char* method, + const grpc::Channel* channel) + : ctx_(ctx), method_(method), channel_(channel) {} + ~ClientRpcInfo(){}; + + // Getter methods + const char* method() { return method_; } + string peer() { return ctx_->peer(); } + const Channel* channel() { return channel_; } + // const grpc::InterceptedMessage& outgoing_message(); + // grpc::InterceptedMessage *mutable_outgoing_message(); + // const grpc::InterceptedMessage& received_message(); + // grpc::InterceptedMessage *mutable_received_message(); + std::shared_ptr<const AuthContext> auth_context() { + return ctx_->auth_context(); + } + const struct census_context* census_context() { + return ctx_->census_context(); + } + gpr_timespec deadline() { return ctx_->raw_deadline(); } + // const std::multimap<grpc::string, grpc::string>* client_initial_metadata() + // { return &ctx_->send_initial_metadata_; } const + // std::multimap<grpc::string_ref, grpc::string_ref>* + // server_initial_metadata() { return &ctx_->GetServerInitialMetadata(); } + // const std::multimap<grpc::string_ref, grpc::string_ref>* + // server_trailing_metadata() { return &ctx_->GetServerTrailingMetadata(); } + // const Status *status(); + + // Setter methods + template <typename T> + void set_deadline(const T& deadline) { + ctx_->set_deadline(deadline); + } + void set_census_context(struct census_context* cc) { + ctx_->set_census_context(cc); + } + // template <class M> + // void set_outgoing_message(M* msg); // edit outgoing message + // template <class M> + // void set_received_message(M* msg); // edit received message + // for hijacking (can be called multiple times for streaming) + // template <class M> + // void inject_received_message(M* msg); + // void set_client_initial_metadata( + // const std::multimap<grpc::string, grpc::string>& overwrite); + // void set_server_initial_metadata(const std::multimap<grpc::string, + // grpc::string>& overwrite); void set_server_trailing_metadata(const + // std::multimap<grpc::string, grpc::string>& overwrite); void + // set_status(Status status); + private: + grpc::ClientContext* ctx_; + const char* method_; + const grpc::Channel* channel_; +}; class ClientInterceptorFactoryInterface { public: diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h index 6402a3a946..84dce42f97 100644 --- a/include/grpcpp/impl/codegen/interceptor.h +++ b/include/grpcpp/impl/codegen/interceptor.h @@ -50,7 +50,7 @@ enum class InterceptionHookPoints { class InterceptorBatchMethods { public: - virtual ~InterceptorBatchMethods(); + virtual ~InterceptorBatchMethods(){}; // Queries to check whether the current batch has an interception hook point // of type \a type virtual bool QueryInterceptionHookPoint(InterceptionHookPoints type) = 0; |