From 9b83b7d19e0a3e14dbfca2f40fa8157547c190f4 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 22 Oct 2018 02:42:03 -0700 Subject: Adding intercepted channel --- include/grpcpp/impl/codegen/call.h | 109 ++++++++-------------- include/grpcpp/impl/codegen/call_wrapper.h | 92 ++++++++++++++++++ include/grpcpp/impl/codegen/channel_interface.h | 2 + include/grpcpp/impl/codegen/client_context.h | 9 +- include/grpcpp/impl/codegen/client_interceptor.h | 29 +++--- include/grpcpp/impl/codegen/intercepted_channel.h | 76 +++++++++++++++ include/grpcpp/impl/codegen/interceptor.h | 6 ++ include/grpcpp/impl/codegen/server_context.h | 8 +- include/grpcpp/impl/codegen/server_interceptor.h | 23 +++-- include/grpcpp/impl/codegen/server_interface.h | 8 +- 10 files changed, 261 insertions(+), 101 deletions(-) create mode 100644 include/grpcpp/impl/codegen/call_wrapper.h create mode 100644 include/grpcpp/impl/codegen/intercepted_channel.h (limited to 'include/grpcpp/impl/codegen') diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h index 5033890912..f4aae6fdc8 100644 --- a/include/grpcpp/impl/codegen/call.h +++ b/include/grpcpp/impl/codegen/call.h @@ -29,11 +29,13 @@ #include #include +#include #include #include #include #include #include +#include #include #include #include @@ -232,6 +234,8 @@ class InternalInterceptorBatchMethods virtual void SetRecvStatus(Status* status) = 0; virtual void SetRecvTrailingMetadata(internal::MetadataMap* map) = 0; + + virtual std::unique_ptr GetInterceptedChannel() = 0; }; /// Default argument for CallOpSet. I is unused by the class, but can be @@ -779,63 +783,6 @@ class CallOpClientRecvStatus { grpc_slice error_message_; }; -/// Straightforward wrapping of the C call object -class Call final { - public: - Call() - : call_hook_(nullptr), - cq_(nullptr), - call_(nullptr), - max_receive_message_size_(-1) {} - /** call is owned by the caller */ - Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq) - : call_hook_(call_hook), - cq_(cq), - call_(call), - max_receive_message_size_(-1) {} - - Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, - experimental::ClientRpcInfo* rpc_info) - : call_hook_(call_hook), - cq_(cq), - call_(call), - max_receive_message_size_(-1), - client_rpc_info_(rpc_info) {} - - Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, - int max_receive_message_size, experimental::ServerRpcInfo* rpc_info) - : call_hook_(call_hook), - cq_(cq), - call_(call), - max_receive_message_size_(max_receive_message_size), - server_rpc_info_(rpc_info) {} - - void PerformOps(CallOpSetInterface* ops) { - call_hook_->PerformOpsOnCall(ops, this); - } - - grpc_call* call() const { return call_; } - CompletionQueue* cq() const { return cq_; } - - int max_receive_message_size() const { return max_receive_message_size_; } - - experimental::ClientRpcInfo* client_rpc_info() const { - return client_rpc_info_; - } - - experimental::ServerRpcInfo* server_rpc_info() const { - return server_rpc_info_; - } - - private: - CallHook* call_hook_; - CompletionQueue* cq_; - grpc_call* call_; - int max_receive_message_size_; - experimental::ClientRpcInfo* client_rpc_info_ = nullptr; - experimental::ServerRpcInfo* server_rpc_info_ = nullptr; -}; - /// An abstract collection of call ops, used to generate the /// grpc_call_op structure to pass down to the lower layers, /// and as it is-a CompletionQueueTag, also massages the final @@ -904,9 +851,8 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { rpc_info->hijacked_interceptor_ = curr_iteration_; ClearHookPoints(); ops_->SetHijackingState(); - curr_iteration_++; // increment so that we recognize that we have already - // run the hijacking interceptor - rpc_info->RunInterceptor(this, curr_iteration_ - 1); + ran_hijacking_interceptor_ = true; + rpc_info->RunInterceptor(this, curr_iteration_); } virtual void AddInterceptionHookPoint( @@ -951,41 +897,56 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { return recv_trailing_metadata_->map(); } - virtual void SetSendMessage(ByteBuffer* buf) { send_message_ = buf; } + virtual void SetSendMessage(ByteBuffer* buf) override { send_message_ = buf; } virtual void SetSendInitialMetadata( - std::multimap* metadata) { + std::multimap* metadata) override { send_initial_metadata_ = metadata; } virtual void SetSendStatus(grpc_status_code* code, grpc::string* error_details, - grpc::string* error_message) { + grpc::string* error_message) override { code_ = code; error_details_ = error_details; error_message_ = error_message; } virtual void SetSendTrailingMetadata( - std::multimap* metadata) { + std::multimap* metadata) override { send_trailing_metadata_ = metadata; } - virtual void SetRecvMessage(void* message) { recv_message_ = message; } + virtual void SetRecvMessage(void* message) override { + recv_message_ = message; + } - virtual void SetRecvInitialMetadata(internal::MetadataMap* map) { + virtual void SetRecvInitialMetadata(internal::MetadataMap* map) override { recv_initial_metadata_ = map; } - virtual void SetRecvStatus(Status* status) { recv_status_ = status; } + virtual void SetRecvStatus(Status* status) override { recv_status_ = status; } - virtual void SetRecvTrailingMetadata(internal::MetadataMap* map) { + virtual void SetRecvTrailingMetadata(internal::MetadataMap* map) override { recv_trailing_metadata_ = map; } + virtual std::unique_ptr GetInterceptedChannel() override { + auto* info = call_->client_rpc_info(); + if (info == nullptr) { + return std::unique_ptr(nullptr); + } + // The intercepted channel starts from the interceptor just after the + // current interceptor + return std::unique_ptr(new internal::InterceptedChannel( + reinterpret_cast(info->channel()), + curr_iteration_ + 1)); + } + // Prepares for Post_recv operations void SetReverse() { reverse_ = true; + ran_hijacking_interceptor_ = false; ClearHookPoints(); } @@ -1064,17 +1025,19 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { } void ProceedClient() { - curr_iteration_ = reverse_ ? curr_iteration_ - 1 : curr_iteration_ + 1; auto* rpc_info = call_->client_rpc_info(); - if (rpc_info->hijacked_ && - (!reverse_ && curr_iteration_ == rpc_info->hijacked_interceptor_ + 1)) { + if (rpc_info->hijacked_ && !reverse_ && + curr_iteration_ == rpc_info->hijacked_interceptor_ && + !ran_hijacking_interceptor_) { // We now need to provide hijacked recv ops to this interceptor ClearHookPoints(); ops_->SetHijackingState(); - rpc_info->RunInterceptor(this, curr_iteration_ - 1); + ran_hijacking_interceptor_ = true; + rpc_info->RunInterceptor(this, curr_iteration_); return; } if (!reverse_) { + curr_iteration_++; // We are going down the stack of interceptors if (curr_iteration_ < static_cast(rpc_info->interceptors_.size())) { if (rpc_info->hijacked_ && @@ -1089,6 +1052,7 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { ops_->ContinueFillOpsAfterInterception(); } } else { + curr_iteration_--; // We are going up the stack of interceptors if (curr_iteration_ >= 0) { // Continue running interceptors @@ -1139,6 +1103,7 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { int curr_iteration_ = 0; // Current iterator bool reverse_ = false; + bool ran_hijacking_interceptor_ = false; Call* call_ = nullptr; // The Call object is present along with CallOpSet object CallOpSetInterface* ops_ = nullptr; diff --git a/include/grpcpp/impl/codegen/call_wrapper.h b/include/grpcpp/impl/codegen/call_wrapper.h new file mode 100644 index 0000000000..42216f9b6e --- /dev/null +++ b/include/grpcpp/impl/codegen/call_wrapper.h @@ -0,0 +1,92 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef GRPCPP_IMPL_CODEGEN_CALL_WRAPPER_H +#define GRPCPP_IMPL_CODEGEN_CALL_WRAPPER_H + +#include + +namespace grpc { +class CompletionQueue; + +namespace experimental { +class ClientRpcInfo; +class ServerRpcInfo; +} // namespace experimental +namespace internal { +class CallHook; + +/// Straightforward wrapping of the C call object +class Call final { + public: + Call() + : call_hook_(nullptr), + cq_(nullptr), + call_(nullptr), + max_receive_message_size_(-1) {} + /** call is owned by the caller */ + Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq) + : call_hook_(call_hook), + cq_(cq), + call_(call), + max_receive_message_size_(-1) {} + + Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, + experimental::ClientRpcInfo* rpc_info) + : call_hook_(call_hook), + cq_(cq), + call_(call), + max_receive_message_size_(-1), + client_rpc_info_(rpc_info) {} + + Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, + int max_receive_message_size, experimental::ServerRpcInfo* rpc_info) + : call_hook_(call_hook), + cq_(cq), + call_(call), + max_receive_message_size_(max_receive_message_size), + server_rpc_info_(rpc_info) {} + + void PerformOps(CallOpSetInterface* ops) { + call_hook_->PerformOpsOnCall(ops, this); + } + + grpc_call* call() const { return call_; } + CompletionQueue* cq() const { return cq_; } + + int max_receive_message_size() const { return max_receive_message_size_; } + + experimental::ClientRpcInfo* client_rpc_info() const { + return client_rpc_info_; + } + + experimental::ServerRpcInfo* server_rpc_info() const { + return server_rpc_info_; + } + + private: + CallHook* call_hook_; + CompletionQueue* cq_; + grpc_call* call_; + int max_receive_message_size_; + experimental::ClientRpcInfo* client_rpc_info_ = nullptr; + experimental::ServerRpcInfo* server_rpc_info_ = nullptr; +}; +} // namespace internal +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_CALL_WRAPPER_H \ No newline at end of file diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h index b257acc1ab..4427f55f35 100644 --- a/include/grpcpp/impl/codegen/channel_interface.h +++ b/include/grpcpp/impl/codegen/channel_interface.h @@ -51,6 +51,7 @@ template class ClientAsyncReaderWriterFactory; template class ClientAsyncResponseReaderFactory; +class InterceptedChannel; } // namespace internal /// Codegen interface for \a grpc::Channel. @@ -108,6 +109,7 @@ class ChannelInterface { template friend class ::grpc::internal::CallbackUnaryCallImpl; friend class ::grpc::internal::RpcMethod; + friend class ::grpc::internal::InterceptedChannel; virtual internal::Call CreateCall(const internal::RpcMethod& method, ClientContext* context, CompletionQueue* cq) = 0; diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h index 95462dfff3..59c61c4f0e 100644 --- a/include/grpcpp/impl/codegen/client_context.h +++ b/include/grpcpp/impl/codegen/client_context.h @@ -404,8 +404,13 @@ class ClientContext { void set_call(grpc_call* call, const std::shared_ptr& channel); experimental::ClientRpcInfo* set_client_rpc_info( - experimental::ClientRpcInfo client_rpc_info) { - rpc_info_ = std::move(client_rpc_info); + const char* method, grpc::Channel* channel, + const std::vector< + std::unique_ptr>& + creators, + int interceptor_pos) { + rpc_info_ = experimental::ClientRpcInfo(this, method, channel); + rpc_info_.RegisterInterceptors(creators, interceptor_pos); return &rpc_info_; } diff --git a/include/grpcpp/impl/codegen/client_interceptor.h b/include/grpcpp/impl/codegen/client_interceptor.h index 06f009e7d3..8f32814838 100644 --- a/include/grpcpp/impl/codegen/client_interceptor.h +++ b/include/grpcpp/impl/codegen/client_interceptor.h @@ -46,16 +46,7 @@ class ClientInterceptorFactoryInterface { class ClientRpcInfo { public: ClientRpcInfo() {} - ClientRpcInfo(grpc::ClientContext* ctx, const char* method, - grpc::Channel* channel, - const std::vector>& creators) - : ctx_(ctx), method_(method), channel_(channel) { - for (const auto& creator : creators) { - interceptors_.push_back(std::unique_ptr( - creator->CreateClientInterceptor(this))); - } - } + ~ClientRpcInfo(){}; ClientRpcInfo(const ClientRpcInfo&) = delete; @@ -67,7 +58,10 @@ class ClientRpcInfo { Channel* channel() { return channel_; } grpc::ClientContext* client_context() { return ctx_; } - public: + private: + ClientRpcInfo(grpc::ClientContext* ctx, const char* method, + grpc::Channel* channel) + : ctx_(ctx), method_(method), channel_(channel) {} // Runs interceptor at pos \a pos. void RunInterceptor( experimental::InterceptorBatchMethods* interceptor_methods, @@ -76,7 +70,17 @@ class ClientRpcInfo { interceptors_[pos]->Intercept(interceptor_methods); } - private: + void RegisterInterceptors( + const std::vector>& creators, + int interceptor_pos) { + for (auto it = creators.begin() + interceptor_pos; it != creators.end(); + ++it) { + interceptors_.push_back(std::unique_ptr( + (*it)->CreateClientInterceptor(this))); + } + } + grpc::ClientContext* ctx_ = nullptr; const char* method_ = nullptr; grpc::Channel* channel_ = nullptr; @@ -85,6 +89,7 @@ class ClientRpcInfo { int hijacked_interceptor_ = false; friend class internal::InterceptorBatchMethodsImpl; + friend class grpc::ClientContext; }; } // namespace experimental diff --git a/include/grpcpp/impl/codegen/intercepted_channel.h b/include/grpcpp/impl/codegen/intercepted_channel.h new file mode 100644 index 0000000000..545ad1d965 --- /dev/null +++ b/include/grpcpp/impl/codegen/intercepted_channel.h @@ -0,0 +1,76 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_INTERCEPTED_CHANNEL_H +#define GRPCPP_IMPL_CODEGEN_INTERCEPTED_CHANNEL_H + +#include + +namespace grpc { + +namespace internal { + +class InterceptorBatchMethodsImpl; + +class InterceptedChannel : public ChannelInterface { + public: + virtual ~InterceptedChannel() { channel_ = nullptr; } + + /// Get the current channel state. If the channel is in IDLE and + /// \a try_to_connect is set to true, try to connect. + grpc_connectivity_state GetState(bool try_to_connect) override { + return channel_->GetState(try_to_connect); + } + + private: + InterceptedChannel(ChannelInterface* channel, int pos) + : channel_(channel), interceptor_pos_(pos) {} + + internal::Call CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) override; + + void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) override { + return channel_->PerformOpsOnCall(ops, call); + } + void* RegisterMethod(const char* method) override { + return channel_->RegisterMethod(method); + } + + void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, + gpr_timespec deadline, CompletionQueue* cq, + void* tag) override { + return channel_->NotifyOnStateChangeImpl(last_observed, deadline, cq, tag); + } + bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, + gpr_timespec deadline) override { + return channel_->WaitForStateChangeImpl(last_observed, deadline); + } + + CompletionQueue* CallbackCQ() override { return channel_->CallbackCQ(); } + + ChannelInterface* channel_; + int interceptor_pos_; + + friend class InterceptorBatchMethodsImpl; +}; +} // namespace internal +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_INTERCEPTED_CHANNEL_H \ No newline at end of file diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h index e66f684c29..2027fd69b1 100644 --- a/include/grpcpp/impl/codegen/interceptor.h +++ b/include/grpcpp/impl/codegen/interceptor.h @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -107,6 +108,11 @@ class InterceptorBatchMethods { // Returns a modifiable multimap of the received trailing metadata virtual std::multimap* GetRecvTrailingMetadata() = 0; + + // Gets an intercepted channel. When a call is started on this interceptor, + // only interceptors after the current interceptor are created from the + // factory objects registered with the channel. + virtual std::unique_ptr GetInterceptedChannel() = 0; }; class Interceptor { diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index ad6f04260f..810c0bf35b 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -286,8 +286,12 @@ class ServerContext { uint32_t initial_metadata_flags() const { return 0; } experimental::ServerRpcInfo* set_server_rpc_info( - experimental::ServerRpcInfo info) { - rpc_info_ = std::move(info); + const char* method, + const std::vector< + std::unique_ptr>& + creators) { + rpc_info_ = experimental::ServerRpcInfo(this, method); + rpc_info_.RegisterInterceptors(creators); return &rpc_info_; } diff --git a/include/grpcpp/impl/codegen/server_interceptor.h b/include/grpcpp/impl/codegen/server_interceptor.h index 14b786cef9..3f8cbcca8d 100644 --- a/include/grpcpp/impl/codegen/server_interceptor.h +++ b/include/grpcpp/impl/codegen/server_interceptor.h @@ -45,15 +45,7 @@ class ServerInterceptorFactoryInterface { class ServerRpcInfo { public: ServerRpcInfo() {} - ServerRpcInfo(grpc::ServerContext* ctx, const char* method, - const std::vector>& creators) - : ctx_(ctx), method_(method) { - for (const auto& creator : creators) { - interceptors_.push_back(std::unique_ptr( - creator->CreateServerInterceptor(this))); - } - } + ~ServerRpcInfo(){}; ServerRpcInfo(const ServerRpcInfo&) = delete; @@ -74,11 +66,24 @@ class ServerRpcInfo { } private: + ServerRpcInfo(grpc::ServerContext* ctx, const char* method) + : ctx_(ctx), method_(method) {} + + void RegisterInterceptors( + const std::vector< + std::unique_ptr>& + creators) { + for (const auto& creator : creators) { + interceptors_.push_back(std::unique_ptr( + creator->CreateServerInterceptor(this))); + } + } grpc::ServerContext* ctx_ = nullptr; const char* method_ = nullptr; std::vector> interceptors_; friend class internal::InterceptorBatchMethodsImpl; + friend class grpc::ServerContext; }; } // namespace experimental diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h index c83d4aa194..9b4177b641 100644 --- a/include/grpcpp/impl/codegen/server_interface.h +++ b/include/grpcpp/impl/codegen/server_interface.h @@ -192,8 +192,8 @@ class ServerInterface : public internal::CallHook { } call_wrapper_ = internal::Call( call_, server_, call_cq_, server_->max_receive_message_size(), - context_->set_server_rpc_info(experimental::ServerRpcInfo( - context_, name_, *server_->interceptor_creators()))); + context_->set_server_rpc_info(name_, + *server_->interceptor_creators())); return BaseAsyncRequest::FinalizeResult(tag, status); } @@ -272,8 +272,8 @@ class ServerInterface : public internal::CallHook { } call_wrapper_ = internal::Call( call_, server_, call_cq_, server_->max_receive_message_size(), - context_->set_server_rpc_info(experimental::ServerRpcInfo( - context_, name_, *server_->interceptor_creators()))); + context_->set_server_rpc_info(name_, + *server_->interceptor_creators())); /* Set interception point for recv message */ interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_MESSAGE); -- cgit v1.2.3