diff options
Diffstat (limited to 'src/cpp/common')
-rw-r--r-- | src/cpp/common/alarm.cc | 47 | ||||
-rw-r--r-- | src/cpp/common/callback_common.cc | 149 |
2 files changed, 36 insertions, 160 deletions
diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc index 15a373d8a5..5819a4210b 100644 --- a/src/cpp/common/alarm.cc +++ b/src/cpp/common/alarm.cc @@ -39,17 +39,6 @@ class AlarmImpl : public CompletionQueueTag { AlarmImpl() : cq_(nullptr), tag_(nullptr) { gpr_ref_init(&refs_, 1); grpc_timer_init_unset(&timer_); - GRPC_CLOSURE_INIT(&on_alarm_, - [](void* arg, grpc_error* error) { - // queue the op on the completion queue - AlarmImpl* alarm = static_cast<AlarmImpl*>(arg); - alarm->Ref(); - grpc_cq_end_op( - alarm->cq_, alarm, error, - [](void* arg, grpc_cq_completion* completion) {}, - arg, &alarm->completion_); - }, - this, grpc_schedule_on_exec_ctx); } ~AlarmImpl() { grpc_core::ExecCtx exec_ctx; @@ -68,6 +57,32 @@ class AlarmImpl : public CompletionQueueTag { cq_ = cq->cq(); tag_ = tag; GPR_ASSERT(grpc_cq_begin_op(cq_, this)); + GRPC_CLOSURE_INIT(&on_alarm_, + [](void* arg, grpc_error* error) { + // queue the op on the completion queue + AlarmImpl* alarm = static_cast<AlarmImpl*>(arg); + alarm->Ref(); + grpc_cq_end_op( + alarm->cq_, alarm, error, + [](void* arg, grpc_cq_completion* completion) {}, + arg, &alarm->completion_); + }, + this, grpc_schedule_on_exec_ctx); + grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), + &on_alarm_); + } + void Set(gpr_timespec deadline, std::function<void(bool)> f) { + grpc_core::ExecCtx exec_ctx; + // Don't use any CQ at all. Instead just use the timer to fire the function + callback_ = std::move(f); + Ref(); + GRPC_CLOSURE_INIT(&on_alarm_, + [](void* arg, grpc_error* error) { + AlarmImpl* alarm = static_cast<AlarmImpl*>(arg); + alarm->callback_(error == GRPC_ERROR_NONE); + alarm->Unref(); + }, + this, grpc_schedule_on_exec_ctx); grpc_timer_init(&timer_, grpc_timespec_to_millis_round_up(deadline), &on_alarm_); } @@ -95,6 +110,7 @@ class AlarmImpl : public CompletionQueueTag { // completion queue where events about this alarm will be posted grpc_completion_queue* cq_; void* tag_; + std::function<void(bool)> callback_; }; } // namespace internal @@ -113,6 +129,15 @@ void Alarm::SetInternal(CompletionQueue* cq, gpr_timespec deadline, void* tag) { static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag); } +void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) { + // Note that we know that alarm_ is actually an internal::AlarmImpl + // but we declared it as the base pointer to avoid a forward declaration + // or exposing core data structures in the C++ public headers. + // Thus it is safe to use a static_cast to the subclass here, and the + // C++ style guide allows us to do so in this case + static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f)); +} + Alarm::~Alarm() { if (alarm_ != nullptr) { static_cast<internal::AlarmImpl*>(alarm_)->Destroy(); diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc deleted file mode 100644 index fa586286d1..0000000000 --- a/src/cpp/common/callback_common.cc +++ /dev/null @@ -1,149 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <functional> - -#include <grpcpp/impl/codegen/callback_common.h> -#include <grpcpp/impl/codegen/status.h> - -#include "src/core/lib/gprpp/memory.h" -#include "src/core/lib/surface/completion_queue.h" - -namespace grpc { -namespace internal { -namespace { - -template <class Func, class Arg> -void CatchingCallback(Func&& func, Arg&& arg) { -#if GRPC_ALLOW_EXCEPTIONS - try { - func(arg); - } catch (...) { - // nothing to return or change here, just don't crash the library - } -#else // GRPC_ALLOW_EXCEPTIONS - func(arg); -#endif // GRPC_ALLOW_EXCEPTIONS -} - -class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface { - public: - static void operator delete(void* ptr, std::size_t size) { - assert(size == sizeof(CallbackWithSuccessImpl)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { assert(0); } - - CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent, - std::function<void(bool)> f) - : call_(call), parent_(parent), func_(std::move(f)) { - grpc_call_ref(call); - } - - void Run(bool ok) override { - void* ignored = parent_->ops(); - bool new_ok = ok; - GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok)); - GPR_ASSERT(ignored == parent_->ops()); - - // Last use of func_ or ok, so ok to move them out for rvalue call above - CatchingCallback(std::move(func_), std::move(ok)); - - func_ = nullptr; // reset to clear this out for sure - grpc_call_unref(call_); - } - - private: - grpc_call* call_; - CallbackWithSuccessTag* parent_; - std::function<void(bool)> func_; -}; - -class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface { - public: - static void operator delete(void* ptr, std::size_t size) { - assert(size == sizeof(CallbackWithStatusImpl)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { assert(0); } - - CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent, - std::function<void(Status)> f) - : call_(call), parent_(parent), func_(std::move(f)), status_() { - grpc_call_ref(call); - } - - void Run(bool ok) override { - void* ignored = parent_->ops(); - - GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok)); - GPR_ASSERT(ignored == parent_->ops()); - - // Last use of func_ or status_, so ok to move them out - CatchingCallback(std::move(func_), std::move(status_)); - - func_ = nullptr; // reset to clear this out for sure - grpc_call_unref(call_); - } - Status* status_ptr() { return &status_; } - - private: - grpc_call* call_; - CallbackWithStatusTag* parent_; - std::function<void(Status)> func_; - Status status_; -}; - -} // namespace - -CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call, - std::function<void(bool)> f, - CompletionQueueTag* ops) - : impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl))) - CallbackWithSuccessImpl(call, this, std::move(f))), - ops_(ops) {} - -void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); } - -CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call, - std::function<void(Status)> f, - CompletionQueueTag* ops) - : ops_(ops) { - auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl))) - CallbackWithStatusImpl(call, this, std::move(f)); - impl_ = impl; - status_ = impl->status_ptr(); -} - -void CallbackWithStatusTag::force_run(Status s) { - *status_ = std::move(s); - impl_->Run(true); -} - -} // namespace internal -} // namespace grpc |