diff options
-rw-r--r-- | src/core/channel/child_channel.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.c | 51 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 2 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 104 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.h | 11 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 1 | ||||
-rw-r--r-- | src/core/iomgr/socket_windows.c | 9 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 1 | ||||
-rw-r--r-- | src/core/security/credentials.c | 11 | ||||
-rw-r--r-- | src/core/surface/call.c | 1 | ||||
-rw-r--r-- | src/core/surface/channel.c | 1 | ||||
-rw-r--r-- | src/core/surface/server.c | 34 | ||||
-rw-r--r-- | test/core/iomgr/alarm_test.c | 16 |
13 files changed, 123 insertions, 121 deletions
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c index 5679a8c81a..213a4b881d 100644 --- a/src/core/channel/child_channel.c +++ b/src/core/channel/child_channel.c @@ -218,7 +218,6 @@ static void maybe_destroy_channel(grpc_child_channel *channel) { !chand->sending_farewell && !chand->calling_back) { chand->finally_destroy_channel_iocb.cb = finally_destroy_channel; chand->finally_destroy_channel_iocb.cb_arg = channel; - chand->finally_destroy_channel_iocb.is_ext_managed = 1; /* GPR_TRUE */ grpc_iomgr_add_callback(&chand->finally_destroy_channel_iocb); } else if (chand->destroyed && !chand->disconnected && chand->active_calls == 0 && !chand->sending_farewell && @@ -226,7 +225,6 @@ static void maybe_destroy_channel(grpc_child_channel *channel) { chand->sending_farewell = 1; chand->send_farewells_iocb.cb = send_farewells; chand->send_farewells_iocb.cb_arg = channel; - chand->send_farewells_iocb.is_ext_managed = 1; /* GPR_TRUE */ grpc_iomgr_add_callback(&chand->send_farewells_iocb); } } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 3509d021ee..672b321b95 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -118,9 +118,8 @@ static void unref_by(grpc_fd *fd, int n) { gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { close(fd->fd); - fd->on_done_iocb.cb = fd->on_done; - fd->on_done_iocb.cb_arg = fd->on_done_user_data; - fd->on_done_iocb.is_ext_managed = 1; + grpc_iomgr_closure_init(&fd->on_done_iocb, fd->on_done, + fd->on_done_user_data); grpc_iomgr_add_callback(&fd->on_done_iocb); freelist_fd(fd); grpc_iomgr_unref(); @@ -199,26 +198,20 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } -static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, - int allow_synchronous_callback, - grpc_iomgr_closure *iocb) { +static void process_callback(grpc_iomgr_closure *closure, int success, + int allow_synchronous_callback) { if (allow_synchronous_callback) { - cb(arg, success); + closure->cb(closure->cb_arg, success); } else { - /* !iocb: allocate -> managed by iomgr - * iocb: "iocb" holds an instance managed by fd_posix */ - iocb = grpc_iomgr_cb_create(cb, arg, !iocb /* is_ext_managed */); - grpc_iomgr_add_delayed_callback(iocb, success); + grpc_iomgr_add_delayed_callback(closure, success); } } -static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success, - int allow_synchronous_callback, - grpc_iomgr_closure *iocbs) { +static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n, + int success, int allow_synchronous_callback) { size_t i; for (i = 0; i < n; i++) { - make_callback(callbacks[i].cb, callbacks[i].cb_arg, success, - allow_synchronous_callback, iocbs + i); + process_callback(callbacks + i, success, allow_synchronous_callback); } } @@ -245,9 +238,9 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, case READY: assert(gpr_atm_no_barrier_load(st) == READY); gpr_atm_rel_store(st, NOT_READY); - make_callback(closure->cb, closure->cb_arg, - !gpr_atm_acq_load(&fd->shutdown), - allow_synchronous_callback, NULL); + closure->success = -1; + process_callback(closure, !gpr_atm_acq_load(&fd->shutdown), + allow_synchronous_callback); return; default: /* WAITING */ /* upcallptr was set to a different closure. This is an error! */ @@ -291,29 +284,31 @@ static void set_ready(grpc_fd *fd, gpr_atm *st, /* only one set_ready can be active at once (but there may be a racing notify_on) */ int success; - grpc_iomgr_closure cb; + grpc_iomgr_closure closure; size_t ncb = 0; - grpc_iomgr_closure *ready_iocb; + gpr_mu_lock(&fd->set_state_mu); - set_ready_locked(st, &cb, &ncb); + set_ready_locked(st, &closure, &ncb); gpr_mu_unlock(&fd->set_state_mu); success = !gpr_atm_acq_load(&fd->shutdown); assert(ncb <= 1); - ready_iocb = grpc_iomgr_cb_create(cb.cb, cb.cb_arg, 0); - make_callbacks(&cb, ncb, success, allow_synchronous_callback, ready_iocb); + if (ncb > 0) { + grpc_iomgr_closure *managed_cb = gpr_malloc(sizeof(grpc_iomgr_closure)); + grpc_iomgr_managed_closure_init(managed_cb, closure.cb, closure.cb_arg); + process_callbacks(managed_cb, ncb, success, allow_synchronous_callback); + } } void grpc_fd_shutdown(grpc_fd *fd) { - grpc_iomgr_closure cb[2]; size_t ncb = 0; gpr_mu_lock(&fd->set_state_mu); GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown)); gpr_atm_rel_store(&fd->shutdown, 1); - set_ready_locked(&fd->readst, cb, &ncb); - set_ready_locked(&fd->writest, cb, &ncb); + set_ready_locked(&fd->readst, fd->shutdown_iocbs, &ncb); + set_ready_locked(&fd->writest, fd->shutdown_iocbs, &ncb); gpr_mu_unlock(&fd->set_state_mu); assert(ncb <= 2); - make_callbacks(cb, ncb, 0, 0, fd->shutdown_iocbs); + process_callbacks(fd->shutdown_iocbs, ncb, 0, 0); } void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) { diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 59bd01ce0f..2d9c3245e3 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -96,8 +96,6 @@ struct grpc_fd { struct grpc_fd *freelist_next; grpc_iomgr_closure on_done_iocb; - /*grpc_iomgr_closure *ready_iocb; XXX: the only one we need to allocate on - * the spot*/ grpc_iomgr_closure shutdown_iocbs[2]; }; diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index e74c32b219..1d6fa9053e 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -60,18 +60,12 @@ static void background_callback_executor(void *ignored) { gpr_timespec short_deadline = gpr_time_add(gpr_now(), gpr_time_from_millis(100)); if (g_cbs_head) { - grpc_iomgr_closure *iocb = g_cbs_head; - int is_cb_ext_managed; - g_cbs_head = iocb->next; + grpc_iomgr_closure *closure = g_cbs_head; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); - /* capture the managed state, as the callback may deallocate itself */ - is_cb_ext_managed = iocb->is_ext_managed; - assert(iocb->success >= 0); - iocb->cb(iocb->cb_arg, iocb->success); - if (!is_cb_ext_managed) { - gpr_free(iocb); - } + assert(closure->success >= 0); + closure->cb(closure->cb_arg, closure->success); gpr_mu_lock(&g_mu); } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) { } else { @@ -103,7 +97,7 @@ void grpc_iomgr_init(void) { } void grpc_iomgr_shutdown(void) { - grpc_iomgr_closure *iocb; + grpc_iomgr_closure *closure; gpr_timespec shutdown_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); @@ -114,18 +108,12 @@ void grpc_iomgr_shutdown(void) { gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs, g_cbs_head ? " and executing final callbacks" : ""); while (g_cbs_head) { - int is_cb_ext_managed; - iocb = g_cbs_head; - g_cbs_head = iocb->next; + closure = g_cbs_head; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); - /* capture the managed state, as the callback may deallocate itself */ - is_cb_ext_managed = iocb->is_ext_managed; - iocb->cb(iocb->cb_arg, 0); - if (!is_cb_ext_managed) { - gpr_free(iocb); - } + closure->cb(closure->cb_arg, 0); gpr_mu_lock(&g_mu); } if (g_refs) { @@ -172,52 +160,75 @@ void grpc_iomgr_unref(void) { gpr_mu_unlock(&g_mu); } -grpc_iomgr_closure *grpc_iomgr_cb_create(grpc_iomgr_cb_func cb, void *cb_arg, - int is_ext_managed) { - grpc_iomgr_closure *iocb = gpr_malloc(sizeof(grpc_iomgr_closure)); - iocb->cb = cb; - iocb->cb_arg = cb_arg; - iocb->is_ext_managed = is_ext_managed; - iocb->success = -1; /* uninitialized */ - iocb->next = NULL; - return iocb; + +void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg) { + closure->cb = cb; + closure->cb_arg = cb_arg; + closure->success = -1; /* uninitialized */ + closure->next = NULL; +} + +typedef struct { + grpc_iomgr_closure managed; + grpc_iomgr_closure *manager; +} managed_closure_arg; + +static void closure_manager_func(void *arg, int success) { + managed_closure_arg *mc_arg = (managed_closure_arg*) arg; + + mc_arg->managed.cb(mc_arg->managed.cb_arg, success); + gpr_free(mc_arg->manager); + gpr_free(mc_arg); } -void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success) { - iocb->success = success; +void grpc_iomgr_managed_closure_init(grpc_iomgr_closure *manager, + grpc_iomgr_cb_func managed_cb, + void *managed_cb_arg) { + managed_closure_arg *managed_arg = gpr_malloc(sizeof(managed_closure_arg)); + + managed_arg->managed.cb = managed_cb; + managed_arg->managed.cb_arg = managed_cb_arg; + managed_arg->manager= manager; + + grpc_iomgr_closure_init(manager, closure_manager_func, managed_arg); +} + + +void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) { + closure->success = success; gpr_mu_lock(&g_mu); - iocb->next = NULL; + closure->next = NULL; if (!g_cbs_tail) { - g_cbs_head = g_cbs_tail = iocb; + g_cbs_head = g_cbs_tail = closure; } else { - g_cbs_tail->next = iocb; - g_cbs_tail = iocb; + g_cbs_tail->next = closure; + g_cbs_tail = closure; } gpr_mu_unlock(&g_mu); } -void grpc_iomgr_add_callback(grpc_iomgr_closure *iocb) { - grpc_iomgr_add_delayed_callback(iocb, 1); +void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) { + grpc_iomgr_add_delayed_callback(closure, 1); } int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { int n = 0; gpr_mu *retake_mu = NULL; - grpc_iomgr_closure *iocb; + grpc_iomgr_closure *closure; for (;;) { - int is_cb_ext_managed; /* check for new work */ if (!gpr_mu_trylock(&g_mu)) { break; } - iocb = g_cbs_head; - if (!iocb) { + closure = g_cbs_head; + if (!closure) { gpr_mu_unlock(&g_mu); break; } - g_cbs_head = iocb->next; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); /* if we have a mutex to drop, do so before executing work */ @@ -226,13 +237,8 @@ int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { retake_mu = drop_mu; drop_mu = NULL; } - /* capture the managed state, as the callback may deallocate itself */ - is_cb_ext_managed = iocb->is_ext_managed; - assert(iocb->success >= 0); - iocb->cb(iocb->cb_arg, success && iocb->success); - if (!is_cb_ext_managed) { - gpr_free(iocb); - } + assert(closure->success >= 0); + closure->cb(closure->cb_arg, success && closure->success); n++; } if (retake_mu) { diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index 86bb5f3583..437e8cf9a8 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -41,18 +41,21 @@ typedef struct grpc_iomgr_closure { grpc_iomgr_cb_func cb; void *cb_arg; int success; - int is_ext_managed; /** is memory being managed externally? */ struct grpc_iomgr_closure *next; /** Do not touch */ } grpc_iomgr_closure; -grpc_iomgr_closure *grpc_iomgr_cb_create(grpc_iomgr_cb_func cb, void *cb_arg, - int is_ext_managed); +void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg); + +void grpc_iomgr_managed_closure_init(grpc_iomgr_closure *manager, + grpc_iomgr_cb_func managed_cb, + void *managed_cb_arg); void grpc_iomgr_init(void); void grpc_iomgr_shutdown(void); /* This function is called from within a callback or from anywhere else and causes the invocation of a callback at some point in the future */ -void grpc_iomgr_add_callback(grpc_iomgr_closure *iocb); +void grpc_iomgr_add_callback(grpc_iomgr_closure *closure); #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */ diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 7a2ba4be06..db17012df0 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -366,7 +366,6 @@ static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { up_args->original_vtable = pollset->vtable; up_args->promotion_iocb.cb = unary_poll_do_promote; up_args->promotion_iocb.cb_arg = up_args; - up_args->promotion_iocb.is_ext_managed = 1; grpc_iomgr_add_callback(&up_args->promotion_iocb); grpc_pollset_kick(pollset); diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index 92fcf7de2a..f0bd0d8d69 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -62,17 +62,16 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) { int grpc_winsocket_shutdown(grpc_winsocket *socket) { int callbacks_set = 0; gpr_mu_lock(&socket->state_mu); - socket->shutdown_iocb.is_ext_managed = 1; /* GPR_TRUE */ if (socket->read_info.cb) { callbacks_set++; - socket->shutdown_iocb.cb = socket->read_info.cb; - socket->shutdown_iocb.cb_arg = socket->read_info.opaque; + grpc_iomgr_closure_init(&socket->shutdown_iocb, socket->read_info.cb, + socket->read_info.opaque); grpc_iomgr_add_delayed_callback(socket->shutdown_iocb, 0); } if (socket->write_info.cb) { callbacks_set++; - socket->shutdown_iocb.cb = socket->write_info.cb; - socket->shutdown_iocb.cb_arg = socket->write_info.opaque; + grpc_iomgr_closure_init(&socket->shutdown_iocb, socket->write_info.cb, + socket->write_info.opaque); grpc_iomgr_add_delayed_callback(socket->shutdown_iocb, 0); } gpr_mu_unlock(&socket->state_mu); diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index e2cda52733..2978c2c370 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -597,7 +597,6 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { tcp->write_closure.cb_arg = tcp; tcp->handle_read_iocb.cb = grpc_tcp_handle_read; - tcp->handle_read_iocb.is_ext_managed = 1; return &tcp->base; } diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index e9229f0694..5e595899c3 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -831,11 +831,12 @@ static void fake_oauth2_get_request_metadata(grpc_credentials *creds, grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)creds; if (c->is_async) { - grpc_iomgr_add_callback( - grpc_iomgr_cb_create( - on_simulated_token_fetch_done, - grpc_credentials_metadata_request_create(creds, cb, user_data), - 0 /*GPR_FALSE*/)); + grpc_iomgr_closure *on_simulated_token_fetch_done_closure = + gpr_malloc(sizeof(grpc_iomgr_closure)); + grpc_iomgr_managed_closure_init( + on_simulated_token_fetch_done_closure, on_simulated_token_fetch_done, + grpc_credentials_metadata_request_create(creds, cb, user_data)); + grpc_iomgr_add_callback(on_simulated_token_fetch_done_closure); } else { cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK); } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 99e14e3e15..3a739315a2 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -370,7 +370,6 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) { } else { c->destroy_iocb.cb = destroy_call; c->destroy_iocb.cb_arg = c; - c->destroy_iocb.is_ext_managed = 1; /* GPR_TRUE */ grpc_iomgr_add_callback(&c->destroy_iocb); } } diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 8e593b8073..d018db7bb0 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -196,7 +196,6 @@ void grpc_channel_internal_unref(grpc_channel *channel) { if (gpr_unref(&channel->refs)) { channel->destroy_iocb.cb = destroy_channel; channel->destroy_iocb.cb_arg = channel; - channel->destroy_iocb.is_ext_managed = 1; grpc_iomgr_add_callback(&channel->destroy_iocb); } } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index a10b921456..378d50486b 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -308,7 +308,6 @@ static void destroy_channel(channel_data *chand) { server_ref(chand->server); chand->finish_destroy_channel_iocb.cb = finish_destroy_channel; chand->finish_destroy_channel_iocb.cb_arg = chand; - chand->finish_destroy_channel_iocb.is_ext_managed = 1; grpc_iomgr_add_callback(&chand->finish_destroy_channel_iocb); } @@ -420,23 +419,29 @@ static void server_on_recv(void *ptr, int success) { case GRPC_STREAM_RECV_CLOSED: gpr_mu_lock(&chand->server->mu); if (calld->state == NOT_STARTED) { + grpc_iomgr_closure *kill_zombie_closure = + gpr_malloc(sizeof(grpc_iomgr_closure)); calld->state = ZOMBIED; - grpc_iomgr_add_callback( - grpc_iomgr_cb_create(kill_zombie, elem, 0 /* GPR_FALSE */)); + grpc_iomgr_managed_closure_init(kill_zombie_closure, kill_zombie, elem); + grpc_iomgr_add_callback(kill_zombie_closure); } gpr_mu_unlock(&chand->server->mu); break; case GRPC_STREAM_CLOSED: gpr_mu_lock(&chand->server->mu); if (calld->state == NOT_STARTED) { + grpc_iomgr_closure *kill_zombie_closure = + gpr_malloc(sizeof(grpc_iomgr_closure)); calld->state = ZOMBIED; - grpc_iomgr_add_callback( - grpc_iomgr_cb_create(kill_zombie, elem, 0 /* GPR_FALSE */)); + grpc_iomgr_managed_closure_init(kill_zombie_closure, kill_zombie, elem); + grpc_iomgr_add_callback(kill_zombie_closure); } else if (calld->state == PENDING) { + grpc_iomgr_closure *kill_zombie_closure = + gpr_malloc(sizeof(grpc_iomgr_closure)); call_list_remove(calld, PENDING_START); calld->state = ZOMBIED; - grpc_iomgr_add_callback( - grpc_iomgr_cb_create(kill_zombie, elem, 0 /* GPR_FALSE */)); + grpc_iomgr_managed_closure_init(kill_zombie_closure, kill_zombie, elem); + grpc_iomgr_add_callback(kill_zombie_closure); } gpr_mu_unlock(&chand->server->mu); break; @@ -512,7 +517,6 @@ static void shutdown_channel(channel_data *chand) { grpc_channel_internal_ref(chand->channel); chand->finish_shutdown_channel_iocb.cb = finish_shutdown_channel; chand->finish_shutdown_channel_iocb.cb_arg = chand; - chand->finish_shutdown_channel_iocb.is_ext_managed = 1; grpc_iomgr_add_callback(&chand->finish_shutdown_channel_iocb); } @@ -951,13 +955,17 @@ void grpc_server_destroy(grpc_server *server) { while ((calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START)) != NULL) { + /* TODO(dgq): If we knew the size of the call list (or an upper bound), we + * could allocate all the memory for the closures in advance in a single + * chunk */ + grpc_iomgr_closure *kill_zombie_closure = + gpr_malloc(sizeof(grpc_iomgr_closure)); gpr_log(GPR_DEBUG, "server destroys call %p", calld->call); calld->state = ZOMBIED; - grpc_iomgr_add_callback( - grpc_iomgr_cb_create( - kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), - 0)); /* XXX */ + grpc_iomgr_managed_closure_init( + kill_zombie_closure, kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); + grpc_iomgr_add_callback(kill_zombie_closure); } for (c = server->root_channel_data.next; c != &server->root_channel_data; diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c index e416c79425..0ccec435e6 100644 --- a/test/core/iomgr/alarm_test.c +++ b/test/core/iomgr/alarm_test.c @@ -55,7 +55,7 @@ void no_op_cb(void *arg, int success) {} typedef struct { gpr_cv cv; gpr_mu mu; - grpc_iomgr_closure *iocb; + grpc_iomgr_closure *followup_closure; int counter; int done_success_ctr; int done_cancel_ctr; @@ -82,10 +82,8 @@ static void alarm_cb(void *arg /* alarm_arg */, int success) { a->success = success; gpr_cv_signal(&a->cv); gpr_mu_unlock(&a->mu); - a->iocb->cb = followup_cb; - a->iocb->cb_arg = &a->fcb_arg; - a->iocb->is_ext_managed = 1; - grpc_iomgr_add_callback(a->iocb); + grpc_iomgr_closure_init(a->followup_closure, followup_cb, &a->fcb_arg); + grpc_iomgr_add_callback(a->followup_closure); } /* Test grpc_alarm add and cancel. */ @@ -111,7 +109,7 @@ static void test_grpc_alarm(void) { arg.done = 0; gpr_mu_init(&arg.mu); gpr_cv_init(&arg.cv); - arg.iocb = gpr_malloc(sizeof(grpc_iomgr_closure)); + arg.followup_closure = gpr_malloc(sizeof(grpc_iomgr_closure)); gpr_event_init(&arg.fcb_arg); grpc_alarm_init(&alarm, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100), alarm_cb, &arg, @@ -153,7 +151,7 @@ static void test_grpc_alarm(void) { } gpr_cv_destroy(&arg.cv); gpr_mu_destroy(&arg.mu); - gpr_free(arg.iocb); + gpr_free(arg.followup_closure); arg2.counter = 0; arg2.success = SUCCESS_NOT_SET; @@ -162,7 +160,7 @@ static void test_grpc_alarm(void) { arg2.done = 0; gpr_mu_init(&arg2.mu); gpr_cv_init(&arg2.cv); - arg2.iocb = gpr_malloc(sizeof(grpc_iomgr_closure)); + arg2.followup_closure = gpr_malloc(sizeof(grpc_iomgr_closure)); gpr_event_init(&arg2.fcb_arg); @@ -214,7 +212,7 @@ static void test_grpc_alarm(void) { } gpr_cv_destroy(&arg2.cv); gpr_mu_destroy(&arg2.mu); - gpr_free(arg2.iocb); + gpr_free(arg2.followup_closure); grpc_iomgr_shutdown(); } |