aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar vjpai <vpai@google.com>2015-03-16 23:58:44 -0700
committerGravatar vjpai <vpai@google.com>2015-03-17 00:39:18 -0700
commit7aadf46fdb05b5f595440c41998023e69e1700e5 (patch)
tree549b9c05238f155657f4cf3d87bfbdd7c45d682b
parentc41bf3cb5a497ee2a6ab9a9b959587a4e56328ed (diff)
Change to std::chrono and add a test.
-rw-r--r--include/grpc++/completion_queue.h9
-rw-r--r--src/cpp/common/completion_queue.cc9
-rw-r--r--test/cpp/end2end/async_end2end_test.cc58
3 files changed, 70 insertions, 6 deletions
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index 4181911b58..d742d85ef7 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -34,7 +34,7 @@
#ifndef GRPCXX_COMPLETION_QUEUE_H
#define GRPCXX_COMPLETION_QUEUE_H
-#include <grpc/support/time.h>
+#include <chrono>
#include <grpc++/impl/client_unary_call.h>
struct grpc_completion_queue;
@@ -81,12 +81,15 @@ class CompletionQueue {
// Nonblocking (until deadline) read from queue.
// Cannot rely on result of tag or ok if return is TIMEOUT
- NextStatus AsyncNext(void **tag, bool *ok, gpr_timespec deadline);
+ NextStatus AsyncNext(void **tag, bool *ok,
+ std::chrono::system_clock::time_point deadline);
// Blocking (until deadline) read from queue.
// Returns false if the queue is ready for destruction, true if event
bool Next(void **tag, bool *ok) {
- return (AsyncNext(tag,ok,gpr_inf_future) != SHUTDOWN);
+ return (AsyncNext(tag,ok,
+ std::chrono::system_clock::time_point::max()) !=
+ SHUTDOWN);
}
// Shutdown has to be called, and the CompletionQueue can only be
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index 2913298afe..fede2da016 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -57,12 +57,15 @@ class EventDeleter {
}
};
-CompletionQueue::NextStatus CompletionQueue::AsyncNext(void** tag, bool* ok,
- gpr_timespec deadline) {
+CompletionQueue::NextStatus
+CompletionQueue::AsyncNext(void** tag, bool* ok,
+ std::chrono::system_clock::time_point deadline) {
std::unique_ptr<grpc_event, EventDeleter> ev;
+ gpr_timespec gpr_deadline;
+ Timepoint2Timespec(deadline, &gpr_deadline);
for (;;) {
- ev.reset(grpc_completion_queue_next(cq_, deadline));
+ ev.reset(grpc_completion_queue_next(cq_, gpr_deadline));
if (!ev) { /* got a NULL back because deadline passed */
return TIMEOUT;
}
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 70df9e14b2..e011b788ff 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -76,6 +76,20 @@ void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
EXPECT_EQ(tag(i), got_tag);
}
+void verify_timed_ok(CompletionQueue* cq, int i, bool expect_ok,
+ std::chrono::system_clock::time_point deadline =
+ std::chrono::system_clock::time_point::max(),
+ CompletionQueue::NextStatus expected_outcome =
+ CompletionQueue::GOT_EVENT) {
+ bool ok;
+ void* got_tag;
+ EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), expected_outcome);
+ if (expected_outcome == CompletionQueue::GOT_EVENT) {
+ EXPECT_EQ(expect_ok, ok);
+ EXPECT_EQ(tag(i), got_tag);
+ }
+}
+
class AsyncEnd2endTest : public ::testing::Test {
protected:
AsyncEnd2endTest() : service_(&srv_cq_) {}
@@ -166,6 +180,50 @@ TEST_F(AsyncEnd2endTest, SequentialRpcs) {
SendRpc(10);
}
+// Test a simple RPC using the async version of Next
+TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message("Hello");
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse> >
+ response_reader(stub_->AsyncEcho(&cli_ctx, send_request,
+ &cli_cq_, tag(1)));
+
+ std::chrono::system_clock::time_point
+ time_now(std::chrono::system_clock::now()),
+ time_limit(std::chrono::system_clock::now()+std::chrono::seconds(5));
+ verify_timed_ok(&srv_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
+ verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
+
+ service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
+ tag(2));
+
+ verify_timed_ok(&srv_cq_, 2, true, time_limit);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+ verify_timed_ok(&cli_cq_, 1, true, time_limit);
+
+ send_response.set_message(recv_request.message());
+ response_writer.Finish(send_response, Status::OK, tag(3));
+ verify_timed_ok(&srv_cq_, 3, true);
+
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
+ verify_timed_ok(&cli_cq_, 4, true);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.IsOk());
+
+}
+
// Two pings and a final pong.
TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
ResetStub();