/* * * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #ifndef GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H #define GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H #include #include #include #include #include #include #include #include namespace grpc { namespace internal { class InterceptorBatchMethodsImpl : public experimental::InterceptorBatchMethods { public: InterceptorBatchMethodsImpl() { for (auto i = static_cast(0); i < experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS; i = static_cast( static_cast(i) + 1)) { hooks_[static_cast(i)] = false; } } ~InterceptorBatchMethodsImpl() {} bool QueryInterceptionHookPoint( experimental::InterceptionHookPoints type) override { return hooks_[static_cast(type)]; } void Proceed() override { if (call_->client_rpc_info() != nullptr) { return ProceedClient(); } GPR_CODEGEN_ASSERT(call_->server_rpc_info() != nullptr); ProceedServer(); } void Hijack() override { // Only the client can hijack when sending down initial metadata GPR_CODEGEN_ASSERT(!reverse_ && ops_ != nullptr && call_->client_rpc_info() != nullptr); // It is illegal to call Hijack twice GPR_CODEGEN_ASSERT(!ran_hijacking_interceptor_); auto* rpc_info = call_->client_rpc_info(); rpc_info->hijacked_ = true; rpc_info->hijacked_interceptor_ = current_interceptor_index_; ClearHookPoints(); ops_->SetHijackingState(); ran_hijacking_interceptor_ = true; rpc_info->RunInterceptor(this, current_interceptor_index_); } void AddInterceptionHookPoint(experimental::InterceptionHookPoints type) { hooks_[static_cast(type)] = true; } ByteBuffer* GetSendMessage() override { return send_message_; } std::multimap* GetSendInitialMetadata() override { return send_initial_metadata_; } Status GetSendStatus() override { return Status(static_cast(*code_), *error_message_, *error_details_); } void ModifySendStatus(const Status& status) override { *code_ = static_cast(status.error_code()); *error_details_ = status.error_details(); *error_message_ = status.error_message(); } std::multimap* GetSendTrailingMetadata() override { return send_trailing_metadata_; } void* GetRecvMessage() override { return recv_message_; } std::multimap* GetRecvInitialMetadata() override { return recv_initial_metadata_->map(); } Status* GetRecvStatus() override { return recv_status_; } std::multimap* GetRecvTrailingMetadata() override { return recv_trailing_metadata_->map(); } void SetSendMessage(ByteBuffer* buf) { send_message_ = buf; } void SetSendInitialMetadata( std::multimap* metadata) { send_initial_metadata_ = metadata; } void SetSendStatus(grpc_status_code* code, grpc::string* error_details, grpc::string* error_message) { code_ = code; error_details_ = error_details; error_message_ = error_message; } void SetSendTrailingMetadata( std::multimap* metadata) { send_trailing_metadata_ = metadata; } void SetRecvMessage(void* message) { recv_message_ = message; } void SetRecvInitialMetadata(MetadataMap* map) { recv_initial_metadata_ = map; } void SetRecvStatus(Status* status) { recv_status_ = status; } void SetRecvTrailingMetadata(MetadataMap* map) { recv_trailing_metadata_ = map; } std::unique_ptr GetInterceptedChannel() override { auto* info = call_->client_rpc_info(); if (info == nullptr) { return std::unique_ptr(nullptr); } // The intercepted channel starts from the interceptor just after the // current interceptor return std::unique_ptr(new InterceptedChannel( info->channel(), current_interceptor_index_ + 1)); } // Clears all state void ClearState() { reverse_ = false; ran_hijacking_interceptor_ = false; ClearHookPoints(); } // Prepares for Post_recv operations void SetReverse() { reverse_ = true; ran_hijacking_interceptor_ = false; ClearHookPoints(); } // This needs to be set before interceptors are run void SetCall(Call* call) { call_ = call; } // This needs to be set before interceptors are run using RunInterceptors(). // Alternatively, RunInterceptors(std::function f) can be used. void SetCallOpSetInterface(CallOpSetInterface* ops) { ops_ = ops; } // Returns true if no interceptors are run. This should be used only by // subclasses of CallOpSetInterface. SetCall and SetCallOpSetInterface should // have been called before this. After all the interceptors are done running, // either ContinueFillOpsAfterInterception or // ContinueFinalizeOpsAfterInterception will be called. Note that neither of // them is invoked if there were no interceptors registered. bool RunInterceptors() { GPR_CODEGEN_ASSERT(ops_); auto* client_rpc_info = call_->client_rpc_info(); if (client_rpc_info != nullptr) { if (client_rpc_info->interceptors_.size() == 0) { return true; } else { RunClientInterceptors(); return false; } } auto* server_rpc_info = call_->server_rpc_info(); if (server_rpc_info == nullptr || server_rpc_info->interceptors_.size() == 0) { return true; } RunServerInterceptors(); return false; } // Returns true if no interceptors are run. Returns false otherwise if there // are interceptors registered. After the interceptors are done running \a f // will be invoked. This is to be used only by BaseAsyncRequest and // SyncRequest. bool RunInterceptors(std::function f) { // This is used only by the server for initial call request GPR_CODEGEN_ASSERT(reverse_ == true); GPR_CODEGEN_ASSERT(call_->client_rpc_info() == nullptr); auto* server_rpc_info = call_->server_rpc_info(); if (server_rpc_info == nullptr || server_rpc_info->interceptors_.size() == 0) { return true; } callback_ = std::move(f); RunServerInterceptors(); return false; } private: void RunClientInterceptors() { auto* rpc_info = call_->client_rpc_info(); if (!reverse_) { current_interceptor_index_ = 0; } else { if (rpc_info->hijacked_) { current_interceptor_index_ = rpc_info->hijacked_interceptor_; } else { current_interceptor_index_ = rpc_info->interceptors_.size() - 1; } } rpc_info->RunInterceptor(this, current_interceptor_index_); } void RunServerInterceptors() { auto* rpc_info = call_->server_rpc_info(); if (!reverse_) { current_interceptor_index_ = 0; } else { current_interceptor_index_ = rpc_info->interceptors_.size() - 1; } rpc_info->RunInterceptor(this, current_interceptor_index_); } void ProceedClient() { auto* rpc_info = call_->client_rpc_info(); if (rpc_info->hijacked_ && !reverse_ && current_interceptor_index_ == rpc_info->hijacked_interceptor_ && !ran_hijacking_interceptor_) { // We now need to provide hijacked recv ops to this interceptor ClearHookPoints(); ops_->SetHijackingState(); ran_hijacking_interceptor_ = true; rpc_info->RunInterceptor(this, current_interceptor_index_); return; } if (!reverse_) { current_interceptor_index_++; // We are going down the stack of interceptors if (current_interceptor_index_ < rpc_info->interceptors_.size()) { if (rpc_info->hijacked_ && current_interceptor_index_ > rpc_info->hijacked_interceptor_) { // This is a hijacked RPC and we are done with hijacking ops_->ContinueFillOpsAfterInterception(); } else { rpc_info->RunInterceptor(this, current_interceptor_index_); } } else { // we are done running all the interceptors without any hijacking ops_->ContinueFillOpsAfterInterception(); } } else { // We are going up the stack of interceptors if (current_interceptor_index_ > 0) { // Continue running interceptors current_interceptor_index_--; rpc_info->RunInterceptor(this, current_interceptor_index_); } else { // we are done running all the interceptors without any hijacking ops_->ContinueFinalizeResultAfterInterception(); } } } void ProceedServer() { auto* rpc_info = call_->server_rpc_info(); if (!reverse_) { current_interceptor_index_++; if (current_interceptor_index_ < rpc_info->interceptors_.size()) { return rpc_info->RunInterceptor(this, current_interceptor_index_); } else if (ops_) { return ops_->ContinueFillOpsAfterInterception(); } } else { // We are going up the stack of interceptors if (current_interceptor_index_ > 0) { // Continue running interceptors current_interceptor_index_--; return rpc_info->RunInterceptor(this, current_interceptor_index_); } else if (ops_) { return ops_->ContinueFinalizeResultAfterInterception(); } } GPR_CODEGEN_ASSERT(callback_); callback_(); } void ClearHookPoints() { for (auto i = static_cast(0); i < experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS; i = static_cast( static_cast(i) + 1)) { hooks_[static_cast(i)] = false; } } std::array( experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS)> hooks_; size_t current_interceptor_index_ = 0; // Current iterator bool reverse_ = false; bool ran_hijacking_interceptor_ = false; Call* call_ = nullptr; // The Call object is present along with CallOpSet // object/callback CallOpSetInterface* ops_ = nullptr; std::function callback_; ByteBuffer* send_message_ = nullptr; std::multimap* send_initial_metadata_; grpc_status_code* code_ = nullptr; grpc::string* error_details_ = nullptr; grpc::string* error_message_ = nullptr; Status send_status_; std::multimap* send_trailing_metadata_ = nullptr; void* recv_message_ = nullptr; MetadataMap* recv_initial_metadata_ = nullptr; Status* recv_status_ = nullptr; MetadataMap* recv_trailing_metadata_ = nullptr; }; // A special implementation of InterceptorBatchMethods to send a Cancel // notification down the interceptor stack class CancelInterceptorBatchMethods : public experimental::InterceptorBatchMethods { public: bool QueryInterceptionHookPoint( experimental::InterceptionHookPoints type) override { if (type == experimental::InterceptionHookPoints::PRE_SEND_CANCEL) { return true; } else { return false; } } void Proceed() override { // This is a no-op. For actual continuation of the RPC simply needs to // return from the Intercept method } void Hijack() override { // Only the client can hijack when sending down initial metadata GPR_CODEGEN_ASSERT(false && "It is illegal to call Hijack on a method which has a " "Cancel notification"); } ByteBuffer* GetSendMessage() override { GPR_CODEGEN_ASSERT(false && "It is illegal to call GetSendMessage on a method which " "has a Cancel notification"); return nullptr; } std::multimap* GetSendInitialMetadata() override { GPR_CODEGEN_ASSERT(false && "It is illegal to call GetSendInitialMetadata on a " "method which has a Cancel notification"); return nullptr; } Status GetSendStatus() override { GPR_CODEGEN_ASSERT(false && "It is illegal to call GetSendStatus on a method which " "has a Cancel notification"); return Status(); } void ModifySendStatus(const Status& status) override { GPR_CODEGEN_ASSERT(false && "It is illegal to call ModifySendStatus on a method " "which has a Cancel notification"); return; } std::multimap* GetSendTrailingMetadata() override { GPR_CODEGEN_ASSERT(false && "It is illegal to call GetSendTrailingMetadata on a " "method which has a Cancel notification"); return nullptr; } void* GetRecvMessage() override { GPR_CODEGEN_ASSERT(false && "It is illegal to call GetRecvMessage on a method which " "has a Cancel notification"); return nullptr; } std::multimap* GetRecvInitialMetadata() override { GPR_CODEGEN_ASSERT(false && "It is illegal to call GetRecvInitialMetadata on a " "method which has a Cancel notification"); return nullptr; } Status* GetRecvStatus() override { GPR_CODEGEN_ASSERT(false && "It is illegal to call GetRecvStatus on a method which " "has a Cancel notification"); return nullptr; } std::multimap* GetRecvTrailingMetadata() override { GPR_CODEGEN_ASSERT(false && "It is illegal to call GetRecvTrailingMetadata on a " "method which has a Cancel notification"); return nullptr; } std::unique_ptr GetInterceptedChannel() override { GPR_CODEGEN_ASSERT(false && "It is illegal to call GetInterceptedChannel on a " "method which has a Cancel notification"); return std::unique_ptr(nullptr); } }; } // namespace internal } // namespace grpc #endif // GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H