From a30020f00996735ce8890dca534a31b8e97b5889 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 27 May 2015 19:21:01 -0700 Subject: Comments addressed. --- src/core/iomgr/fd_posix.c | 51 +++++++++----------- src/core/iomgr/fd_posix.h | 2 - src/core/iomgr/iomgr.c | 104 +++++++++++++++++++++------------------- src/core/iomgr/iomgr.h | 11 +++-- src/core/iomgr/pollset_posix.c | 1 - src/core/iomgr/socket_windows.c | 9 ++-- src/core/iomgr/tcp_posix.c | 1 - 7 files changed, 89 insertions(+), 90 deletions(-) (limited to 'src/core/iomgr') diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 3509d021ee..672b321b95 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -118,9 +118,8 @@ static void unref_by(grpc_fd *fd, int n) { gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { close(fd->fd); - fd->on_done_iocb.cb = fd->on_done; - fd->on_done_iocb.cb_arg = fd->on_done_user_data; - fd->on_done_iocb.is_ext_managed = 1; + grpc_iomgr_closure_init(&fd->on_done_iocb, fd->on_done, + fd->on_done_user_data); grpc_iomgr_add_callback(&fd->on_done_iocb); freelist_fd(fd); grpc_iomgr_unref(); @@ -199,26 +198,20 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } -static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, - int allow_synchronous_callback, - grpc_iomgr_closure *iocb) { +static void process_callback(grpc_iomgr_closure *closure, int success, + int allow_synchronous_callback) { if (allow_synchronous_callback) { - cb(arg, success); + closure->cb(closure->cb_arg, success); } else { - /* !iocb: allocate -> managed by iomgr - * iocb: "iocb" holds an instance managed by fd_posix */ - iocb = grpc_iomgr_cb_create(cb, arg, !iocb /* is_ext_managed */); - grpc_iomgr_add_delayed_callback(iocb, success); + grpc_iomgr_add_delayed_callback(closure, success); } } -static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success, - int allow_synchronous_callback, - grpc_iomgr_closure *iocbs) { +static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n, + int success, int allow_synchronous_callback) { size_t i; for (i = 0; i < n; i++) { - make_callback(callbacks[i].cb, callbacks[i].cb_arg, success, - allow_synchronous_callback, iocbs + i); + process_callback(callbacks + i, success, allow_synchronous_callback); } } @@ -245,9 +238,9 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, case READY: assert(gpr_atm_no_barrier_load(st) == READY); gpr_atm_rel_store(st, NOT_READY); - make_callback(closure->cb, closure->cb_arg, - !gpr_atm_acq_load(&fd->shutdown), - allow_synchronous_callback, NULL); + closure->success = -1; + process_callback(closure, !gpr_atm_acq_load(&fd->shutdown), + allow_synchronous_callback); return; default: /* WAITING */ /* upcallptr was set to a different closure. This is an error! */ @@ -291,29 +284,31 @@ static void set_ready(grpc_fd *fd, gpr_atm *st, /* only one set_ready can be active at once (but there may be a racing notify_on) */ int success; - grpc_iomgr_closure cb; + grpc_iomgr_closure closure; size_t ncb = 0; - grpc_iomgr_closure *ready_iocb; + gpr_mu_lock(&fd->set_state_mu); - set_ready_locked(st, &cb, &ncb); + set_ready_locked(st, &closure, &ncb); gpr_mu_unlock(&fd->set_state_mu); success = !gpr_atm_acq_load(&fd->shutdown); assert(ncb <= 1); - ready_iocb = grpc_iomgr_cb_create(cb.cb, cb.cb_arg, 0); - make_callbacks(&cb, ncb, success, allow_synchronous_callback, ready_iocb); + if (ncb > 0) { + grpc_iomgr_closure *managed_cb = gpr_malloc(sizeof(grpc_iomgr_closure)); + grpc_iomgr_managed_closure_init(managed_cb, closure.cb, closure.cb_arg); + process_callbacks(managed_cb, ncb, success, allow_synchronous_callback); + } } void grpc_fd_shutdown(grpc_fd *fd) { - grpc_iomgr_closure cb[2]; size_t ncb = 0; gpr_mu_lock(&fd->set_state_mu); GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown)); gpr_atm_rel_store(&fd->shutdown, 1); - set_ready_locked(&fd->readst, cb, &ncb); - set_ready_locked(&fd->writest, cb, &ncb); + set_ready_locked(&fd->readst, fd->shutdown_iocbs, &ncb); + set_ready_locked(&fd->writest, fd->shutdown_iocbs, &ncb); gpr_mu_unlock(&fd->set_state_mu); assert(ncb <= 2); - make_callbacks(cb, ncb, 0, 0, fd->shutdown_iocbs); + process_callbacks(fd->shutdown_iocbs, ncb, 0, 0); } void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) { diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 59bd01ce0f..2d9c3245e3 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -96,8 +96,6 @@ struct grpc_fd { struct grpc_fd *freelist_next; grpc_iomgr_closure on_done_iocb; - /*grpc_iomgr_closure *ready_iocb; XXX: the only one we need to allocate on - * the spot*/ grpc_iomgr_closure shutdown_iocbs[2]; }; diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index e74c32b219..1d6fa9053e 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -60,18 +60,12 @@ static void background_callback_executor(void *ignored) { gpr_timespec short_deadline = gpr_time_add(gpr_now(), gpr_time_from_millis(100)); if (g_cbs_head) { - grpc_iomgr_closure *iocb = g_cbs_head; - int is_cb_ext_managed; - g_cbs_head = iocb->next; + grpc_iomgr_closure *closure = g_cbs_head; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); - /* capture the managed state, as the callback may deallocate itself */ - is_cb_ext_managed = iocb->is_ext_managed; - assert(iocb->success >= 0); - iocb->cb(iocb->cb_arg, iocb->success); - if (!is_cb_ext_managed) { - gpr_free(iocb); - } + assert(closure->success >= 0); + closure->cb(closure->cb_arg, closure->success); gpr_mu_lock(&g_mu); } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) { } else { @@ -103,7 +97,7 @@ void grpc_iomgr_init(void) { } void grpc_iomgr_shutdown(void) { - grpc_iomgr_closure *iocb; + grpc_iomgr_closure *closure; gpr_timespec shutdown_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); @@ -114,18 +108,12 @@ void grpc_iomgr_shutdown(void) { gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs, g_cbs_head ? " and executing final callbacks" : ""); while (g_cbs_head) { - int is_cb_ext_managed; - iocb = g_cbs_head; - g_cbs_head = iocb->next; + closure = g_cbs_head; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); - /* capture the managed state, as the callback may deallocate itself */ - is_cb_ext_managed = iocb->is_ext_managed; - iocb->cb(iocb->cb_arg, 0); - if (!is_cb_ext_managed) { - gpr_free(iocb); - } + closure->cb(closure->cb_arg, 0); gpr_mu_lock(&g_mu); } if (g_refs) { @@ -172,52 +160,75 @@ void grpc_iomgr_unref(void) { gpr_mu_unlock(&g_mu); } -grpc_iomgr_closure *grpc_iomgr_cb_create(grpc_iomgr_cb_func cb, void *cb_arg, - int is_ext_managed) { - grpc_iomgr_closure *iocb = gpr_malloc(sizeof(grpc_iomgr_closure)); - iocb->cb = cb; - iocb->cb_arg = cb_arg; - iocb->is_ext_managed = is_ext_managed; - iocb->success = -1; /* uninitialized */ - iocb->next = NULL; - return iocb; + +void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg) { + closure->cb = cb; + closure->cb_arg = cb_arg; + closure->success = -1; /* uninitialized */ + closure->next = NULL; +} + +typedef struct { + grpc_iomgr_closure managed; + grpc_iomgr_closure *manager; +} managed_closure_arg; + +static void closure_manager_func(void *arg, int success) { + managed_closure_arg *mc_arg = (managed_closure_arg*) arg; + + mc_arg->managed.cb(mc_arg->managed.cb_arg, success); + gpr_free(mc_arg->manager); + gpr_free(mc_arg); } -void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success) { - iocb->success = success; +void grpc_iomgr_managed_closure_init(grpc_iomgr_closure *manager, + grpc_iomgr_cb_func managed_cb, + void *managed_cb_arg) { + managed_closure_arg *managed_arg = gpr_malloc(sizeof(managed_closure_arg)); + + managed_arg->managed.cb = managed_cb; + managed_arg->managed.cb_arg = managed_cb_arg; + managed_arg->manager= manager; + + grpc_iomgr_closure_init(manager, closure_manager_func, managed_arg); +} + + +void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) { + closure->success = success; gpr_mu_lock(&g_mu); - iocb->next = NULL; + closure->next = NULL; if (!g_cbs_tail) { - g_cbs_head = g_cbs_tail = iocb; + g_cbs_head = g_cbs_tail = closure; } else { - g_cbs_tail->next = iocb; - g_cbs_tail = iocb; + g_cbs_tail->next = closure; + g_cbs_tail = closure; } gpr_mu_unlock(&g_mu); } -void grpc_iomgr_add_callback(grpc_iomgr_closure *iocb) { - grpc_iomgr_add_delayed_callback(iocb, 1); +void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) { + grpc_iomgr_add_delayed_callback(closure, 1); } int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { int n = 0; gpr_mu *retake_mu = NULL; - grpc_iomgr_closure *iocb; + grpc_iomgr_closure *closure; for (;;) { - int is_cb_ext_managed; /* check for new work */ if (!gpr_mu_trylock(&g_mu)) { break; } - iocb = g_cbs_head; - if (!iocb) { + closure = g_cbs_head; + if (!closure) { gpr_mu_unlock(&g_mu); break; } - g_cbs_head = iocb->next; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); /* if we have a mutex to drop, do so before executing work */ @@ -226,13 +237,8 @@ int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { retake_mu = drop_mu; drop_mu = NULL; } - /* capture the managed state, as the callback may deallocate itself */ - is_cb_ext_managed = iocb->is_ext_managed; - assert(iocb->success >= 0); - iocb->cb(iocb->cb_arg, success && iocb->success); - if (!is_cb_ext_managed) { - gpr_free(iocb); - } + assert(closure->success >= 0); + closure->cb(closure->cb_arg, success && closure->success); n++; } if (retake_mu) { diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index 86bb5f3583..437e8cf9a8 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -41,18 +41,21 @@ typedef struct grpc_iomgr_closure { grpc_iomgr_cb_func cb; void *cb_arg; int success; - int is_ext_managed; /** is memory being managed externally? */ struct grpc_iomgr_closure *next; /** Do not touch */ } grpc_iomgr_closure; -grpc_iomgr_closure *grpc_iomgr_cb_create(grpc_iomgr_cb_func cb, void *cb_arg, - int is_ext_managed); +void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg); + +void grpc_iomgr_managed_closure_init(grpc_iomgr_closure *manager, + grpc_iomgr_cb_func managed_cb, + void *managed_cb_arg); void grpc_iomgr_init(void); void grpc_iomgr_shutdown(void); /* This function is called from within a callback or from anywhere else and causes the invocation of a callback at some point in the future */ -void grpc_iomgr_add_callback(grpc_iomgr_closure *iocb); +void grpc_iomgr_add_callback(grpc_iomgr_closure *closure); #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */ diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 7a2ba4be06..db17012df0 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -366,7 +366,6 @@ static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { up_args->original_vtable = pollset->vtable; up_args->promotion_iocb.cb = unary_poll_do_promote; up_args->promotion_iocb.cb_arg = up_args; - up_args->promotion_iocb.is_ext_managed = 1; grpc_iomgr_add_callback(&up_args->promotion_iocb); grpc_pollset_kick(pollset); diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index 92fcf7de2a..f0bd0d8d69 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -62,17 +62,16 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) { int grpc_winsocket_shutdown(grpc_winsocket *socket) { int callbacks_set = 0; gpr_mu_lock(&socket->state_mu); - socket->shutdown_iocb.is_ext_managed = 1; /* GPR_TRUE */ if (socket->read_info.cb) { callbacks_set++; - socket->shutdown_iocb.cb = socket->read_info.cb; - socket->shutdown_iocb.cb_arg = socket->read_info.opaque; + grpc_iomgr_closure_init(&socket->shutdown_iocb, socket->read_info.cb, + socket->read_info.opaque); grpc_iomgr_add_delayed_callback(socket->shutdown_iocb, 0); } if (socket->write_info.cb) { callbacks_set++; - socket->shutdown_iocb.cb = socket->write_info.cb; - socket->shutdown_iocb.cb_arg = socket->write_info.opaque; + grpc_iomgr_closure_init(&socket->shutdown_iocb, socket->write_info.cb, + socket->write_info.opaque); grpc_iomgr_add_delayed_callback(socket->shutdown_iocb, 0); } gpr_mu_unlock(&socket->state_mu); diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index e2cda52733..2978c2c370 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -597,7 +597,6 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { tcp->write_closure.cb_arg = tcp; tcp->handle_read_iocb.cb = grpc_tcp_handle_read; - tcp->handle_read_iocb.is_ext_managed = 1; return &tcp->base; } -- cgit v1.2.3