aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-09-19 23:51:24 -0700
committerGravatar Vijay Pai <vpai@google.com>2018-09-19 23:57:56 -0700
commit9e6511ae2eb3c982bcea88096cbe079147b25fa4 (patch)
tree85d85313559030d1f97ec1d594855e5c8e8b25a8
parent830e5ad5df749a5bbcb0d6bace79b7f3fe306a9e (diff)
Make the core callback interface API so that it can be used in generated code
-rw-r--r--BUILD1
-rw-r--r--CMakeLists.txt3
-rw-r--r--Makefile3
-rw-r--r--build.yaml1
-rw-r--r--gRPC-C++.podspec1
-rw-r--r--grpc.gyp2
-rw-r--r--include/grpc/grpc.h3
-rw-r--r--include/grpc/impl/codegen/grpc_types.h15
-rw-r--r--include/grpcpp/impl/codegen/callback_common.h92
-rw-r--r--include/grpcpp/impl/codegen/client_callback.h2
-rw-r--r--src/core/lib/surface/completion_queue.cc34
-rw-r--r--src/core/lib/surface/completion_queue.h19
-rw-r--r--src/core/lib/surface/completion_queue_factory.cc6
-rw-r--r--src/cpp/client/channel_cc.cc10
-rw-r--r--src/cpp/common/callback_common.cc149
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h2
-rw-r--r--test/core/end2end/inproc_callback_test.cc31
-rw-r--r--test/core/surface/completion_queue_test.cc25
-rw-r--r--tools/doxygen/Doxyfile.c++.internal1
-rw-r--r--tools/run_tests/generated/sources_and_headers.json1
20 files changed, 156 insertions, 245 deletions
diff --git a/BUILD b/BUILD
index 271e57e36c..5b4c5e4130 100644
--- a/BUILD
+++ b/BUILD
@@ -119,7 +119,6 @@ GRPCXX_SRCS = [
"src/cpp/client/credentials_cc.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/common/alarm.cc",
- "src/cpp/common/callback_common.cc",
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/channel_filter.cc",
"src/cpp/common/completion_queue_cc.cc",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c358e9bd43..4f260c65ec 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2773,7 +2773,6 @@ add_library(grpc++
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
- src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
@@ -3135,7 +3134,6 @@ add_library(grpc++_cronet
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
- src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
@@ -4261,7 +4259,6 @@ add_library(grpc++_unsecure
src/cpp/client/credentials_cc.cc
src/cpp/client/generic_stub.cc
src/cpp/common/alarm.cc
- src/cpp/common/callback_common.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
diff --git a/Makefile b/Makefile
index 2f2537228c..79d10f94d7 100644
--- a/Makefile
+++ b/Makefile
@@ -5228,7 +5228,6 @@ LIBGRPC++_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
- src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
@@ -5598,7 +5597,6 @@ LIBGRPC++_CRONET_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
- src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
@@ -6682,7 +6680,6 @@ LIBGRPC++_UNSECURE_SRC = \
src/cpp/client/credentials_cc.cc \
src/cpp/client/generic_stub.cc \
src/cpp/common/alarm.cc \
- src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/completion_queue_cc.cc \
diff --git a/build.yaml b/build.yaml
index d6e67aa7ee..c8512186d5 100644
--- a/build.yaml
+++ b/build.yaml
@@ -1327,7 +1327,6 @@ filegroups:
- src/cpp/client/credentials_cc.cc
- src/cpp/client/generic_stub.cc
- src/cpp/common/alarm.cc
- - src/cpp/common/callback_common.cc
- src/cpp/common/channel_arguments.cc
- src/cpp/common/channel_filter.cc
- src/cpp/common/completion_queue_cc.cc
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 03ec223279..4b75865751 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -190,7 +190,6 @@ Pod::Spec.new do |s|
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
- 'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',
diff --git a/grpc.gyp b/grpc.gyp
index b8aae44de3..654a531092 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -1381,7 +1381,6 @@
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
- 'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',
@@ -1529,7 +1528,6 @@
'src/cpp/client/credentials_cc.cc',
'src/cpp/client/generic_stub.cc',
'src/cpp/common/alarm.cc',
- 'src/cpp/common/callback_common.cc',
'src/cpp/common/channel_arguments.cc',
'src/cpp/common/channel_filter.cc',
'src/cpp/common/completion_queue_cc.cc',
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 3ef95ff462..787d6ae6d7 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -111,7 +111,8 @@ GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_pluck(
of GRPC_CQ_CALLBACK and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING.
This function is experimental. */
GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_callback(
- void* shutdown_callback, void* reserved);
+ grpc_experimental_completion_queue_functor* shutdown_callback,
+ void* reserved);
/** Create a completion queue */
GRPCAPI grpc_completion_queue* grpc_completion_queue_create(
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 5f3b96f40b..5bd50bc9ac 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -660,6 +660,19 @@ typedef enum {
GRPC_CQ_CALLBACK
} grpc_cq_completion_type;
+/** EXPERIMENTAL: Specifies an interface class to be used as a tag
+ for callback-based completion queues. This can be used directly,
+ as the first element of a struct in C, or as a base class in C++.
+ Its "run" value should be assigned to some non-member function, such as
+ a static method. */
+typedef struct grpc_experimental_completion_queue_functor {
+ /** The run member specifies a function that will be called when this
+ tag is extracted from the completion queue. Its arguments will be a
+ pointer to this functor and a boolean that indicates whether the
+ success status of this operation */
+ void (*functor_run)(struct grpc_experimental_completion_queue_functor*, int);
+} grpc_experimental_completion_queue_functor;
+
/* The upgrade to version 2 is currently experimental. */
#define GRPC_CQ_CURRENT_VERSION 2
@@ -678,7 +691,7 @@ typedef struct grpc_completion_queue_attributes {
/* EXPERIMENTAL: START OF VERSION 2 CQ ATTRIBUTES */
/** When creating a callbackable CQ, pass in a functor to get invoked when
* shutdown is complete */
- void* cq_shutdown_cb;
+ grpc_experimental_completion_queue_functor* cq_shutdown_cb;
/* END OF VERSION 2 CQ ATTRIBUTES */
} grpc_completion_queue_attributes;
diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h
index 8b3ad66a8d..044045034c 100644
--- a/include/grpcpp/impl/codegen/callback_common.h
+++ b/include/grpcpp/impl/codegen/callback_common.h
@@ -21,25 +21,36 @@
#include <functional>
+#include <grpc/impl/codegen/grpc_types.h>
#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/status.h>
-// Forward declarations
-namespace grpc_core {
-class CQCallbackInterface;
-};
-
namespace grpc {
namespace internal {
+/// An exception-safe way of invoking a user-specified callback function
+template <class Func, class Arg>
+void CatchingCallback(Func&& func, Arg&& arg) {
+#if GRPC_ALLOW_EXCEPTIONS
+ try {
+ func(arg);
+ } catch (...) {
+ // nothing to return or change here, just don't crash the library
+ }
+#else // GRPC_ALLOW_EXCEPTIONS
+ func(arg);
+#endif // GRPC_ALLOW_EXCEPTIONS
+}
+
// The contract on these tags is that they are single-shot. They must be
// constructed and then fired at exactly one point. There is no expectation
// that they can be reused without reconstruction.
-class CallbackWithStatusTag {
+class CallbackWithStatusTag
+ : public grpc_experimental_completion_queue_functor {
public:
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
@@ -54,24 +65,48 @@ class CallbackWithStatusTag {
static void operator delete(void*, void*) { assert(0); }
CallbackWithStatusTag(grpc_call* call, std::function<void(Status)> f,
- CompletionQueueTag* ops);
+ CompletionQueueTag* ops)
+ : call_(call), func_(std::move(f)), ops_(ops), status_() {
+ g_core_codegen_interface->grpc_call_ref(call);
+ functor_run = &CallbackWithStatusTag::StaticRun;
+ }
~CallbackWithStatusTag() {}
- void* tag() { return static_cast<void*>(impl_); }
- Status* status_ptr() { return status_; }
- CompletionQueueTag* ops() { return ops_; }
+ Status* status_ptr() { return &status_; }
// force_run can not be performed on a tag if operations using this tag
// have been sent to PerformOpsOnCall. It is intended for error conditions
// that are detected before the operations are internally processed.
- void force_run(Status s);
+ void force_run(Status s) {
+ status_ = std::move(s);
+ Run(true);
+ }
private:
- grpc_core::CQCallbackInterface* impl_;
- Status* status_;
+ grpc_call* call_;
+ std::function<void(Status)> func_;
CompletionQueueTag* ops_;
+ Status status_;
+
+ static void StaticRun(grpc_experimental_completion_queue_functor* cb,
+ int ok) {
+ static_cast<CallbackWithStatusTag*>(cb)->Run(static_cast<bool>(ok));
+ }
+ void Run(bool ok) {
+ void* ignored = ops_;
+
+ GPR_ASSERT(ops_->FinalizeResult(&ignored, &ok));
+ GPR_ASSERT(ignored == ops_);
+
+ // Last use of func_ or status_, so ok to move them out
+ CatchingCallback(std::move(func_), std::move(status_));
+
+ func_ = nullptr; // reset to clear this out for sure
+ g_core_codegen_interface->grpc_call_unref(call_);
+ }
};
-class CallbackWithSuccessTag {
+class CallbackWithSuccessTag
+ : public grpc_experimental_completion_queue_functor {
public:
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
@@ -86,19 +121,40 @@ class CallbackWithSuccessTag {
static void operator delete(void*, void*) { assert(0); }
CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f,
- CompletionQueueTag* ops);
+ CompletionQueueTag* ops)
+ : call_(call), func_(std::move(f)), ops_(ops) {
+ g_core_codegen_interface->grpc_call_ref(call);
+ functor_run = &CallbackWithSuccessTag::StaticRun;
+ }
- void* tag() { return static_cast<void*>(impl_); }
CompletionQueueTag* ops() { return ops_; }
// force_run can not be performed on a tag if operations using this tag
// have been sent to PerformOpsOnCall. It is intended for error conditions
// that are detected before the operations are internally processed.
- void force_run(bool ok);
+ void force_run(bool ok) { Run(ok); }
private:
- grpc_core::CQCallbackInterface* impl_;
+ grpc_call* call_;
+ std::function<void(bool)> func_;
CompletionQueueTag* ops_;
+
+ static void StaticRun(grpc_experimental_completion_queue_functor* cb,
+ int ok) {
+ static_cast<CallbackWithSuccessTag*>(cb)->Run(static_cast<bool>(ok));
+ }
+ void Run(bool ok) {
+ void* ignored = ops_;
+ bool new_ok = ok;
+ GPR_ASSERT(ops_->FinalizeResult(&ignored, &new_ok));
+ GPR_ASSERT(ignored == ops_);
+
+ // Last use of func_, so ok to move it out for rvalue call above
+ CatchingCallback(std::move(func_), ok);
+
+ func_ = nullptr; // reset to clear this out for sure
+ g_core_codegen_interface->grpc_call_unref(call_);
+ }
};
} // namespace internal
diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h
index fc81c8aa0a..4d4faea063 100644
--- a/include/grpcpp/impl/codegen/client_callback.h
+++ b/include/grpcpp/impl/codegen/client_callback.h
@@ -84,7 +84,7 @@ class CallbackUnaryCallImpl {
ops->AllowNoMessage();
ops->ClientSendClose();
ops->ClientRecvStatus(context, tag->status_ptr());
- ops->set_cq_tag(tag->tag());
+ ops->set_cq_tag(tag);
call.PerformOps(ops);
}
};
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index c2cf450e94..01797d493a 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -184,7 +184,8 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
- void (*init)(void* data, grpc_core::CQCallbackInterface* shutdown_callback);
+ void (*init)(void* data,
+ grpc_experimental_completion_queue_functor* shutdown_callback);
void (*shutdown)(grpc_completion_queue* cq);
void (*destroy)(void* data);
bool (*begin_op)(grpc_completion_queue* cq, void* tag);
@@ -267,7 +268,7 @@ typedef struct cq_callback_data {
bool shutdown_called;
/** A callback that gets invoked when the CQ completes shutdown */
- grpc_core::CQCallbackInterface* shutdown_callback;
+ grpc_experimental_completion_queue_functor* shutdown_callback;
} cq_callback_data;
/* Completion queue structure */
@@ -333,12 +334,12 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved);
// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
-static void cq_init_next(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback);
-static void cq_init_pluck(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback);
-static void cq_init_callback(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback);
+static void cq_init_next(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
+static void cq_init_pluck(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
+static void cq_init_callback(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
static void cq_destroy_callback(void* data);
@@ -462,7 +463,7 @@ static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
- grpc_core::CQCallbackInterface* shutdown_callback) {
+ grpc_experimental_completion_queue_functor* shutdown_callback) {
GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
grpc_completion_queue* cq;
@@ -497,8 +498,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
return cq;
}
-static void cq_init_next(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback) {
+static void cq_init_next(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_next_data* cqd = static_cast<cq_next_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -513,8 +514,8 @@ static void cq_destroy_next(void* data) {
cq_event_queue_destroy(&cqd->queue);
}
-static void cq_init_pluck(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback) {
+static void cq_init_pluck(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -532,7 +533,7 @@ static void cq_destroy_pluck(void* data) {
}
static void cq_init_callback(
- void* data, grpc_core::CQCallbackInterface* shutdown_callback) {
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -859,7 +860,8 @@ static void cq_end_op_for_callback(
GRPC_ERROR_UNREF(error);
- (static_cast<grpc_core::CQCallbackInterface*>(tag))->Run(is_success);
+ auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
+ (*functor->functor_run)(functor, is_success);
}
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
@@ -1343,7 +1345,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
GPR_ASSERT(cqd->shutdown_called);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
- callback->Run(true);
+ callback->functor_run(callback, true);
}
static void cq_shutdown_callback(grpc_completion_queue* cq) {
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index a7c524d8e8..d60fe6d6ef 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -48,23 +48,6 @@ typedef struct grpc_cq_completion {
uintptr_t next;
} grpc_cq_completion;
-/// For callback CQs, the tag that is passed in for an operation must
-/// actually be a pointer to an implementation of the following class.
-/// When the operation completes, the tag will be typecasted from void*
-/// to grpc_core::CQCallbackInterface* and then the Run method will be
-/// invoked on it. In practice, the language binding (e.g., C++ API
-/// implementation) is responsible for providing and using an implementation
-/// of this abstract base class.
-namespace grpc_core {
-class CQCallbackInterface {
- public:
- virtual ~CQCallbackInterface() {}
- virtual void Run(bool) GRPC_ABSTRACT;
-
- GRPC_ABSTRACT_BASE_CLASS
-};
-} // namespace grpc_core
-
#ifndef NDEBUG
void grpc_cq_internal_ref(grpc_completion_queue* cc, const char* reason,
const char* file, int line);
@@ -106,6 +89,6 @@ int grpc_get_cq_poll_num(grpc_completion_queue* cc);
grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
- grpc_core::CQCallbackInterface* shutdown_callback);
+ grpc_experimental_completion_queue_functor* shutdown_callback);
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/lib/surface/completion_queue_factory.cc b/src/core/lib/surface/completion_queue_factory.cc
index ed92dd7eba..2616c156e4 100644
--- a/src/core/lib/surface/completion_queue_factory.cc
+++ b/src/core/lib/surface/completion_queue_factory.cc
@@ -31,8 +31,7 @@ static grpc_completion_queue* default_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attr) {
return grpc_completion_queue_create_internal(
- attr->cq_completion_type, attr->cq_polling_type,
- static_cast<grpc_core::CQCallbackInterface*>(attr->cq_shutdown_cb));
+ attr->cq_completion_type, attr->cq_polling_type, attr->cq_shutdown_cb);
}
static grpc_completion_queue_factory_vtable default_vtable = {default_create};
@@ -73,7 +72,8 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) {
}
grpc_completion_queue* grpc_completion_queue_create_for_callback(
- void* shutdown_callback, void* reserved) {
+ grpc_experimental_completion_queue_functor* shutdown_callback,
+ void* reserved) {
GPR_ASSERT(!reserved);
grpc_completion_queue_attributes attr = {
2, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback};
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index ad71286e05..c59059f045 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -193,17 +193,19 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
}
namespace {
-class ShutdownCallback : public grpc_core::CQCallbackInterface {
+class ShutdownCallback : public grpc_experimental_completion_queue_functor {
public:
+ ShutdownCallback() { functor_run = &ShutdownCallback::Run; }
// TakeCQ takes ownership of the cq into the shutdown callback
// so that the shutdown callback will be responsible for destroying it
void TakeCQ(CompletionQueue* cq) { cq_ = cq; }
// The Run function will get invoked by the completion queue library
// when the shutdown is actually complete
- void Run(bool) override {
- delete cq_;
- grpc_core::Delete(this);
+ static void Run(grpc_experimental_completion_queue_functor* cb, int) {
+ auto* callback = static_cast<ShutdownCallback*>(cb);
+ delete callback->cq_;
+ grpc_core::Delete(callback);
}
private:
diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc
deleted file mode 100644
index a0c8eeb516..0000000000
--- a/src/cpp/common/callback_common.cc
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <functional>
-
-#include <grpcpp/impl/codegen/callback_common.h>
-#include <grpcpp/impl/codegen/status.h>
-
-#include "src/core/lib/gprpp/memory.h"
-#include "src/core/lib/surface/completion_queue.h"
-
-namespace grpc {
-namespace internal {
-namespace {
-
-template <class Func, class Arg>
-void CatchingCallback(Func&& func, Arg&& arg) {
-#if GRPC_ALLOW_EXCEPTIONS
- try {
- func(arg);
- } catch (...) {
- // nothing to return or change here, just don't crash the library
- }
-#else // GRPC_ALLOW_EXCEPTIONS
- func(arg);
-#endif // GRPC_ALLOW_EXCEPTIONS
-}
-
-class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface {
- public:
- static void operator delete(void* ptr, std::size_t size) {
- assert(size == sizeof(CallbackWithSuccessImpl));
- }
-
- // This operator should never be called as the memory should be freed as part
- // of the arena destruction. It only exists to provide a matching operator
- // delete to the operator new so that some compilers will not complain (see
- // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
- // there are no tests catching the compiler warning.
- static void operator delete(void*, void*) { assert(0); }
-
- CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent,
- std::function<void(bool)> f)
- : call_(call), parent_(parent), func_(std::move(f)) {
- grpc_call_ref(call);
- }
-
- void Run(bool ok) override {
- void* ignored = parent_->ops();
- bool new_ok = ok;
- GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok));
- GPR_ASSERT(ignored == parent_->ops());
-
- // Last use of func_, so ok to move it out for rvalue call above
- CatchingCallback(std::move(func_), ok);
-
- func_ = nullptr; // reset to clear this out for sure
- grpc_call_unref(call_);
- }
-
- private:
- grpc_call* call_;
- CallbackWithSuccessTag* parent_;
- std::function<void(bool)> func_;
-};
-
-class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface {
- public:
- static void operator delete(void* ptr, std::size_t size) {
- assert(size == sizeof(CallbackWithStatusImpl));
- }
-
- // This operator should never be called as the memory should be freed as part
- // of the arena destruction. It only exists to provide a matching operator
- // delete to the operator new so that some compilers will not complain (see
- // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
- // there are no tests catching the compiler warning.
- static void operator delete(void*, void*) { assert(0); }
-
- CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent,
- std::function<void(Status)> f)
- : call_(call), parent_(parent), func_(std::move(f)), status_() {
- grpc_call_ref(call);
- }
-
- void Run(bool ok) override {
- void* ignored = parent_->ops();
-
- GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok));
- GPR_ASSERT(ignored == parent_->ops());
-
- // Last use of func_ or status_, so ok to move them out
- CatchingCallback(std::move(func_), std::move(status_));
-
- func_ = nullptr; // reset to clear this out for sure
- grpc_call_unref(call_);
- }
- Status* status_ptr() { return &status_; }
-
- private:
- grpc_call* call_;
- CallbackWithStatusTag* parent_;
- std::function<void(Status)> func_;
- Status status_;
-};
-
-} // namespace
-
-CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call,
- std::function<void(bool)> f,
- CompletionQueueTag* ops)
- : impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl)))
- CallbackWithSuccessImpl(call, this, std::move(f))),
- ops_(ops) {}
-
-void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); }
-
-CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call,
- std::function<void(Status)> f,
- CompletionQueueTag* ops)
- : ops_(ops) {
- auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl)))
- CallbackWithStatusImpl(call, this, std::move(f));
- impl_ = impl;
- status_ = impl->status_ptr();
-}
-
-void CallbackWithStatusTag::force_run(Status s) {
- *status_ = std::move(s);
- impl_->Run(true);
-}
-
-} // namespace internal
-} // namespace grpc
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index d00e75c326..e25d953a18 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -107,7 +107,7 @@ extern grpc_completion_queue_create_for_next_type grpc_completion_queue_create_f
typedef grpc_completion_queue*(*grpc_completion_queue_create_for_pluck_type)(void* reserved);
extern grpc_completion_queue_create_for_pluck_type grpc_completion_queue_create_for_pluck_import;
#define grpc_completion_queue_create_for_pluck grpc_completion_queue_create_for_pluck_import
-typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(void* shutdown_callback, void* reserved);
+typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(grpc_experimental_completion_queue_functor* shutdown_callback, void* reserved);
extern grpc_completion_queue_create_for_callback_type grpc_completion_queue_create_for_callback_import;
#define grpc_completion_queue_create_for_callback grpc_completion_queue_create_for_callback_import
typedef grpc_completion_queue*(*grpc_completion_queue_create_type)(const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attributes, void* reserved);
diff --git a/test/core/end2end/inproc_callback_test.cc b/test/core/end2end/inproc_callback_test.cc
index 0d6c7c75a8..310030046a 100644
--- a/test/core/end2end/inproc_callback_test.cc
+++ b/test/core/end2end/inproc_callback_test.cc
@@ -37,13 +37,16 @@ typedef struct inproc_fixture_data {
namespace {
template <typename F>
-class CQDeletingCallback : public grpc_core::CQCallbackInterface {
+class CQDeletingCallback : public grpc_experimental_completion_queue_functor {
public:
- explicit CQDeletingCallback(F f) : func_(f) {}
- ~CQDeletingCallback() override {}
- void Run(bool ok) override {
- func_(ok);
- grpc_core::Delete(this);
+ explicit CQDeletingCallback(F f) : func_(f) {
+ functor_run = &CQDeletingCallback::Run;
+ }
+ ~CQDeletingCallback() {}
+ static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
+ auto* callback = static_cast<CQDeletingCallback*>(cb);
+ callback->func_(static_cast<bool>(ok));
+ grpc_core::Delete(callback);
}
private:
@@ -51,18 +54,24 @@ class CQDeletingCallback : public grpc_core::CQCallbackInterface {
};
template <typename F>
-grpc_core::CQCallbackInterface* NewDeletingCallback(F f) {
+grpc_experimental_completion_queue_functor* NewDeletingCallback(F f) {
return grpc_core::New<CQDeletingCallback<F>>(f);
}
-class ShutdownCallback : public grpc_core::CQCallbackInterface {
+class ShutdownCallback : public grpc_experimental_completion_queue_functor {
public:
ShutdownCallback() : done_(false) {
+ functor_run = &ShutdownCallback::StaticRun;
gpr_mu_init(&mu_);
gpr_cv_init(&cv_);
}
- ~ShutdownCallback() override {}
- void Run(bool ok) override {
+ ~ShutdownCallback() {}
+ static void StaticRun(grpc_experimental_completion_queue_functor* cb,
+ int ok) {
+ auto* callback = static_cast<ShutdownCallback*>(cb);
+ callback->Run(static_cast<bool>(ok));
+ }
+ void Run(bool ok) {
gpr_log(GPR_DEBUG, "CQ shutdown notification invoked");
gpr_mu_lock(&mu_);
done_ = true;
@@ -170,7 +179,7 @@ static void verify_tags(gpr_timespec deadline) {
// This function creates a callback functor that emits the
// desired tag into the global tag set
-static grpc_core::CQCallbackInterface* tag(intptr_t t) {
+static grpc_experimental_completion_queue_functor* tag(intptr_t t) {
auto func = [t](bool ok) {
gpr_mu_lock(&tags_mu);
gpr_log(GPR_DEBUG, "Completing operation %" PRIdPTR, t);
diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc
index b889fd0fc6..f7ce8a7042 100644
--- a/test/core/surface/completion_queue_test.cc
+++ b/test/core/surface/completion_queue_test.cc
@@ -369,11 +369,15 @@ static void test_callback(void) {
LOG_TEST("test_callback");
bool got_shutdown = false;
- class ShutdownCallback : public grpc_core::CQCallbackInterface {
+ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
public:
- ShutdownCallback(bool* done) : done_(done) {}
+ ShutdownCallback(bool* done) : done_(done) {
+ functor_run = &ShutdownCallback::Run;
+ }
~ShutdownCallback() {}
- void Run(bool ok) override { *done_ = ok; }
+ static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
+ *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
+ }
private:
bool* done_;
@@ -391,14 +395,17 @@ static void test_callback(void) {
grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
int counter = 0;
- class TagCallback : public grpc_core::CQCallbackInterface {
+ class TagCallback : public grpc_experimental_completion_queue_functor {
public:
- TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {}
+ TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
+ functor_run = &TagCallback::Run;
+ }
~TagCallback() {}
- void Run(bool ok) override {
- GPR_ASSERT(ok);
- *counter_ += tag_;
- grpc_core::Delete(this);
+ static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
+ GPR_ASSERT(static_cast<bool>(ok));
+ auto* callback = static_cast<TagCallback*>(cb);
+ *callback->counter_ += callback->tag_;
+ grpc_core::Delete(callback);
};
private:
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index a72390d9f8..cfc5ef98e7 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -1190,7 +1190,6 @@ src/cpp/client/secure_credentials.h \
src/cpp/codegen/codegen_init.cc \
src/cpp/common/alarm.cc \
src/cpp/common/auth_property_iterator.cc \
-src/cpp/common/callback_common.cc \
src/cpp/common/channel_arguments.cc \
src/cpp/common/channel_filter.cc \
src/cpp/common/channel_filter.h \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index f3e93a0874..97f3e4c1fa 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -11470,7 +11470,6 @@
"src/cpp/client/credentials_cc.cc",
"src/cpp/client/generic_stub.cc",
"src/cpp/common/alarm.cc",
- "src/cpp/common/callback_common.cc",
"src/cpp/common/channel_arguments.cc",
"src/cpp/common/channel_filter.cc",
"src/cpp/common/channel_filter.h",