aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/completion_queue.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/completion_queue.cc')
-rw-r--r--src/core/lib/surface/completion_queue.cc34
1 files changed, 18 insertions, 16 deletions
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index c2cf450e94..5dc9991f70 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -184,7 +184,8 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
- void (*init)(void* data, grpc_core::CQCallbackInterface* shutdown_callback);
+ void (*init)(void* data,
+ grpc_experimental_completion_queue_functor* shutdown_callback);
void (*shutdown)(grpc_completion_queue* cq);
void (*destroy)(void* data);
bool (*begin_op)(grpc_completion_queue* cq, void* tag);
@@ -267,7 +268,7 @@ typedef struct cq_callback_data {
bool shutdown_called;
/** A callback that gets invoked when the CQ completes shutdown */
- grpc_core::CQCallbackInterface* shutdown_callback;
+ grpc_experimental_completion_queue_functor* shutdown_callback;
} cq_callback_data;
/* Completion queue structure */
@@ -333,12 +334,12 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved);
// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
-static void cq_init_next(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback);
-static void cq_init_pluck(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback);
-static void cq_init_callback(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback);
+static void cq_init_next(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
+static void cq_init_pluck(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
+static void cq_init_callback(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
static void cq_destroy_next(void* data);
static void cq_destroy_pluck(void* data);
static void cq_destroy_callback(void* data);
@@ -462,7 +463,7 @@ static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
- grpc_core::CQCallbackInterface* shutdown_callback) {
+ grpc_experimental_completion_queue_functor* shutdown_callback) {
GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
grpc_completion_queue* cq;
@@ -497,8 +498,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
return cq;
}
-static void cq_init_next(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback) {
+static void cq_init_next(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_next_data* cqd = static_cast<cq_next_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -513,8 +514,8 @@ static void cq_destroy_next(void* data) {
cq_event_queue_destroy(&cqd->queue);
}
-static void cq_init_pluck(void* data,
- grpc_core::CQCallbackInterface* shutdown_callback) {
+static void cq_init_pluck(
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -532,7 +533,7 @@ static void cq_destroy_pluck(void* data) {
}
static void cq_init_callback(
- void* data, grpc_core::CQCallbackInterface* shutdown_callback) {
+ void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
@@ -859,7 +860,8 @@ static void cq_end_op_for_callback(
GRPC_ERROR_UNREF(error);
- (static_cast<grpc_core::CQCallbackInterface*>(tag))->Run(is_success);
+ auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
+ (*functor->functor_run)(functor, is_success);
}
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
@@ -1343,7 +1345,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
GPR_ASSERT(cqd->shutdown_called);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
- callback->Run(true);
+ (*callback->functor_run)(callback, true);
}
static void cq_shutdown_callback(grpc_completion_queue* cq) {