diff options
Diffstat (limited to 'src/core/iomgr/iomgr.c')
-rw-r--r-- | src/core/iomgr/iomgr.c | 104 |
1 files changed, 55 insertions, 49 deletions
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index e74c32b219..1d6fa9053e 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -60,18 +60,12 @@ static void background_callback_executor(void *ignored) { gpr_timespec short_deadline = gpr_time_add(gpr_now(), gpr_time_from_millis(100)); if (g_cbs_head) { - grpc_iomgr_closure *iocb = g_cbs_head; - int is_cb_ext_managed; - g_cbs_head = iocb->next; + grpc_iomgr_closure *closure = g_cbs_head; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); - /* capture the managed state, as the callback may deallocate itself */ - is_cb_ext_managed = iocb->is_ext_managed; - assert(iocb->success >= 0); - iocb->cb(iocb->cb_arg, iocb->success); - if (!is_cb_ext_managed) { - gpr_free(iocb); - } + assert(closure->success >= 0); + closure->cb(closure->cb_arg, closure->success); gpr_mu_lock(&g_mu); } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) { } else { @@ -103,7 +97,7 @@ void grpc_iomgr_init(void) { } void grpc_iomgr_shutdown(void) { - grpc_iomgr_closure *iocb; + grpc_iomgr_closure *closure; gpr_timespec shutdown_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); @@ -114,18 +108,12 @@ void grpc_iomgr_shutdown(void) { gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs, g_cbs_head ? " and executing final callbacks" : ""); while (g_cbs_head) { - int is_cb_ext_managed; - iocb = g_cbs_head; - g_cbs_head = iocb->next; + closure = g_cbs_head; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); - /* capture the managed state, as the callback may deallocate itself */ - is_cb_ext_managed = iocb->is_ext_managed; - iocb->cb(iocb->cb_arg, 0); - if (!is_cb_ext_managed) { - gpr_free(iocb); - } + closure->cb(closure->cb_arg, 0); gpr_mu_lock(&g_mu); } if (g_refs) { @@ -172,52 +160,75 @@ void grpc_iomgr_unref(void) { gpr_mu_unlock(&g_mu); } -grpc_iomgr_closure *grpc_iomgr_cb_create(grpc_iomgr_cb_func cb, void *cb_arg, - int is_ext_managed) { - grpc_iomgr_closure *iocb = gpr_malloc(sizeof(grpc_iomgr_closure)); - iocb->cb = cb; - iocb->cb_arg = cb_arg; - iocb->is_ext_managed = is_ext_managed; - iocb->success = -1; /* uninitialized */ - iocb->next = NULL; - return iocb; + +void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg) { + closure->cb = cb; + closure->cb_arg = cb_arg; + closure->success = -1; /* uninitialized */ + closure->next = NULL; +} + +typedef struct { + grpc_iomgr_closure managed; + grpc_iomgr_closure *manager; +} managed_closure_arg; + +static void closure_manager_func(void *arg, int success) { + managed_closure_arg *mc_arg = (managed_closure_arg*) arg; + + mc_arg->managed.cb(mc_arg->managed.cb_arg, success); + gpr_free(mc_arg->manager); + gpr_free(mc_arg); } -void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success) { - iocb->success = success; +void grpc_iomgr_managed_closure_init(grpc_iomgr_closure *manager, + grpc_iomgr_cb_func managed_cb, + void *managed_cb_arg) { + managed_closure_arg *managed_arg = gpr_malloc(sizeof(managed_closure_arg)); + + managed_arg->managed.cb = managed_cb; + managed_arg->managed.cb_arg = managed_cb_arg; + managed_arg->manager= manager; + + grpc_iomgr_closure_init(manager, closure_manager_func, managed_arg); +} + + +void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) { + closure->success = success; gpr_mu_lock(&g_mu); - iocb->next = NULL; + closure->next = NULL; if (!g_cbs_tail) { - g_cbs_head = g_cbs_tail = iocb; + g_cbs_head = g_cbs_tail = closure; } else { - g_cbs_tail->next = iocb; - g_cbs_tail = iocb; + g_cbs_tail->next = closure; + g_cbs_tail = closure; } gpr_mu_unlock(&g_mu); } -void grpc_iomgr_add_callback(grpc_iomgr_closure *iocb) { - grpc_iomgr_add_delayed_callback(iocb, 1); +void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) { + grpc_iomgr_add_delayed_callback(closure, 1); } int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { int n = 0; gpr_mu *retake_mu = NULL; - grpc_iomgr_closure *iocb; + grpc_iomgr_closure *closure; for (;;) { - int is_cb_ext_managed; /* check for new work */ if (!gpr_mu_trylock(&g_mu)) { break; } - iocb = g_cbs_head; - if (!iocb) { + closure = g_cbs_head; + if (!closure) { gpr_mu_unlock(&g_mu); break; } - g_cbs_head = iocb->next; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); /* if we have a mutex to drop, do so before executing work */ @@ -226,13 +237,8 @@ int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { retake_mu = drop_mu; drop_mu = NULL; } - /* capture the managed state, as the callback may deallocate itself */ - is_cb_ext_managed = iocb->is_ext_managed; - assert(iocb->success >= 0); - iocb->cb(iocb->cb_arg, success && iocb->success); - if (!is_cb_ext_managed) { - gpr_free(iocb); - } + assert(closure->success >= 0); + closure->cb(closure->cb_arg, success && closure->success); n++; } if (retake_mu) { |