aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/client/channel_cc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/client/channel_cc.cc')
-rw-r--r--src/cpp/client/channel_cc.cc104
1 files changed, 86 insertions, 18 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index 39b891c2e1..8e1cea0269 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -20,6 +20,7 @@
#include <chrono>
#include <condition_variable>
+#include <cstring>
#include <memory>
#include <mutex>
@@ -32,6 +33,7 @@
#include <grpcpp/client_context.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/impl/call.h>
+#include <grpcpp/impl/codegen/call_op_set.h>
#include <grpcpp/impl/codegen/completion_queue_tag.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/impl/rpc_method.h>
@@ -42,21 +44,39 @@
#include <grpcpp/support/time.h>
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/surface/completion_queue.h"
namespace grpc {
static internal::GrpcLibraryInitializer g_gli_initializer;
-Channel::Channel(const grpc::string& host, grpc_channel* channel)
+Channel::Channel(
+ const grpc::string& host, grpc_channel* channel,
+ std::unique_ptr<std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
+ interceptor_creators)
: host_(host), c_channel_(channel) {
+ if (interceptor_creators != nullptr) {
+ interceptor_creators_ = std::move(*interceptor_creators);
+ }
g_gli_initializer.summon();
}
-Channel::~Channel() { grpc_channel_destroy(c_channel_); }
+Channel::~Channel() {
+ grpc_channel_destroy(c_channel_);
+ if (callback_cq_ != nullptr) {
+ callback_cq_->Shutdown();
+ }
+}
namespace {
+inline grpc_slice SliceFromArray(const char* arr, size_t len) {
+ return g_core_codegen_interface->grpc_slice_from_copied_buffer(arr, len);
+}
+
grpc::string GetChannelInfoField(grpc_channel* channel,
grpc_channel_info* channel_info,
char*** channel_info_field) {
@@ -92,9 +112,10 @@ void ChannelResetConnectionBackoff(Channel* channel) {
} // namespace experimental
-internal::Call Channel::CreateCall(const internal::RpcMethod& method,
- ClientContext* context,
- CompletionQueue* cq) {
+internal::Call Channel::CreateCallInternal(const internal::RpcMethod& method,
+ ClientContext* context,
+ CompletionQueue* cq,
+ size_t interceptor_pos) {
const bool kRegistered = method.channel_tag() && context->authority().empty();
grpc_call* c_call = nullptr;
if (kRegistered) {
@@ -103,16 +124,17 @@ internal::Call Channel::CreateCall(const internal::RpcMethod& method,
context->propagation_options_.c_bitmask(), cq->cq(),
method.channel_tag(), context->raw_deadline(), nullptr);
} else {
- const char* host_str = nullptr;
- if (!context->authority().empty()) {
- host_str = context->authority_.c_str();
+ const string* host_str = nullptr;
+ if (!context->authority_.empty()) {
+ host_str = &context->authority_;
} else if (!host_.empty()) {
- host_str = host_.c_str();
+ host_str = &host_;
}
- grpc_slice method_slice = SliceFromCopiedString(method.name());
+ grpc_slice method_slice =
+ SliceFromArray(method.name(), strlen(method.name()));
grpc_slice host_slice;
if (host_str != nullptr) {
- host_slice = SliceFromCopiedString(host_str);
+ host_slice = SliceFromCopiedString(*host_str);
}
c_call = grpc_channel_create_call(
c_channel_, context->propagate_from_call_,
@@ -125,18 +147,27 @@ internal::Call Channel::CreateCall(const internal::RpcMethod& method,
}
}
grpc_census_call_set_context(c_call, context->census_context());
+
+ // ClientRpcInfo should be set before call because set_call also checks
+ // whether the call has been cancelled, and if the call was cancelled, we
+ // should notify the interceptors too/
+ auto* info = context->set_client_rpc_info(
+ method.name(), this, interceptor_creators_, interceptor_pos);
context->set_call(c_call, shared_from_this());
- return internal::Call(c_call, this, cq);
+
+ return internal::Call(c_call, this, cq, info);
+}
+
+internal::Call Channel::CreateCall(const internal::RpcMethod& method,
+ ClientContext* context,
+ CompletionQueue* cq) {
+ return CreateCallInternal(method, context, cq, 0);
}
void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops,
internal::Call* call) {
- static const size_t MAX_OPS = 8;
- size_t nops = 0;
- grpc_op cops[MAX_OPS];
- ops->FillOps(call->call(), cops, &nops);
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), cops, nops, ops, nullptr));
+ ops->FillOps(
+ call); // Make a copy of call. It's fine since Call just has pointers
}
void* Channel::RegisterMethod(const char* method) {
@@ -185,4 +216,41 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
return ok;
}
+namespace {
+class ShutdownCallback : public grpc_experimental_completion_queue_functor {
+ public:
+ ShutdownCallback() { functor_run = &ShutdownCallback::Run; }
+ // TakeCQ takes ownership of the cq into the shutdown callback
+ // so that the shutdown callback will be responsible for destroying it
+ void TakeCQ(CompletionQueue* cq) { cq_ = cq; }
+
+ // The Run function will get invoked by the completion queue library
+ // when the shutdown is actually complete
+ static void Run(grpc_experimental_completion_queue_functor* cb, int) {
+ auto* callback = static_cast<ShutdownCallback*>(cb);
+ delete callback->cq_;
+ delete callback;
+ }
+
+ private:
+ CompletionQueue* cq_ = nullptr;
+};
+} // namespace
+
+CompletionQueue* Channel::CallbackCQ() {
+ // TODO(vjpai): Consider using a single global CQ for the default CQ
+ // if there is no explicit per-channel CQ registered
+ std::lock_guard<std::mutex> l(mu_);
+ if (callback_cq_ == nullptr) {
+ auto* shutdown_callback = new ShutdownCallback;
+ callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
+ shutdown_callback});
+
+ // Transfer ownership of the new cq to its own shutdown callback
+ shutdown_callback->TakeCQ(callback_cq_);
+ }
+ return callback_cq_;
+}
+
} // namespace grpc