diff options
Diffstat (limited to 'src/cpp/client/channel_cc.cc')
-rw-r--r-- | src/cpp/client/channel_cc.cc | 104 |
1 files changed, 86 insertions, 18 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 39b891c2e1..8e1cea0269 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -20,6 +20,7 @@ #include <chrono> #include <condition_variable> +#include <cstring> #include <memory> #include <mutex> @@ -32,6 +33,7 @@ #include <grpcpp/client_context.h> #include <grpcpp/completion_queue.h> #include <grpcpp/impl/call.h> +#include <grpcpp/impl/codegen/call_op_set.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> #include <grpcpp/impl/grpc_library.h> #include <grpcpp/impl/rpc_method.h> @@ -42,21 +44,39 @@ #include <grpcpp/support/time.h> #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/surface/completion_queue.h" namespace grpc { static internal::GrpcLibraryInitializer g_gli_initializer; -Channel::Channel(const grpc::string& host, grpc_channel* channel) +Channel::Channel( + const grpc::string& host, grpc_channel* channel, + std::unique_ptr<std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> + interceptor_creators) : host_(host), c_channel_(channel) { + if (interceptor_creators != nullptr) { + interceptor_creators_ = std::move(*interceptor_creators); + } g_gli_initializer.summon(); } -Channel::~Channel() { grpc_channel_destroy(c_channel_); } +Channel::~Channel() { + grpc_channel_destroy(c_channel_); + if (callback_cq_ != nullptr) { + callback_cq_->Shutdown(); + } +} namespace { +inline grpc_slice SliceFromArray(const char* arr, size_t len) { + return g_core_codegen_interface->grpc_slice_from_copied_buffer(arr, len); +} + grpc::string GetChannelInfoField(grpc_channel* channel, grpc_channel_info* channel_info, char*** channel_info_field) { @@ -92,9 +112,10 @@ void ChannelResetConnectionBackoff(Channel* channel) { } // namespace experimental -internal::Call Channel::CreateCall(const internal::RpcMethod& method, - ClientContext* context, - CompletionQueue* cq) { +internal::Call Channel::CreateCallInternal(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq, + size_t interceptor_pos) { const bool kRegistered = method.channel_tag() && context->authority().empty(); grpc_call* c_call = nullptr; if (kRegistered) { @@ -103,16 +124,17 @@ internal::Call Channel::CreateCall(const internal::RpcMethod& method, context->propagation_options_.c_bitmask(), cq->cq(), method.channel_tag(), context->raw_deadline(), nullptr); } else { - const char* host_str = nullptr; - if (!context->authority().empty()) { - host_str = context->authority_.c_str(); + const string* host_str = nullptr; + if (!context->authority_.empty()) { + host_str = &context->authority_; } else if (!host_.empty()) { - host_str = host_.c_str(); + host_str = &host_; } - grpc_slice method_slice = SliceFromCopiedString(method.name()); + grpc_slice method_slice = + SliceFromArray(method.name(), strlen(method.name())); grpc_slice host_slice; if (host_str != nullptr) { - host_slice = SliceFromCopiedString(host_str); + host_slice = SliceFromCopiedString(*host_str); } c_call = grpc_channel_create_call( c_channel_, context->propagate_from_call_, @@ -125,18 +147,27 @@ internal::Call Channel::CreateCall(const internal::RpcMethod& method, } } grpc_census_call_set_context(c_call, context->census_context()); + + // ClientRpcInfo should be set before call because set_call also checks + // whether the call has been cancelled, and if the call was cancelled, we + // should notify the interceptors too/ + auto* info = context->set_client_rpc_info( + method.name(), this, interceptor_creators_, interceptor_pos); context->set_call(c_call, shared_from_this()); - return internal::Call(c_call, this, cq); + + return internal::Call(c_call, this, cq, info); +} + +internal::Call Channel::CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) { + return CreateCallInternal(method, context, cq, 0); } void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops, internal::Call* call) { - static const size_t MAX_OPS = 8; - size_t nops = 0; - grpc_op cops[MAX_OPS]; - ops->FillOps(call->call(), cops, &nops); - GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), cops, nops, ops, nullptr)); + ops->FillOps( + call); // Make a copy of call. It's fine since Call just has pointers } void* Channel::RegisterMethod(const char* method) { @@ -185,4 +216,41 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed, return ok; } +namespace { +class ShutdownCallback : public grpc_experimental_completion_queue_functor { + public: + ShutdownCallback() { functor_run = &ShutdownCallback::Run; } + // TakeCQ takes ownership of the cq into the shutdown callback + // so that the shutdown callback will be responsible for destroying it + void TakeCQ(CompletionQueue* cq) { cq_ = cq; } + + // The Run function will get invoked by the completion queue library + // when the shutdown is actually complete + static void Run(grpc_experimental_completion_queue_functor* cb, int) { + auto* callback = static_cast<ShutdownCallback*>(cb); + delete callback->cq_; + delete callback; + } + + private: + CompletionQueue* cq_ = nullptr; +}; +} // namespace + +CompletionQueue* Channel::CallbackCQ() { + // TODO(vjpai): Consider using a single global CQ for the default CQ + // if there is no explicit per-channel CQ registered + std::lock_guard<std::mutex> l(mu_); + if (callback_cq_ == nullptr) { + auto* shutdown_callback = new ShutdownCallback; + callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, + shutdown_callback}); + + // Transfer ownership of the new cq to its own shutdown callback + shutdown_callback->TakeCQ(callback_cq_); + } + return callback_cq_; +} + } // namespace grpc |