diff options
Diffstat (limited to 'src/core/surface/completion_queue.c')
-rw-r--r-- | src/core/surface/completion_queue.c | 32 |
1 files changed, 23 insertions, 9 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index b58115a93f..49dfc3c0e1 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -67,8 +67,12 @@ struct grpc_completion_queue { int is_server_cq; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; + grpc_closure pollset_destroy_done; }; +static void on_pollset_destroy_done(void *cc, int success, + grpc_call_list *call_list); + grpc_completion_queue *grpc_completion_queue_create(void *reserved) { grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue)); GPR_ASSERT(!reserved); @@ -80,6 +84,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { grpc_pollset_init(&cc->pollset); cc->completed_tail = &cc->completed_head; cc->completed_head.next = (gpr_uintptr)cc->completed_tail; + grpc_closure_init(&cc->pollset_destroy_done, on_pollset_destroy_done, cc); return cc; } @@ -94,7 +99,8 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) { gpr_ref(&cc->owning_refs); } -static void on_pollset_destroy_done(void *arg) { +static void on_pollset_destroy_done(void *arg, int success, + grpc_call_list *call_list) { grpc_completion_queue *cc = arg; GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy"); } @@ -127,8 +133,10 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) { event, then enter shutdown mode */ /* Queue a GRPC_OP_COMPLETED operation */ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, - void (*done)(void *done_arg, grpc_cq_completion *storage), - void *done_arg, grpc_cq_completion *storage) { + void (*done)(void *done_arg, grpc_cq_completion *storage, + grpc_call_list *call_list), + void *done_arg, grpc_cq_completion *storage, + grpc_call_list *call_list) { int shutdown; int i; grpc_pollset_worker *pluck_worker; @@ -162,7 +170,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + grpc_pollset_shutdown(&cc->pollset, &cc->pollset_destroy_done, call_list); } } @@ -172,6 +180,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, grpc_pollset_worker worker; int first_loop = 1; gpr_timespec now; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; GPR_ASSERT(!reserved); @@ -190,7 +199,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(c->done_arg, c); + c->done(c->done_arg, c, &call_list); break; } if (cc->shutdown) { @@ -207,10 +216,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, break; } first_loop = 0; - grpc_pollset_work(&cc->pollset, &worker, now, deadline); + grpc_pollset_work(&cc->pollset, &worker, now, deadline, &call_list); } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); + grpc_call_list_run(&call_list); return ret; } @@ -247,6 +257,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_pollset_worker worker; gpr_timespec now; int first_loop = 1; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; GPR_ASSERT(!reserved); @@ -268,7 +279,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; - c->done(c->done_arg, c); + c->done(c->done_arg, c, &call_list); goto done; } prev = c; @@ -299,18 +310,20 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, break; } first_loop = 0; - grpc_pollset_work(&cc->pollset, &worker, now, deadline); + grpc_pollset_work(&cc->pollset, &worker, now, deadline, &call_list); del_plucker(cc, tag, &worker); } done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); + grpc_call_list_run(&call_list); return ret; } /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); if (cc->shutdown_called) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); @@ -324,8 +337,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + grpc_pollset_shutdown(&cc->pollset, &cc->pollset_destroy_done, &call_list); } + grpc_call_list_run(&call_list); } void grpc_completion_queue_destroy(grpc_completion_queue *cc) { |