From 9e6511ae2eb3c982bcea88096cbe079147b25fa4 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 19 Sep 2018 23:51:24 -0700 Subject: Make the core callback interface API so that it can be used in generated code --- BUILD | 1 - CMakeLists.txt | 3 - Makefile | 3 - build.yaml | 1 - gRPC-C++.podspec | 1 - grpc.gyp | 2 - include/grpc/grpc.h | 3 +- include/grpc/impl/codegen/grpc_types.h | 15 ++- include/grpcpp/impl/codegen/callback_common.h | 92 ++++++++++--- include/grpcpp/impl/codegen/client_callback.h | 2 +- src/core/lib/surface/completion_queue.cc | 34 ++--- src/core/lib/surface/completion_queue.h | 19 +-- src/core/lib/surface/completion_queue_factory.cc | 6 +- src/cpp/client/channel_cc.cc | 10 +- src/cpp/common/callback_common.cc | 149 --------------------- src/ruby/ext/grpc/rb_grpc_imports.generated.h | 2 +- test/core/end2end/inproc_callback_test.cc | 31 +++-- test/core/surface/completion_queue_test.cc | 25 ++-- tools/doxygen/Doxyfile.c++.internal | 1 - tools/run_tests/generated/sources_and_headers.json | 1 - 20 files changed, 156 insertions(+), 245 deletions(-) delete mode 100644 src/cpp/common/callback_common.cc diff --git a/BUILD b/BUILD index 271e57e36c..5b4c5e4130 100644 --- a/BUILD +++ b/BUILD @@ -119,7 +119,6 @@ GRPCXX_SRCS = [ "src/cpp/client/credentials_cc.cc", "src/cpp/client/generic_stub.cc", "src/cpp/common/alarm.cc", - "src/cpp/common/callback_common.cc", "src/cpp/common/channel_arguments.cc", "src/cpp/common/channel_filter.cc", "src/cpp/common/completion_queue_cc.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index c358e9bd43..4f260c65ec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2773,7 +2773,6 @@ add_library(grpc++ src/cpp/client/credentials_cc.cc src/cpp/client/generic_stub.cc src/cpp/common/alarm.cc - src/cpp/common/callback_common.cc src/cpp/common/channel_arguments.cc src/cpp/common/channel_filter.cc src/cpp/common/completion_queue_cc.cc @@ -3135,7 +3134,6 @@ add_library(grpc++_cronet src/cpp/client/credentials_cc.cc src/cpp/client/generic_stub.cc src/cpp/common/alarm.cc - src/cpp/common/callback_common.cc src/cpp/common/channel_arguments.cc src/cpp/common/channel_filter.cc src/cpp/common/completion_queue_cc.cc @@ -4261,7 +4259,6 @@ add_library(grpc++_unsecure src/cpp/client/credentials_cc.cc src/cpp/client/generic_stub.cc src/cpp/common/alarm.cc - src/cpp/common/callback_common.cc src/cpp/common/channel_arguments.cc src/cpp/common/channel_filter.cc src/cpp/common/completion_queue_cc.cc diff --git a/Makefile b/Makefile index 2f2537228c..79d10f94d7 100644 --- a/Makefile +++ b/Makefile @@ -5228,7 +5228,6 @@ LIBGRPC++_SRC = \ src/cpp/client/credentials_cc.cc \ src/cpp/client/generic_stub.cc \ src/cpp/common/alarm.cc \ - src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/completion_queue_cc.cc \ @@ -5598,7 +5597,6 @@ LIBGRPC++_CRONET_SRC = \ src/cpp/client/credentials_cc.cc \ src/cpp/client/generic_stub.cc \ src/cpp/common/alarm.cc \ - src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/completion_queue_cc.cc \ @@ -6682,7 +6680,6 @@ LIBGRPC++_UNSECURE_SRC = \ src/cpp/client/credentials_cc.cc \ src/cpp/client/generic_stub.cc \ src/cpp/common/alarm.cc \ - src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/completion_queue_cc.cc \ diff --git a/build.yaml b/build.yaml index d6e67aa7ee..c8512186d5 100644 --- a/build.yaml +++ b/build.yaml @@ -1327,7 +1327,6 @@ filegroups: - src/cpp/client/credentials_cc.cc - src/cpp/client/generic_stub.cc - src/cpp/common/alarm.cc - - src/cpp/common/callback_common.cc - src/cpp/common/channel_arguments.cc - src/cpp/common/channel_filter.cc - src/cpp/common/completion_queue_cc.cc diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 03ec223279..4b75865751 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -190,7 +190,6 @@ Pod::Spec.new do |s| 'src/cpp/client/credentials_cc.cc', 'src/cpp/client/generic_stub.cc', 'src/cpp/common/alarm.cc', - 'src/cpp/common/callback_common.cc', 'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_filter.cc', 'src/cpp/common/completion_queue_cc.cc', diff --git a/grpc.gyp b/grpc.gyp index b8aae44de3..654a531092 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -1381,7 +1381,6 @@ 'src/cpp/client/credentials_cc.cc', 'src/cpp/client/generic_stub.cc', 'src/cpp/common/alarm.cc', - 'src/cpp/common/callback_common.cc', 'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_filter.cc', 'src/cpp/common/completion_queue_cc.cc', @@ -1529,7 +1528,6 @@ 'src/cpp/client/credentials_cc.cc', 'src/cpp/client/generic_stub.cc', 'src/cpp/common/alarm.cc', - 'src/cpp/common/callback_common.cc', 'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_filter.cc', 'src/cpp/common/completion_queue_cc.cc', diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 3ef95ff462..787d6ae6d7 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -111,7 +111,8 @@ GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_pluck( of GRPC_CQ_CALLBACK and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING. This function is experimental. */ GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_callback( - void* shutdown_callback, void* reserved); + grpc_experimental_completion_queue_functor* shutdown_callback, + void* reserved); /** Create a completion queue */ GRPCAPI grpc_completion_queue* grpc_completion_queue_create( diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 5f3b96f40b..5bd50bc9ac 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -660,6 +660,19 @@ typedef enum { GRPC_CQ_CALLBACK } grpc_cq_completion_type; +/** EXPERIMENTAL: Specifies an interface class to be used as a tag + for callback-based completion queues. This can be used directly, + as the first element of a struct in C, or as a base class in C++. + Its "run" value should be assigned to some non-member function, such as + a static method. */ +typedef struct grpc_experimental_completion_queue_functor { + /** The run member specifies a function that will be called when this + tag is extracted from the completion queue. Its arguments will be a + pointer to this functor and a boolean that indicates whether the + success status of this operation */ + void (*functor_run)(struct grpc_experimental_completion_queue_functor*, int); +} grpc_experimental_completion_queue_functor; + /* The upgrade to version 2 is currently experimental. */ #define GRPC_CQ_CURRENT_VERSION 2 @@ -678,7 +691,7 @@ typedef struct grpc_completion_queue_attributes { /* EXPERIMENTAL: START OF VERSION 2 CQ ATTRIBUTES */ /** When creating a callbackable CQ, pass in a functor to get invoked when * shutdown is complete */ - void* cq_shutdown_cb; + grpc_experimental_completion_queue_functor* cq_shutdown_cb; /* END OF VERSION 2 CQ ATTRIBUTES */ } grpc_completion_queue_attributes; diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index 8b3ad66a8d..044045034c 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -21,25 +21,36 @@ #include +#include #include #include #include #include #include -// Forward declarations -namespace grpc_core { -class CQCallbackInterface; -}; - namespace grpc { namespace internal { +/// An exception-safe way of invoking a user-specified callback function +template +void CatchingCallback(Func&& func, Arg&& arg) { +#if GRPC_ALLOW_EXCEPTIONS + try { + func(arg); + } catch (...) { + // nothing to return or change here, just don't crash the library + } +#else // GRPC_ALLOW_EXCEPTIONS + func(arg); +#endif // GRPC_ALLOW_EXCEPTIONS +} + // The contract on these tags is that they are single-shot. They must be // constructed and then fired at exactly one point. There is no expectation // that they can be reused without reconstruction. -class CallbackWithStatusTag { +class CallbackWithStatusTag + : public grpc_experimental_completion_queue_functor { public: // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -54,24 +65,48 @@ class CallbackWithStatusTag { static void operator delete(void*, void*) { assert(0); } CallbackWithStatusTag(grpc_call* call, std::function f, - CompletionQueueTag* ops); + CompletionQueueTag* ops) + : call_(call), func_(std::move(f)), ops_(ops), status_() { + g_core_codegen_interface->grpc_call_ref(call); + functor_run = &CallbackWithStatusTag::StaticRun; + } ~CallbackWithStatusTag() {} - void* tag() { return static_cast(impl_); } - Status* status_ptr() { return status_; } - CompletionQueueTag* ops() { return ops_; } + Status* status_ptr() { return &status_; } // force_run can not be performed on a tag if operations using this tag // have been sent to PerformOpsOnCall. It is intended for error conditions // that are detected before the operations are internally processed. - void force_run(Status s); + void force_run(Status s) { + status_ = std::move(s); + Run(true); + } private: - grpc_core::CQCallbackInterface* impl_; - Status* status_; + grpc_call* call_; + std::function func_; CompletionQueueTag* ops_; + Status status_; + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast(cb)->Run(static_cast(ok)); + } + void Run(bool ok) { + void* ignored = ops_; + + GPR_ASSERT(ops_->FinalizeResult(&ignored, &ok)); + GPR_ASSERT(ignored == ops_); + + // Last use of func_ or status_, so ok to move them out + CatchingCallback(std::move(func_), std::move(status_)); + + func_ = nullptr; // reset to clear this out for sure + g_core_codegen_interface->grpc_call_unref(call_); + } }; -class CallbackWithSuccessTag { +class CallbackWithSuccessTag + : public grpc_experimental_completion_queue_functor { public: // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -86,19 +121,40 @@ class CallbackWithSuccessTag { static void operator delete(void*, void*) { assert(0); } CallbackWithSuccessTag(grpc_call* call, std::function f, - CompletionQueueTag* ops); + CompletionQueueTag* ops) + : call_(call), func_(std::move(f)), ops_(ops) { + g_core_codegen_interface->grpc_call_ref(call); + functor_run = &CallbackWithSuccessTag::StaticRun; + } - void* tag() { return static_cast(impl_); } CompletionQueueTag* ops() { return ops_; } // force_run can not be performed on a tag if operations using this tag // have been sent to PerformOpsOnCall. It is intended for error conditions // that are detected before the operations are internally processed. - void force_run(bool ok); + void force_run(bool ok) { Run(ok); } private: - grpc_core::CQCallbackInterface* impl_; + grpc_call* call_; + std::function func_; CompletionQueueTag* ops_; + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast(cb)->Run(static_cast(ok)); + } + void Run(bool ok) { + void* ignored = ops_; + bool new_ok = ok; + GPR_ASSERT(ops_->FinalizeResult(&ignored, &new_ok)); + GPR_ASSERT(ignored == ops_); + + // Last use of func_, so ok to move it out for rvalue call above + CatchingCallback(std::move(func_), ok); + + func_ = nullptr; // reset to clear this out for sure + g_core_codegen_interface->grpc_call_unref(call_); + } }; } // namespace internal diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index fc81c8aa0a..4d4faea063 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -84,7 +84,7 @@ class CallbackUnaryCallImpl { ops->AllowNoMessage(); ops->ClientSendClose(); ops->ClientRecvStatus(context, tag->status_ptr()); - ops->set_cq_tag(tag->tag()); + ops->set_cq_tag(tag); call.PerformOps(ops); } }; diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index c2cf450e94..01797d493a 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -184,7 +184,8 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; size_t data_size; - void (*init)(void* data, grpc_core::CQCallbackInterface* shutdown_callback); + void (*init)(void* data, + grpc_experimental_completion_queue_functor* shutdown_callback); void (*shutdown)(grpc_completion_queue* cq); void (*destroy)(void* data); bool (*begin_op)(grpc_completion_queue* cq, void* tag); @@ -267,7 +268,7 @@ typedef struct cq_callback_data { bool shutdown_called; /** A callback that gets invoked when the CQ completes shutdown */ - grpc_core::CQCallbackInterface* shutdown_callback; + grpc_experimental_completion_queue_functor* shutdown_callback; } cq_callback_data; /* Completion queue structure */ @@ -333,12 +334,12 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback -static void cq_init_next(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); -static void cq_init_pluck(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); -static void cq_init_callback(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); +static void cq_init_next( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); +static void cq_init_pluck( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); +static void cq_init_callback( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); static void cq_destroy_next(void* data); static void cq_destroy_pluck(void* data); static void cq_destroy_callback(void* data); @@ -462,7 +463,7 @@ static long cq_event_queue_num_items(grpc_cq_event_queue* q) { grpc_completion_queue* grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, - grpc_core::CQCallbackInterface* shutdown_callback) { + grpc_experimental_completion_queue_functor* shutdown_callback) { GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0); grpc_completion_queue* cq; @@ -497,8 +498,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal( return cq; } -static void cq_init_next(void* data, - grpc_core::CQCallbackInterface* shutdown_callback) { +static void cq_init_next( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_next_data* cqd = static_cast(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -513,8 +514,8 @@ static void cq_destroy_next(void* data) { cq_event_queue_destroy(&cqd->queue); } -static void cq_init_pluck(void* data, - grpc_core::CQCallbackInterface* shutdown_callback) { +static void cq_init_pluck( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_pluck_data* cqd = static_cast(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -532,7 +533,7 @@ static void cq_destroy_pluck(void* data) { } static void cq_init_callback( - void* data, grpc_core::CQCallbackInterface* shutdown_callback) { + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_callback_data* cqd = static_cast(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -859,7 +860,8 @@ static void cq_end_op_for_callback( GRPC_ERROR_UNREF(error); - (static_cast(tag))->Run(is_success); + auto* functor = static_cast(tag); + (*functor->functor_run)(functor, is_success); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, @@ -1343,7 +1345,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - callback->Run(true); + callback->functor_run(callback, true); } static void cq_shutdown_callback(grpc_completion_queue* cq) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index a7c524d8e8..d60fe6d6ef 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -48,23 +48,6 @@ typedef struct grpc_cq_completion { uintptr_t next; } grpc_cq_completion; -/// For callback CQs, the tag that is passed in for an operation must -/// actually be a pointer to an implementation of the following class. -/// When the operation completes, the tag will be typecasted from void* -/// to grpc_core::CQCallbackInterface* and then the Run method will be -/// invoked on it. In practice, the language binding (e.g., C++ API -/// implementation) is responsible for providing and using an implementation -/// of this abstract base class. -namespace grpc_core { -class CQCallbackInterface { - public: - virtual ~CQCallbackInterface() {} - virtual void Run(bool) GRPC_ABSTRACT; - - GRPC_ABSTRACT_BASE_CLASS -}; -} // namespace grpc_core - #ifndef NDEBUG void grpc_cq_internal_ref(grpc_completion_queue* cc, const char* reason, const char* file, int line); @@ -106,6 +89,6 @@ int grpc_get_cq_poll_num(grpc_completion_queue* cc); grpc_completion_queue* grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, - grpc_core::CQCallbackInterface* shutdown_callback); + grpc_experimental_completion_queue_functor* shutdown_callback); #endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/lib/surface/completion_queue_factory.cc b/src/core/lib/surface/completion_queue_factory.cc index ed92dd7eba..2616c156e4 100644 --- a/src/core/lib/surface/completion_queue_factory.cc +++ b/src/core/lib/surface/completion_queue_factory.cc @@ -31,8 +31,7 @@ static grpc_completion_queue* default_create( const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attr) { return grpc_completion_queue_create_internal( - attr->cq_completion_type, attr->cq_polling_type, - static_cast(attr->cq_shutdown_cb)); + attr->cq_completion_type, attr->cq_polling_type, attr->cq_shutdown_cb); } static grpc_completion_queue_factory_vtable default_vtable = {default_create}; @@ -73,7 +72,8 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) { } grpc_completion_queue* grpc_completion_queue_create_for_callback( - void* shutdown_callback, void* reserved) { + grpc_experimental_completion_queue_functor* shutdown_callback, + void* reserved) { GPR_ASSERT(!reserved); grpc_completion_queue_attributes attr = { 2, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}; diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index ad71286e05..c59059f045 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -193,17 +193,19 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed, } namespace { -class ShutdownCallback : public grpc_core::CQCallbackInterface { +class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: + ShutdownCallback() { functor_run = &ShutdownCallback::Run; } // TakeCQ takes ownership of the cq into the shutdown callback // so that the shutdown callback will be responsible for destroying it void TakeCQ(CompletionQueue* cq) { cq_ = cq; } // The Run function will get invoked by the completion queue library // when the shutdown is actually complete - void Run(bool) override { - delete cq_; - grpc_core::Delete(this); + static void Run(grpc_experimental_completion_queue_functor* cb, int) { + auto* callback = static_cast(cb); + delete callback->cq_; + grpc_core::Delete(callback); } private: diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc deleted file mode 100644 index a0c8eeb516..0000000000 --- a/src/cpp/common/callback_common.cc +++ /dev/null @@ -1,149 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include - -#include -#include - -#include "src/core/lib/gprpp/memory.h" -#include "src/core/lib/surface/completion_queue.h" - -namespace grpc { -namespace internal { -namespace { - -template -void CatchingCallback(Func&& func, Arg&& arg) { -#if GRPC_ALLOW_EXCEPTIONS - try { - func(arg); - } catch (...) { - // nothing to return or change here, just don't crash the library - } -#else // GRPC_ALLOW_EXCEPTIONS - func(arg); -#endif // GRPC_ALLOW_EXCEPTIONS -} - -class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface { - public: - static void operator delete(void* ptr, std::size_t size) { - assert(size == sizeof(CallbackWithSuccessImpl)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { assert(0); } - - CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent, - std::function f) - : call_(call), parent_(parent), func_(std::move(f)) { - grpc_call_ref(call); - } - - void Run(bool ok) override { - void* ignored = parent_->ops(); - bool new_ok = ok; - GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok)); - GPR_ASSERT(ignored == parent_->ops()); - - // Last use of func_, so ok to move it out for rvalue call above - CatchingCallback(std::move(func_), ok); - - func_ = nullptr; // reset to clear this out for sure - grpc_call_unref(call_); - } - - private: - grpc_call* call_; - CallbackWithSuccessTag* parent_; - std::function func_; -}; - -class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface { - public: - static void operator delete(void* ptr, std::size_t size) { - assert(size == sizeof(CallbackWithStatusImpl)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { assert(0); } - - CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent, - std::function f) - : call_(call), parent_(parent), func_(std::move(f)), status_() { - grpc_call_ref(call); - } - - void Run(bool ok) override { - void* ignored = parent_->ops(); - - GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok)); - GPR_ASSERT(ignored == parent_->ops()); - - // Last use of func_ or status_, so ok to move them out - CatchingCallback(std::move(func_), std::move(status_)); - - func_ = nullptr; // reset to clear this out for sure - grpc_call_unref(call_); - } - Status* status_ptr() { return &status_; } - - private: - grpc_call* call_; - CallbackWithStatusTag* parent_; - std::function func_; - Status status_; -}; - -} // namespace - -CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call, - std::function f, - CompletionQueueTag* ops) - : impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl))) - CallbackWithSuccessImpl(call, this, std::move(f))), - ops_(ops) {} - -void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); } - -CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call, - std::function f, - CompletionQueueTag* ops) - : ops_(ops) { - auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl))) - CallbackWithStatusImpl(call, this, std::move(f)); - impl_ = impl; - status_ = impl->status_ptr(); -} - -void CallbackWithStatusTag::force_run(Status s) { - *status_ = std::move(s); - impl_->Run(true); -} - -} // namespace internal -} // namespace grpc diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index d00e75c326..e25d953a18 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -107,7 +107,7 @@ extern grpc_completion_queue_create_for_next_type grpc_completion_queue_create_f typedef grpc_completion_queue*(*grpc_completion_queue_create_for_pluck_type)(void* reserved); extern grpc_completion_queue_create_for_pluck_type grpc_completion_queue_create_for_pluck_import; #define grpc_completion_queue_create_for_pluck grpc_completion_queue_create_for_pluck_import -typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(void* shutdown_callback, void* reserved); +typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(grpc_experimental_completion_queue_functor* shutdown_callback, void* reserved); extern grpc_completion_queue_create_for_callback_type grpc_completion_queue_create_for_callback_import; #define grpc_completion_queue_create_for_callback grpc_completion_queue_create_for_callback_import typedef grpc_completion_queue*(*grpc_completion_queue_create_type)(const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attributes, void* reserved); diff --git a/test/core/end2end/inproc_callback_test.cc b/test/core/end2end/inproc_callback_test.cc index 0d6c7c75a8..310030046a 100644 --- a/test/core/end2end/inproc_callback_test.cc +++ b/test/core/end2end/inproc_callback_test.cc @@ -37,13 +37,16 @@ typedef struct inproc_fixture_data { namespace { template -class CQDeletingCallback : public grpc_core::CQCallbackInterface { +class CQDeletingCallback : public grpc_experimental_completion_queue_functor { public: - explicit CQDeletingCallback(F f) : func_(f) {} - ~CQDeletingCallback() override {} - void Run(bool ok) override { - func_(ok); - grpc_core::Delete(this); + explicit CQDeletingCallback(F f) : func_(f) { + functor_run = &CQDeletingCallback::Run; + } + ~CQDeletingCallback() {} + static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + auto* callback = static_cast(cb); + callback->func_(static_cast(ok)); + grpc_core::Delete(callback); } private: @@ -51,18 +54,24 @@ class CQDeletingCallback : public grpc_core::CQCallbackInterface { }; template -grpc_core::CQCallbackInterface* NewDeletingCallback(F f) { +grpc_experimental_completion_queue_functor* NewDeletingCallback(F f) { return grpc_core::New>(f); } -class ShutdownCallback : public grpc_core::CQCallbackInterface { +class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: ShutdownCallback() : done_(false) { + functor_run = &ShutdownCallback::StaticRun; gpr_mu_init(&mu_); gpr_cv_init(&cv_); } - ~ShutdownCallback() override {} - void Run(bool ok) override { + ~ShutdownCallback() {} + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + auto* callback = static_cast(cb); + callback->Run(static_cast(ok)); + } + void Run(bool ok) { gpr_log(GPR_DEBUG, "CQ shutdown notification invoked"); gpr_mu_lock(&mu_); done_ = true; @@ -170,7 +179,7 @@ static void verify_tags(gpr_timespec deadline) { // This function creates a callback functor that emits the // desired tag into the global tag set -static grpc_core::CQCallbackInterface* tag(intptr_t t) { +static grpc_experimental_completion_queue_functor* tag(intptr_t t) { auto func = [t](bool ok) { gpr_mu_lock(&tags_mu); gpr_log(GPR_DEBUG, "Completing operation %" PRIdPTR, t); diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc index b889fd0fc6..f7ce8a7042 100644 --- a/test/core/surface/completion_queue_test.cc +++ b/test/core/surface/completion_queue_test.cc @@ -369,11 +369,15 @@ static void test_callback(void) { LOG_TEST("test_callback"); bool got_shutdown = false; - class ShutdownCallback : public grpc_core::CQCallbackInterface { + class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: - ShutdownCallback(bool* done) : done_(done) {} + ShutdownCallback(bool* done) : done_(done) { + functor_run = &ShutdownCallback::Run; + } ~ShutdownCallback() {} - void Run(bool ok) override { *done_ = ok; } + static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + *static_cast(cb)->done_ = static_cast(ok); + } private: bool* done_; @@ -391,14 +395,17 @@ static void test_callback(void) { grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); int counter = 0; - class TagCallback : public grpc_core::CQCallbackInterface { + class TagCallback : public grpc_experimental_completion_queue_functor { public: - TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {} + TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) { + functor_run = &TagCallback::Run; + } ~TagCallback() {} - void Run(bool ok) override { - GPR_ASSERT(ok); - *counter_ += tag_; - grpc_core::Delete(this); + static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + GPR_ASSERT(static_cast(ok)); + auto* callback = static_cast(cb); + *callback->counter_ += callback->tag_; + grpc_core::Delete(callback); }; private: diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index a72390d9f8..cfc5ef98e7 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1190,7 +1190,6 @@ src/cpp/client/secure_credentials.h \ src/cpp/codegen/codegen_init.cc \ src/cpp/common/alarm.cc \ src/cpp/common/auth_property_iterator.cc \ -src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/channel_filter.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index f3e93a0874..97f3e4c1fa 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -11470,7 +11470,6 @@ "src/cpp/client/credentials_cc.cc", "src/cpp/client/generic_stub.cc", "src/cpp/common/alarm.cc", - "src/cpp/common/callback_common.cc", "src/cpp/common/channel_arguments.cc", "src/cpp/common/channel_filter.cc", "src/cpp/common/channel_filter.h", -- cgit v1.2.3