aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/client
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/client')
-rw-r--r--src/cpp/client/channel.cc62
-rw-r--r--src/cpp/client/channel.h15
-rw-r--r--src/cpp/client/client_context.cc12
3 files changed, 75 insertions, 14 deletions
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index ee143d68a0..af7366eb01 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -48,6 +48,7 @@
#include <grpc++/impl/call.h>
#include <grpc++/impl/rpc_method.h>
#include <grpc++/status.h>
+#include <grpc++/time.h>
namespace grpc {
@@ -61,16 +62,18 @@ Channel::~Channel() { grpc_channel_destroy(c_channel_); }
Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) {
const char* host_str = host_.empty() ? NULL : host_.c_str();
- auto c_call =
- method.channel_tag() && context->authority().empty()
- ? grpc_channel_create_registered_call(c_channel_, cq->cq(),
- method.channel_tag(),
- context->raw_deadline())
- : grpc_channel_create_call(c_channel_, cq->cq(), method.name(),
- context->authority().empty()
- ? host_str
- : context->authority().c_str(),
- context->raw_deadline());
+ auto c_call = method.channel_tag() && context->authority().empty()
+ ? grpc_channel_create_registered_call(
+ c_channel_, context->propagate_from_call_,
+ context->propagation_options_.c_bitmask(), cq->cq(),
+ method.channel_tag(), context->raw_deadline())
+ : grpc_channel_create_call(
+ c_channel_, context->propagate_from_call_,
+ context->propagation_options_.c_bitmask(), cq->cq(),
+ method.name(), context->authority().empty()
+ ? host_str
+ : context->authority().c_str(),
+ context->raw_deadline());
grpc_census_call_set_context(c_call, context->census_context());
GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call);
context->set_call(c_call, shared_from_this());
@@ -93,4 +96,43 @@ void* Channel::RegisterMethod(const char* method) {
host_.empty() ? NULL : host_.c_str());
}
+grpc_connectivity_state Channel::GetState(bool try_to_connect) {
+ return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
+}
+
+namespace {
+class TagSaver GRPC_FINAL : public CompletionQueueTag {
+ public:
+ explicit TagSaver(void* tag) : tag_(tag) {}
+ ~TagSaver() GRPC_OVERRIDE {}
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+ *tag = tag_;
+ delete this;
+ return true;
+ }
+ private:
+ void* tag_;
+};
+
+} // namespace
+
+void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
+ gpr_timespec deadline,
+ CompletionQueue* cq, void* tag) {
+ TagSaver* tag_saver = new TagSaver(tag);
+ grpc_channel_watch_connectivity_state(c_channel_, last_observed, deadline,
+ cq->cq(), tag_saver);
+}
+
+bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
+ gpr_timespec deadline) {
+ CompletionQueue cq;
+ bool ok = false;
+ void* tag = NULL;
+ NotifyOnStateChangeImpl(last_observed, deadline, &cq, NULL);
+ cq.Next(&tag, &ok);
+ GPR_ASSERT(tag == NULL);
+ return ok;
+}
+
} // namespace grpc
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index 8660146856..cb8e8d98d2 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -56,13 +56,22 @@ class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface {
Channel(const grpc::string& host, grpc_channel* c_channel);
~Channel() GRPC_OVERRIDE;
- virtual void* RegisterMethod(const char* method) GRPC_OVERRIDE;
- virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
+ void* RegisterMethod(const char* method) GRPC_OVERRIDE;
+ Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) GRPC_OVERRIDE;
- virtual void PerformOpsOnCall(CallOpSetInterface* ops,
+ void PerformOpsOnCall(CallOpSetInterface* ops,
Call* call) GRPC_OVERRIDE;
+ grpc_connectivity_state GetState(bool try_to_connect) GRPC_OVERRIDE;
+
private:
+ void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
+ gpr_timespec deadline, CompletionQueue* cq,
+ void* tag) GRPC_OVERRIDE;
+
+ bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
+ gpr_timespec deadline) GRPC_OVERRIDE;
+
const grpc::string host_;
grpc_channel* const c_channel_; // owned
};
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index c38d0c1df6..1ed2d38961 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpc++/credentials.h>
+#include <grpc++/server_context.h>
#include <grpc++/time.h>
#include "src/core/channel/compress_filter.h"
@@ -48,7 +49,8 @@ ClientContext::ClientContext()
: initial_metadata_received_(false),
call_(nullptr),
cq_(nullptr),
- deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
+ deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
+ propagate_from_call_(nullptr) {}
ClientContext::~ClientContext() {
if (call_) {
@@ -64,6 +66,14 @@ ClientContext::~ClientContext() {
}
}
+std::unique_ptr<ClientContext> ClientContext::FromServerContext(
+ const ServerContext& context, PropagationOptions options) {
+ std::unique_ptr<ClientContext> ctx(new ClientContext);
+ ctx->propagate_from_call_ = context.call_;
+ ctx->propagation_options_ = options;
+ return ctx;
+}
+
void ClientContext::AddMetadata(const grpc::string& meta_key,
const grpc::string& meta_value) {
send_initial_metadata_.insert(std::make_pair(meta_key, meta_value));