aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/completion_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/completion_queue.c')
-rw-r--r--src/core/surface/completion_queue.c32
1 files changed, 23 insertions, 9 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index b58115a93f..49dfc3c0e1 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -67,8 +67,12 @@ struct grpc_completion_queue {
int is_server_cq;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
+ grpc_closure pollset_destroy_done;
};
+static void on_pollset_destroy_done(void *cc, int success,
+ grpc_call_list *call_list);
+
grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
GPR_ASSERT(!reserved);
@@ -80,6 +84,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_pollset_init(&cc->pollset);
cc->completed_tail = &cc->completed_head;
cc->completed_head.next = (gpr_uintptr)cc->completed_tail;
+ grpc_closure_init(&cc->pollset_destroy_done, on_pollset_destroy_done, cc);
return cc;
}
@@ -94,7 +99,8 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) {
gpr_ref(&cc->owning_refs);
}
-static void on_pollset_destroy_done(void *arg) {
+static void on_pollset_destroy_done(void *arg, int success,
+ grpc_call_list *call_list) {
grpc_completion_queue *cc = arg;
GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy");
}
@@ -127,8 +133,10 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) {
event, then enter shutdown mode */
/* Queue a GRPC_OP_COMPLETED operation */
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
- void (*done)(void *done_arg, grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage) {
+ void (*done)(void *done_arg, grpc_cq_completion *storage,
+ grpc_call_list *call_list),
+ void *done_arg, grpc_cq_completion *storage,
+ grpc_call_list *call_list) {
int shutdown;
int i;
grpc_pollset_worker *pluck_worker;
@@ -162,7 +170,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
+ grpc_pollset_shutdown(&cc->pollset, &cc->pollset_destroy_done, call_list);
}
}
@@ -172,6 +180,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
grpc_pollset_worker worker;
int first_loop = 1;
gpr_timespec now;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
GPR_ASSERT(!reserved);
@@ -190,7 +199,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(c->done_arg, c);
+ c->done(c->done_arg, c, &call_list);
break;
}
if (cc->shutdown) {
@@ -207,10 +216,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
break;
}
first_loop = 0;
- grpc_pollset_work(&cc->pollset, &worker, now, deadline);
+ grpc_pollset_work(&cc->pollset, &worker, now, deadline, &call_list);
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
+ grpc_call_list_run(&call_list);
return ret;
}
@@ -247,6 +257,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_pollset_worker worker;
gpr_timespec now;
int first_loop = 1;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
GPR_ASSERT(!reserved);
@@ -268,7 +279,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(c->done_arg, c);
+ c->done(c->done_arg, c, &call_list);
goto done;
}
prev = c;
@@ -299,18 +310,20 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
break;
}
first_loop = 0;
- grpc_pollset_work(&cc->pollset, &worker, now, deadline);
+ grpc_pollset_work(&cc->pollset, &worker, now, deadline, &call_list);
del_plucker(cc, tag, &worker);
}
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
+ grpc_call_list_run(&call_list);
return ret;
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
if (cc->shutdown_called) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
@@ -324,8 +337,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
+ grpc_pollset_shutdown(&cc->pollset, &cc->pollset_destroy_done, &call_list);
}
+ grpc_call_list_run(&call_list);
}
void grpc_completion_queue_destroy(grpc_completion_queue *cc) {