aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/common
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/common')
-rw-r--r--src/cpp/common/alarm.cc47
-rw-r--r--src/cpp/common/callback_common.cc149
2 files changed, 36 insertions, 160 deletions
diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc
index 15a373d8a5..5819a4210b 100644
--- a/src/cpp/common/alarm.cc
+++ b/src/cpp/common/alarm.cc
@@ -39,17 +39,6 @@ class AlarmImpl : public CompletionQueueTag {
AlarmImpl() : cq_(nullptr), tag_(nullptr) {
gpr_ref_init(&refs_, 1);
grpc_timer_init_unset(&timer_);
- GRPC_CLOSURE_INIT(&on_alarm_,
- [](void* arg, grpc_error* error) {
- // queue the op on the completion queue
- AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
- alarm->Ref();
- grpc_cq_end_op(
- alarm->cq_, alarm, error,
- [](void* arg, grpc_cq_completion* completion) {},
- arg, &alarm->completion_);
- },
- this, grpc_schedule_on_exec_ctx);
}
~AlarmImpl() {
grpc_core::ExecCtx exec_ctx;
@@ -68,6 +57,32 @@ class AlarmImpl : public CompletionQueueTag {
cq_ = cq->cq();
tag_ = tag;
GPR_ASSERT(grpc_cq_begin_op(cq_, this));
+ GRPC_CLOSURE_INIT(&on_alarm_,
+ [](void* arg, grpc_error* error) {
+ // queue the op on the completion queue
+ AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
+ alarm->Ref();
+ grpc_cq_end_op(
+ alarm->cq_, alarm, error,
+ [](void* arg, grpc_cq_completion* completion) {},
+ arg, &alarm->completion_);
+ },
+ this, grpc_schedule_on_exec_ctx);
+ grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
+ &on_alarm_);
+ }
+ void Set(gpr_timespec deadline, std::function<void(bool)> f) {
+ grpc_core::ExecCtx exec_ctx;
+ // Don't use any CQ at all. Instead just use the timer to fire the function
+ callback_ = std::move(f);
+ Ref();
+ GRPC_CLOSURE_INIT(&on_alarm_,
+ [](void* arg, grpc_error* error) {
+ AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
+ alarm->callback_(error == GRPC_ERROR_NONE);
+ alarm->Unref();
+ },
+ this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline),
&on_alarm_);
}
@@ -95,6 +110,7 @@ class AlarmImpl : public CompletionQueueTag {
// completion queue where events about this alarm will be posted
grpc_completion_queue* cq_;
void* tag_;
+ std::function<void(bool)> callback_;
};
} // namespace internal
@@ -113,6 +129,15 @@ void Alarm::SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag) {
static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag);
}
+void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
+ // Note that we know that alarm_ is actually an internal::AlarmImpl
+ // but we declared it as the base pointer to avoid a forward declaration
+ // or exposing core data structures in the C++ public headers.
+ // Thus it is safe to use a static_cast to the subclass here, and the
+ // C++ style guide allows us to do so in this case
+ static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
+}
+
Alarm::~Alarm() {
if (alarm_ != nullptr) {
static_cast<internal::AlarmImpl*>(alarm_)->Destroy();
diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc
deleted file mode 100644
index fa586286d1..0000000000
--- a/src/cpp/common/callback_common.cc
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <functional>
-
-#include <grpcpp/impl/codegen/callback_common.h>
-#include <grpcpp/impl/codegen/status.h>
-
-#include "src/core/lib/gprpp/memory.h"
-#include "src/core/lib/surface/completion_queue.h"
-
-namespace grpc {
-namespace internal {
-namespace {
-
-template <class Func, class Arg>
-void CatchingCallback(Func&& func, Arg&& arg) {
-#if GRPC_ALLOW_EXCEPTIONS
- try {
- func(arg);
- } catch (...) {
- // nothing to return or change here, just don't crash the library
- }
-#else // GRPC_ALLOW_EXCEPTIONS
- func(arg);
-#endif // GRPC_ALLOW_EXCEPTIONS
-}
-
-class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface {
- public:
- static void operator delete(void* ptr, std::size_t size) {
- assert(size == sizeof(CallbackWithSuccessImpl));
- }
-
- // This operator should never be called as the memory should be freed as part
- // of the arena destruction. It only exists to provide a matching operator
- // delete to the operator new so that some compilers will not complain (see
- // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
- // there are no tests catching the compiler warning.
- static void operator delete(void*, void*) { assert(0); }
-
- CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent,
- std::function<void(bool)> f)
- : call_(call), parent_(parent), func_(std::move(f)) {
- grpc_call_ref(call);
- }
-
- void Run(bool ok) override {
- void* ignored = parent_->ops();
- bool new_ok = ok;
- GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok));
- GPR_ASSERT(ignored == parent_->ops());
-
- // Last use of func_ or ok, so ok to move them out for rvalue call above
- CatchingCallback(std::move(func_), std::move(ok));
-
- func_ = nullptr; // reset to clear this out for sure
- grpc_call_unref(call_);
- }
-
- private:
- grpc_call* call_;
- CallbackWithSuccessTag* parent_;
- std::function<void(bool)> func_;
-};
-
-class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface {
- public:
- static void operator delete(void* ptr, std::size_t size) {
- assert(size == sizeof(CallbackWithStatusImpl));
- }
-
- // This operator should never be called as the memory should be freed as part
- // of the arena destruction. It only exists to provide a matching operator
- // delete to the operator new so that some compilers will not complain (see
- // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
- // there are no tests catching the compiler warning.
- static void operator delete(void*, void*) { assert(0); }
-
- CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent,
- std::function<void(Status)> f)
- : call_(call), parent_(parent), func_(std::move(f)), status_() {
- grpc_call_ref(call);
- }
-
- void Run(bool ok) override {
- void* ignored = parent_->ops();
-
- GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok));
- GPR_ASSERT(ignored == parent_->ops());
-
- // Last use of func_ or status_, so ok to move them out
- CatchingCallback(std::move(func_), std::move(status_));
-
- func_ = nullptr; // reset to clear this out for sure
- grpc_call_unref(call_);
- }
- Status* status_ptr() { return &status_; }
-
- private:
- grpc_call* call_;
- CallbackWithStatusTag* parent_;
- std::function<void(Status)> func_;
- Status status_;
-};
-
-} // namespace
-
-CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call,
- std::function<void(bool)> f,
- CompletionQueueTag* ops)
- : impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl)))
- CallbackWithSuccessImpl(call, this, std::move(f))),
- ops_(ops) {}
-
-void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); }
-
-CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call,
- std::function<void(Status)> f,
- CompletionQueueTag* ops)
- : ops_(ops) {
- auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl)))
- CallbackWithStatusImpl(call, this, std::move(f));
- impl_ = impl;
- status_ = impl->status_ptr();
-}
-
-void CallbackWithStatusTag::force_run(Status s) {
- *status_ = std::move(s);
- impl_->Run(true);
-}
-
-} // namespace internal
-} // namespace grpc