aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/cpp/client/channel.cc61
-rw-r--r--test/cpp/end2end/end2end_test.cc24
2 files changed, 66 insertions, 19 deletions
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 6696f19d76..1bdb9ae0de 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -98,44 +98,69 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) {
return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
}
-void Channel::NotifyOnStateChange(grpc_connectivity_state last_observed,
- gpr_timespec deadline,
- CompletionQueue* cq, void* tag) {
- grpc_channel_watch_connectivity_state(c_channel_, last_observed, deadline,
- cq->cq(), tag);
+namespace {
+class TagSaver GRPC_FINAL : public CompletionQueueTag {
+ public:
+ explicit TagSaver(void* tag) : tag_(tag) {}
+ ~TagSaver() GRPC_OVERRIDE {}
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+ *tag = tag_;
+ delete this;
+ return true;
+ }
+ private:
+ void* tag_;
+};
+
+template <typename T>
+void NotifyOnStateChangeShared(grpc_channel* channel,
+ grpc_connectivity_state last_observed,
+ const T& deadline,
+ CompletionQueue* cq, void* tag) {
+ TimePoint<T> deadline_tp(deadline);
+ TagSaver* tag_saver = new TagSaver(tag);
+ grpc_channel_watch_connectivity_state(
+ channel, last_observed, deadline_tp.raw_time(), cq->cq(), tag_saver);
}
-bool Channel::WaitForStateChange(grpc_connectivity_state last_observed,
- gpr_timespec deadline) {
+template <typename T>
+bool WaitForStateChangeShared(grpc_channel* channel,
+ grpc_connectivity_state last_observed,
+ const T& deadline) {
CompletionQueue cq;
bool ok = false;
void* tag = NULL;
- NotifyOnStateChange(last_observed, deadline, &cq, NULL);
+ NotifyOnStateChangeShared(channel, last_observed, deadline, &cq, NULL);
cq.Next(&tag, &ok);
GPR_ASSERT(tag == NULL);
return ok;
}
+} // namespace
+
+void Channel::NotifyOnStateChange(grpc_connectivity_state last_observed,
+ gpr_timespec deadline,
+ CompletionQueue* cq, void* tag) {
+ NotifyOnStateChangeShared(c_channel_, last_observed, deadline, cq, tag);
+}
+
+bool Channel::WaitForStateChange(grpc_connectivity_state last_observed,
+ gpr_timespec deadline) {
+ return WaitForStateChangeShared(c_channel_, last_observed, deadline);
+}
+
#ifndef GRPC_CXX0X_NO_CHRONO
void Channel::NotifyOnStateChange(
grpc_connectivity_state last_observed,
const std::chrono::system_clock::time_point& deadline,
CompletionQueue* cq, void* tag) {
- TimePoint<std::chrono::system_clock::time_point> deadline_tp(deadline);
- grpc_channel_watch_connectivity_state(c_channel_, last_observed,
- deadline_tp.raw_time(), cq->cq(), tag);
+ NotifyOnStateChangeShared(c_channel_, last_observed, deadline, cq, tag);
}
bool Channel::WaitForStateChange(
grpc_connectivity_state last_observed,
const std::chrono::system_clock::time_point& deadline) {
- CompletionQueue cq;
- bool ok = false;
- void* tag = NULL;
- NotifyOnStateChange(last_observed, deadline, &cq, NULL);
- cq.Next(&tag, &ok);
- GPR_ASSERT(tag == NULL);
- return ok;
+ return WaitForStateChangeShared(c_channel_, last_observed, deadline);
}
#endif // !GRPC_CXX0X_NO_CHRONO
} // namespace grpc
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 3144ca4dc7..8963382a87 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -831,7 +831,8 @@ TEST_F(End2endTest, HugeResponse) {
}
namespace {
-void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream, gpr_event *ev) {
+void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
+ gpr_event *ev) {
EchoResponse resp;
gpr_event_set(ev, (void*)1);
while (stream->Read(&resp)) {
@@ -870,6 +871,27 @@ TEST_F(End2endTest, Peer) {
EXPECT_TRUE(CheckIsLocalhost(context.peer()));
}
+TEST_F(End2endTest, ChannelState) {
+ ResetStub();
+ // Start IDLE
+ EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
+
+ CompletionQueue cq;
+ std::chrono::system_clock::time_point deadline =
+ std::chrono::system_clock::now() + std::chrono::milliseconds(10);
+ // No state change.
+ channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, NULL);
+ void* tag;
+ bool ok = true;
+ cq.Next(&tag, &ok);
+ EXPECT_FALSE(ok);
+
+ EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
+ EXPECT_TRUE(channel_->WaitForStateChange(
+ GRPC_CHANNEL_IDLE, gpr_inf_future(GPR_CLOCK_REALTIME)));
+ EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel_->GetState(false));
+}
+
} // namespace testing
} // namespace grpc