aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-09-11 17:01:19 -0700
committerGravatar Vijay Pai <vpai@google.com>2018-09-13 09:54:41 -0700
commitdb01bf793aeab78b8b8d85686977240afb56a536 (patch)
tree6f86588cd95dacbd0ed0bbe48f55ce26a6723fe2
parent8bce2a643b9d49413b537933fd433c831f4d85fc (diff)
Add callback-based alarms
-rw-r--r--include/grpcpp/alarm.h24
-rw-r--r--src/cpp/common/alarm.cc47
-rw-r--r--test/cpp/common/alarm_test.cc106
3 files changed, 165 insertions, 12 deletions
diff --git a/include/grpcpp/alarm.h b/include/grpcpp/alarm.h
index f484610a6e..f9008c327e 100644
--- a/include/grpcpp/alarm.h
+++ b/include/grpcpp/alarm.h
@@ -21,6 +21,8 @@
#ifndef GRPCPP_ALARM_H
#define GRPCPP_ALARM_H
+#include <functional>
+
#include <grpc/grpc.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/codegen/completion_queue_tag.h>
@@ -76,8 +78,30 @@ class Alarm : private GrpcLibraryCodegen {
/// has already fired has no effect.
void Cancel();
+ /// NOTE: class experimental_type is not part of the public API of this class
+ /// TODO(vjpai): Move these contents to the public API of Alarm when
+ /// they are no longer experimental
+ class experimental_type {
+ public:
+ explicit experimental_type(Alarm* alarm) : alarm_(alarm) {}
+
+ template <typename T>
+ void Set(const T& deadline, std::function<void(bool)> f) {
+ alarm_->SetInternal(TimePoint<T>(deadline).raw_time(), std::move(f));
+ }
+
+ private:
+ Alarm* alarm_;
+ };
+
+ /// NOTE: The function experimental() is not stable public API. It is a view
+ /// to the experimental components of this class. It may be changed or removed
+ /// at any time.
+ experimental_type experimental() { return experimental_type(this); }
+
private:
void SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag);
+ void SetInternal(gpr_timespec deadline, std::function<void(bool)> f);
internal::CompletionQueueTag* alarm_;
};
diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc
index 15a373d8a5..5819a4210b 100644
--- a/src/cpp/common/alarm.cc
+++ b/src/cpp/common/alarm.cc
@@ -39,17 +39,6 @@ class AlarmImpl : public CompletionQueueTag {
AlarmImpl() : cq_(nullptr), tag_(nullptr) {
gpr_ref_init(&refs_, 1);
grpc_timer_init_unset(&timer_);
- GRPC_CLOSURE_INIT(&on_alarm_,
- [](void* arg, grpc_error* error) {
- // queue the op on the completion queue
- AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
- alarm->Ref();
- grpc_cq_end_op(
- alarm->cq_, alarm, error,
- [](void* arg, grpc_cq_completion* completion) {},
- arg, &alarm->completion_);
- },
- this, grpc_schedule_on_exec_ctx);
}
~AlarmImpl() {
grpc_core::ExecCtx exec_ctx;
@@ -68,6 +57,32 @@ class AlarmImpl : public CompletionQueueTag {
cq_ = cq->cq();
tag_ = tag;
GPR_ASSERT(grpc_cq_begin_op(cq_, this));
+ GRPC_CLOSURE_INIT(&on_alarm_,
+ [](void* arg, grpc_error* error) {
+ // queue the op on the completion queue
+ AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
+ alarm->Ref();
+ grpc_cq_end_op(
+ alarm->cq_, alarm, error,
+ [](void* arg, grpc_cq_completion* completion) {},
+ arg, &alarm->completion_);
+ },
+ this, grpc_schedule_on_exec_ctx);
+ grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
+ &on_alarm_);
+ }
+ void Set(gpr_timespec deadline, std::function<void(bool)> f) {
+ grpc_core::ExecCtx exec_ctx;
+ // Don't use any CQ at all. Instead just use the timer to fire the function
+ callback_ = std::move(f);
+ Ref();
+ GRPC_CLOSURE_INIT(&on_alarm_,
+ [](void* arg, grpc_error* error) {
+ AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
+ alarm->callback_(error == GRPC_ERROR_NONE);
+ alarm->Unref();
+ },
+ this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
&on_alarm_);
}
@@ -95,6 +110,7 @@ class AlarmImpl : public CompletionQueueTag {
// completion queue where events about this alarm will be posted
grpc_completion_queue* cq_;
void* tag_;
+ std::function<void(bool)> callback_;
};
} // namespace internal
@@ -113,6 +129,15 @@ void Alarm::SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag) {
static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag);
}
+void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
+ // Note that we know that alarm_ is actually an internal::AlarmImpl
+ // but we declared it as the base pointer to avoid a forward declaration
+ // or exposing core data structures in the C++ public headers.
+ // Thus it is safe to use a static_cast to the subclass here, and the
+ // C++ style guide allows us to do so in this case
+ static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
+}
+
Alarm::~Alarm() {
if (alarm_ != nullptr) {
static_cast<internal::AlarmImpl*>(alarm_)->Destroy();
diff --git a/test/cpp/common/alarm_test.cc b/test/cpp/common/alarm_test.cc
index 57d958349e..e909d03658 100644
--- a/test/cpp/common/alarm_test.cc
+++ b/test/cpp/common/alarm_test.cc
@@ -16,9 +16,13 @@
*
*/
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <thread>
+
#include <grpcpp/alarm.h>
#include <grpcpp/completion_queue.h>
-#include <thread>
#include <gtest/gtest.h>
@@ -43,6 +47,66 @@ TEST(AlarmTest, RegularExpiry) {
EXPECT_EQ(junk, output_tag);
}
+struct Completion {
+ bool completed = false;
+ std::mutex mu;
+ std::condition_variable cv;
+};
+
+TEST(AlarmTest, CallbackRegularExpiry) {
+ Alarm alarm;
+
+ auto c = std::make_shared<Completion>();
+ alarm.experimental().Set(
+ std::chrono::system_clock::now() + std::chrono::seconds(1), [c](bool ok) {
+ EXPECT_TRUE(ok);
+ std::lock_guard<std::mutex> l(c->mu);
+ c->completed = true;
+ c->cv.notify_one();
+ });
+
+ std::unique_lock<std::mutex> l(c->mu);
+ EXPECT_TRUE(c->cv.wait_until(
+ l, std::chrono::system_clock::now() + std::chrono::seconds(10),
+ [c] { return c->completed; }));
+}
+
+TEST(AlarmTest, CallbackZeroExpiry) {
+ Alarm alarm;
+
+ auto c = std::make_shared<Completion>();
+ alarm.experimental().Set(grpc_timeout_seconds_to_deadline(0), [c](bool ok) {
+ EXPECT_TRUE(ok);
+ std::lock_guard<std::mutex> l(c->mu);
+ c->completed = true;
+ c->cv.notify_one();
+ });
+
+ std::unique_lock<std::mutex> l(c->mu);
+ EXPECT_TRUE(c->cv.wait_until(
+ l, std::chrono::system_clock::now() + std::chrono::seconds(10),
+ [c] { return c->completed; }));
+}
+
+TEST(AlarmTest, CallbackNegativeExpiry) {
+ Alarm alarm;
+
+ auto c = std::make_shared<Completion>();
+ alarm.experimental().Set(
+ std::chrono::system_clock::now() + std::chrono::seconds(-1),
+ [c](bool ok) {
+ EXPECT_TRUE(ok);
+ std::lock_guard<std::mutex> l(c->mu);
+ c->completed = true;
+ c->cv.notify_one();
+ });
+
+ std::unique_lock<std::mutex> l(c->mu);
+ EXPECT_TRUE(c->cv.wait_until(
+ l, std::chrono::system_clock::now() + std::chrono::seconds(10),
+ [c] { return c->completed; }));
+}
+
TEST(AlarmTest, MultithreadedRegularExpiry) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
@@ -182,6 +246,26 @@ TEST(AlarmTest, Cancellation) {
EXPECT_EQ(junk, output_tag);
}
+TEST(AlarmTest, CallbackCancellation) {
+ Alarm alarm;
+
+ auto c = std::make_shared<Completion>();
+ alarm.experimental().Set(
+ std::chrono::system_clock::now() + std::chrono::seconds(10),
+ [c](bool ok) {
+ EXPECT_FALSE(ok);
+ std::lock_guard<std::mutex> l(c->mu);
+ c->completed = true;
+ c->cv.notify_one();
+ });
+ alarm.Cancel();
+
+ std::unique_lock<std::mutex> l(c->mu);
+ EXPECT_TRUE(c->cv.wait_until(
+ l, std::chrono::system_clock::now() + std::chrono::seconds(1),
+ [c] { return c->completed; }));
+}
+
TEST(AlarmTest, SetDestruction) {
CompletionQueue cq;
void* junk = reinterpret_cast<void*>(1618033);
@@ -200,6 +284,26 @@ TEST(AlarmTest, SetDestruction) {
EXPECT_EQ(junk, output_tag);
}
+TEST(AlarmTest, CallbackSetDestruction) {
+ auto c = std::make_shared<Completion>();
+ {
+ Alarm alarm;
+ alarm.experimental().Set(
+ std::chrono::system_clock::now() + std::chrono::seconds(10),
+ [c](bool ok) {
+ EXPECT_FALSE(ok);
+ std::lock_guard<std::mutex> l(c->mu);
+ c->completed = true;
+ c->cv.notify_one();
+ });
+ }
+
+ std::unique_lock<std::mutex> l(c->mu);
+ EXPECT_TRUE(c->cv.wait_until(
+ l, std::chrono::system_clock::now() + std::chrono::seconds(1),
+ [c] { return c->completed; }));
+}
+
TEST(AlarmTest, UnsetDestruction) {
CompletionQueue cq;
Alarm alarm;