-rw-r--r--  src/core/channel/child_channel.c |   2
-rw-r--r--  src/core/iomgr/fd_posix.c        |  51
-rw-r--r--  src/core/iomgr/fd_posix.h        |   2
-rw-r--r--  src/core/iomgr/iomgr.c           | 104
-rw-r--r--  src/core/iomgr/iomgr.h           |  11
-rw-r--r--  src/core/iomgr/pollset_posix.c   |   1
-rw-r--r--  src/core/iomgr/socket_windows.c  |   9
-rw-r--r--  src/core/iomgr/tcp_posix.c       |   1
-rw-r--r--  src/core/security/credentials.c  |  11
-rw-r--r--  src/core/surface/call.c          |   1
-rw-r--r--  src/core/surface/channel.c       |   1
-rw-r--r--  src/core/surface/server.c        |  34
-rw-r--r--  test/core/iomgr/alarm_test.c     |  16
13 files changed, 123 insertions(+), 121 deletions(-)
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
index 5679a8c81a..213a4b881d 100644
--- a/src/core/channel/child_channel.c
+++ b/src/core/channel/child_channel.c
@@ -218,7 +218,6 @@ static void maybe_destroy_channel(grpc_child_channel *channel) {
!chand->sending_farewell && !chand->calling_back) {
chand->finally_destroy_channel_iocb.cb = finally_destroy_channel;
chand->finally_destroy_channel_iocb.cb_arg = channel;
- chand->finally_destroy_channel_iocb.is_ext_managed = 1; /* GPR_TRUE */
grpc_iomgr_add_callback(&chand->finally_destroy_channel_iocb);
} else if (chand->destroyed && !chand->disconnected &&
chand->active_calls == 0 && !chand->sending_farewell &&
@@ -226,7 +225,6 @@ static void maybe_destroy_channel(grpc_child_channel *channel) {
chand->sending_farewell = 1;
chand->send_farewells_iocb.cb = send_farewells;
chand->send_farewells_iocb.cb_arg = channel;
- chand->send_farewells_iocb.is_ext_managed = 1; /* GPR_TRUE */
grpc_iomgr_add_callback(&chand->send_farewells_iocb);
}
}
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 3509d021ee..672b321b95 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -118,9 +118,8 @@ static void unref_by(grpc_fd *fd, int n) {
gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
close(fd->fd);
- fd->on_done_iocb.cb = fd->on_done;
- fd->on_done_iocb.cb_arg = fd->on_done_user_data;
- fd->on_done_iocb.is_ext_managed = 1;
+ grpc_iomgr_closure_init(&fd->on_done_iocb, fd->on_done,
+ fd->on_done_user_data);
grpc_iomgr_add_callback(&fd->on_done_iocb);
freelist_fd(fd);
grpc_iomgr_unref();
@@ -199,26 +198,20 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
-static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
- int allow_synchronous_callback,
- grpc_iomgr_closure *iocb) {
+static void process_callback(grpc_iomgr_closure *closure, int success,
+ int allow_synchronous_callback) {
if (allow_synchronous_callback) {
- cb(arg, success);
+ closure->cb(closure->cb_arg, success);
} else {
- /* !iocb: allocate -> managed by iomgr
- * iocb: "iocb" holds an instance managed by fd_posix */
- iocb = grpc_iomgr_cb_create(cb, arg, !iocb /* is_ext_managed */);
- grpc_iomgr_add_delayed_callback(iocb, success);
+ grpc_iomgr_add_delayed_callback(closure, success);
}
}
-static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success,
- int allow_synchronous_callback,
- grpc_iomgr_closure *iocbs) {
+static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n,
+ int success, int allow_synchronous_callback) {
size_t i;
for (i = 0; i < n; i++) {
- make_callback(callbacks[i].cb, callbacks[i].cb_arg, success,
- allow_synchronous_callback, iocbs + i);
+ process_callback(callbacks + i, success, allow_synchronous_callback);
}
}
@@ -245,9 +238,9 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
case READY:
assert(gpr_atm_no_barrier_load(st) == READY);
gpr_atm_rel_store(st, NOT_READY);
- make_callback(closure->cb, closure->cb_arg,
- !gpr_atm_acq_load(&fd->shutdown),
- allow_synchronous_callback, NULL);
+ closure->success = -1;
+ process_callback(closure, !gpr_atm_acq_load(&fd->shutdown),
+ allow_synchronous_callback);
return;
default: /* WAITING */
/* upcallptr was set to a different closure. This is an error! */
@@ -291,29 +284,31 @@ static void set_ready(grpc_fd *fd, gpr_atm *st,
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
int success;
- grpc_iomgr_closure cb;
+ grpc_iomgr_closure closure;
size_t ncb = 0;
- grpc_iomgr_closure *ready_iocb;
+
gpr_mu_lock(&fd->set_state_mu);
- set_ready_locked(st, &cb, &ncb);
+ set_ready_locked(st, &closure, &ncb);
gpr_mu_unlock(&fd->set_state_mu);
success = !gpr_atm_acq_load(&fd->shutdown);
assert(ncb <= 1);
- ready_iocb = grpc_iomgr_cb_create(cb.cb, cb.cb_arg, 0);
- make_callbacks(&cb, ncb, success, allow_synchronous_callback, ready_iocb);
+ if (ncb > 0) {
+ grpc_iomgr_closure *managed_cb = gpr_malloc(sizeof(grpc_iomgr_closure));
+ grpc_iomgr_managed_closure_init(managed_cb, closure.cb, closure.cb_arg);
+ process_callbacks(managed_cb, ncb, success, allow_synchronous_callback);
+ }
}
void grpc_fd_shutdown(grpc_fd *fd) {
- grpc_iomgr_closure cb[2];
size_t ncb = 0;
gpr_mu_lock(&fd->set_state_mu);
GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
gpr_atm_rel_store(&fd->shutdown, 1);
- set_ready_locked(&fd->readst, cb, &ncb);
- set_ready_locked(&fd->writest, cb, &ncb);
+ set_ready_locked(&fd->readst, fd->shutdown_iocbs, &ncb);
+ set_ready_locked(&fd->writest, fd->shutdown_iocbs, &ncb);
gpr_mu_unlock(&fd->set_state_mu);
assert(ncb <= 2);
- make_callbacks(cb, ncb, 0, 0, fd->shutdown_iocbs);
+ process_callbacks(fd->shutdown_iocbs, ncb, 0, 0);
}
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {
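A short annotation of the fd_posix.c hunks above (my reading of the patch, not text from the commit), summarizing who owns which closure after this change:

/*
 * set_ready():        the closure filled in by set_ready_locked() lives on
 *                     set_ready()'s own stack, so before the callback can be
 *                     deferred its cb/cb_arg are copied into a heap-allocated
 *                     "managed" closure (grpc_iomgr_managed_closure_init),
 *                     which frees itself once it has run.
 *
 * grpc_fd_shutdown(): at most two callbacks (read + write) can become ready,
 *                     so grpc_fd embeds shutdown_iocbs[2] (see fd_posix.h
 *                     below) and nothing is allocated or freed here.
 *
 * unref_by():         on_done_iocb is likewise embedded in grpc_fd and is now
 *                     initialized with grpc_iomgr_closure_init().
 */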
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 59bd01ce0f..2d9c3245e3 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -96,8 +96,6 @@ struct grpc_fd {
struct grpc_fd *freelist_next;
grpc_iomgr_closure on_done_iocb;
- /*grpc_iomgr_closure *ready_iocb; XXX: the only one we need to allocate on
- * the spot*/
grpc_iomgr_closure shutdown_iocbs[2];
};
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index e74c32b219..1d6fa9053e 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -60,18 +60,12 @@ static void background_callback_executor(void *ignored) {
gpr_timespec short_deadline =
gpr_time_add(gpr_now(), gpr_time_from_millis(100));
if (g_cbs_head) {
- grpc_iomgr_closure *iocb = g_cbs_head;
- int is_cb_ext_managed;
- g_cbs_head = iocb->next;
+ grpc_iomgr_closure *closure = g_cbs_head;
+ g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
- /* capture the managed state, as the callback may deallocate itself */
- is_cb_ext_managed = iocb->is_ext_managed;
- assert(iocb->success >= 0);
- iocb->cb(iocb->cb_arg, iocb->success);
- if (!is_cb_ext_managed) {
- gpr_free(iocb);
- }
+ assert(closure->success >= 0);
+ closure->cb(closure->cb_arg, closure->success);
gpr_mu_lock(&g_mu);
} else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) {
} else {
@@ -103,7 +97,7 @@ void grpc_iomgr_init(void) {
}
void grpc_iomgr_shutdown(void) {
- grpc_iomgr_closure *iocb;
+ grpc_iomgr_closure *closure;
gpr_timespec shutdown_deadline =
gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
@@ -114,18 +108,12 @@ void grpc_iomgr_shutdown(void) {
gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs,
g_cbs_head ? " and executing final callbacks" : "");
while (g_cbs_head) {
- int is_cb_ext_managed;
- iocb = g_cbs_head;
- g_cbs_head = iocb->next;
+ closure = g_cbs_head;
+ g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
- /* capture the managed state, as the callback may deallocate itself */
- is_cb_ext_managed = iocb->is_ext_managed;
- iocb->cb(iocb->cb_arg, 0);
- if (!is_cb_ext_managed) {
- gpr_free(iocb);
- }
+ closure->cb(closure->cb_arg, 0);
gpr_mu_lock(&g_mu);
}
if (g_refs) {
@@ -172,52 +160,75 @@ void grpc_iomgr_unref(void) {
gpr_mu_unlock(&g_mu);
}
-grpc_iomgr_closure *grpc_iomgr_cb_create(grpc_iomgr_cb_func cb, void *cb_arg,
- int is_ext_managed) {
- grpc_iomgr_closure *iocb = gpr_malloc(sizeof(grpc_iomgr_closure));
- iocb->cb = cb;
- iocb->cb_arg = cb_arg;
- iocb->is_ext_managed = is_ext_managed;
- iocb->success = -1; /* uninitialized */
- iocb->next = NULL;
- return iocb;
+
+void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg) {
+ closure->cb = cb;
+ closure->cb_arg = cb_arg;
+ closure->success = -1; /* uninitialized */
+ closure->next = NULL;
+}
+
+typedef struct {
+ grpc_iomgr_closure managed;
+ grpc_iomgr_closure *manager;
+} managed_closure_arg;
+
+static void closure_manager_func(void *arg, int success) {
+ managed_closure_arg *mc_arg = (managed_closure_arg*) arg;
+
+ mc_arg->managed.cb(mc_arg->managed.cb_arg, success);
+ gpr_free(mc_arg->manager);
+ gpr_free(mc_arg);
}
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success) {
- iocb->success = success;
+void grpc_iomgr_managed_closure_init(grpc_iomgr_closure *manager,
+ grpc_iomgr_cb_func managed_cb,
+ void *managed_cb_arg) {
+ managed_closure_arg *managed_arg = gpr_malloc(sizeof(managed_closure_arg));
+
+ managed_arg->managed.cb = managed_cb;
+ managed_arg->managed.cb_arg = managed_cb_arg;
+ managed_arg->manager = manager;
+
+ grpc_iomgr_closure_init(manager, closure_manager_func, managed_arg);
+}
+
+
+void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) {
+ closure->success = success;
gpr_mu_lock(&g_mu);
- iocb->next = NULL;
+ closure->next = NULL;
if (!g_cbs_tail) {
- g_cbs_head = g_cbs_tail = iocb;
+ g_cbs_head = g_cbs_tail = closure;
} else {
- g_cbs_tail->next = iocb;
- g_cbs_tail = iocb;
+ g_cbs_tail->next = closure;
+ g_cbs_tail = closure;
}
gpr_mu_unlock(&g_mu);
}
-void grpc_iomgr_add_callback(grpc_iomgr_closure *iocb) {
- grpc_iomgr_add_delayed_callback(iocb, 1);
+void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) {
+ grpc_iomgr_add_delayed_callback(closure, 1);
}
int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
int n = 0;
gpr_mu *retake_mu = NULL;
- grpc_iomgr_closure *iocb;
+ grpc_iomgr_closure *closure;
for (;;) {
- int is_cb_ext_managed;
/* check for new work */
if (!gpr_mu_trylock(&g_mu)) {
break;
}
- iocb = g_cbs_head;
- if (!iocb) {
+ closure = g_cbs_head;
+ if (!closure) {
gpr_mu_unlock(&g_mu);
break;
}
- g_cbs_head = iocb->next;
+ g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
/* if we have a mutex to drop, do so before executing work */
@@ -226,13 +237,8 @@ int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
retake_mu = drop_mu;
drop_mu = NULL;
}
- /* capture the managed state, as the callback may deallocate itself */
- is_cb_ext_managed = iocb->is_ext_managed;
- assert(iocb->success >= 0);
- iocb->cb(iocb->cb_arg, success && iocb->success);
- if (!is_cb_ext_managed) {
- gpr_free(iocb);
- }
+ assert(closure->success >= 0);
+ closure->cb(closure->cb_arg, success && closure->success);
n++;
}
if (retake_mu) {
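The managed-closure trampoline added above is easiest to see from a caller's side. A minimal sketch, assuming the usual internal includes; do_cleanup and schedule_fire_and_forget are hypothetical names, not from the patch (the real call sites are in credentials.c and server.c below):

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

#include "src/core/iomgr/iomgr.h"

/* Hypothetical callback, standing in for on_simulated_token_fetch_done or
 * kill_zombie. */
static void do_cleanup(void *arg, int success) {
  gpr_log(GPR_DEBUG, "cleanup ran: arg=%p success=%d", arg, success);
}

static void schedule_fire_and_forget(void *arg) {
  /* 1. The caller allocates the closure... */
  grpc_iomgr_closure *c = gpr_malloc(sizeof(grpc_iomgr_closure));
  /* 2. ...and hands ownership to the iomgr: closure_manager_func becomes the
   *    actual callback, with a heap-allocated managed_closure_arg remembering
   *    both do_cleanup/arg and the pointer c itself. */
  grpc_iomgr_managed_closure_init(c, do_cleanup, arg);
  /* 3. Queued like any other closure. */
  grpc_iomgr_add_callback(c);
  /* 4. When an executor pops it, closure_manager_func calls do_cleanup(arg,
   *    success) and then gpr_free()s both the wrapper and c, so the caller
   *    must not touch or free c after this point. */
}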
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index 86bb5f3583..437e8cf9a8 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -41,18 +41,21 @@ typedef struct grpc_iomgr_closure {
grpc_iomgr_cb_func cb;
void *cb_arg;
int success;
- int is_ext_managed; /** is memory being managed externally? */
struct grpc_iomgr_closure *next; /** Do not touch */
} grpc_iomgr_closure;
-grpc_iomgr_closure *grpc_iomgr_cb_create(grpc_iomgr_cb_func cb, void *cb_arg,
- int is_ext_managed);
+void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg);
+
+void grpc_iomgr_managed_closure_init(grpc_iomgr_closure *manager,
+ grpc_iomgr_cb_func managed_cb,
+ void *managed_cb_arg);
void grpc_iomgr_init(void);
void grpc_iomgr_shutdown(void);
/* This function is called from within a callback or from anywhere else
and causes the invocation of a callback at some point in the future */
-void grpc_iomgr_add_callback(grpc_iomgr_closure *iocb);
+void grpc_iomgr_add_callback(grpc_iomgr_closure *closure);
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */
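With is_ext_managed gone, the common pattern in this patch is a closure embedded in its owning object and set up with grpc_iomgr_closure_init (destroy_iocb in call.c/channel.c, on_done_iocb in fd_posix.h). A minimal sketch of that pattern; my_transport, on_destroy and my_transport_unref are hypothetical names:

#include <grpc/support/log.h>

#include "src/core/iomgr/iomgr.h"

/* Hypothetical owner object: the closure lives exactly as long as the object,
 * so the iomgr never allocates or frees anything for it. */
typedef struct {
  grpc_iomgr_closure destroy_closure;
  int refs;
} my_transport;

static void on_destroy(void *arg, int success) {
  my_transport *t = arg;
  gpr_log(GPR_DEBUG, "destroying transport %p (success=%d)", (void *)t, success);
  /* tear down t here; destroy_closure is part of t, nothing extra to free */
}

static void my_transport_unref(my_transport *t) {
  if (--t->refs == 0) {
    grpc_iomgr_closure_init(&t->destroy_closure, on_destroy, t);
    grpc_iomgr_add_callback(&t->destroy_closure);
  }
}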
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 7a2ba4be06..db17012df0 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -366,7 +366,6 @@ static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
up_args->original_vtable = pollset->vtable;
up_args->promotion_iocb.cb = unary_poll_do_promote;
up_args->promotion_iocb.cb_arg = up_args;
- up_args->promotion_iocb.is_ext_managed = 1;
grpc_iomgr_add_callback(&up_args->promotion_iocb);
grpc_pollset_kick(pollset);
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index 92fcf7de2a..f0bd0d8d69 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -62,17 +62,16 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
int grpc_winsocket_shutdown(grpc_winsocket *socket) {
int callbacks_set = 0;
gpr_mu_lock(&socket->state_mu);
- socket->shutdown_iocb.is_ext_managed = 1; /* GPR_TRUE */
if (socket->read_info.cb) {
callbacks_set++;
- socket->shutdown_iocb.cb = socket->read_info.cb;
- socket->shutdown_iocb.cb_arg = socket->read_info.opaque;
+ grpc_iomgr_closure_init(&socket->shutdown_iocb, socket->read_info.cb,
+ socket->read_info.opaque);
grpc_iomgr_add_delayed_callback(&socket->shutdown_iocb, 0);
}
if (socket->write_info.cb) {
callbacks_set++;
- socket->shutdown_iocb.cb = socket->write_info.cb;
- socket->shutdown_iocb.cb_arg = socket->write_info.opaque;
+ grpc_iomgr_closure_init(&socket->shutdown_iocb, socket->write_info.cb,
+ socket->write_info.opaque);
grpc_iomgr_add_delayed_callback(&socket->shutdown_iocb, 0);
}
gpr_mu_unlock(&socket->state_mu);
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index e2cda52733..2978c2c370 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -597,7 +597,6 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
tcp->write_closure.cb_arg = tcp;
tcp->handle_read_iocb.cb = grpc_tcp_handle_read;
- tcp->handle_read_iocb.is_ext_managed = 1;
return &tcp->base;
}
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index e9229f0694..5e595899c3 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -831,11 +831,12 @@ static void fake_oauth2_get_request_metadata(grpc_credentials *creds,
grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)creds;
if (c->is_async) {
- grpc_iomgr_add_callback(
- grpc_iomgr_cb_create(
- on_simulated_token_fetch_done,
- grpc_credentials_metadata_request_create(creds, cb, user_data),
- 0 /*GPR_FALSE*/));
+ grpc_iomgr_closure *on_simulated_token_fetch_done_closure =
+ gpr_malloc(sizeof(grpc_iomgr_closure));
+ grpc_iomgr_managed_closure_init(
+ on_simulated_token_fetch_done_closure, on_simulated_token_fetch_done,
+ grpc_credentials_metadata_request_create(creds, cb, user_data));
+ grpc_iomgr_add_callback(on_simulated_token_fetch_done_closure);
} else {
cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK);
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 99e14e3e15..3a739315a2 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -370,7 +370,6 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
} else {
c->destroy_iocb.cb = destroy_call;
c->destroy_iocb.cb_arg = c;
- c->destroy_iocb.is_ext_managed = 1; /* GPR_TRUE */
grpc_iomgr_add_callback(&c->destroy_iocb);
}
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 8e593b8073..d018db7bb0 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -196,7 +196,6 @@ void grpc_channel_internal_unref(grpc_channel *channel) {
if (gpr_unref(&channel->refs)) {
channel->destroy_iocb.cb = destroy_channel;
channel->destroy_iocb.cb_arg = channel;
- channel->destroy_iocb.is_ext_managed = 1;
grpc_iomgr_add_callback(&channel->destroy_iocb);
}
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index a10b921456..378d50486b 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -308,7 +308,6 @@ static void destroy_channel(channel_data *chand) {
server_ref(chand->server);
chand->finish_destroy_channel_iocb.cb = finish_destroy_channel;
chand->finish_destroy_channel_iocb.cb_arg = chand;
- chand->finish_destroy_channel_iocb.is_ext_managed = 1;
grpc_iomgr_add_callback(&chand->finish_destroy_channel_iocb);
}
@@ -420,23 +419,29 @@ static void server_on_recv(void *ptr, int success) {
case GRPC_STREAM_RECV_CLOSED:
gpr_mu_lock(&chand->server->mu);
if (calld->state == NOT_STARTED) {
+ grpc_iomgr_closure *kill_zombie_closure =
+ gpr_malloc(sizeof(grpc_iomgr_closure));
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(
- grpc_iomgr_cb_create(kill_zombie, elem, 0 /* GPR_FALSE */));
+ grpc_iomgr_managed_closure_init(kill_zombie_closure, kill_zombie, elem);
+ grpc_iomgr_add_callback(kill_zombie_closure);
}
gpr_mu_unlock(&chand->server->mu);
break;
case GRPC_STREAM_CLOSED:
gpr_mu_lock(&chand->server->mu);
if (calld->state == NOT_STARTED) {
+ grpc_iomgr_closure *kill_zombie_closure =
+ gpr_malloc(sizeof(grpc_iomgr_closure));
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(
- grpc_iomgr_cb_create(kill_zombie, elem, 0 /* GPR_FALSE */));
+ grpc_iomgr_managed_closure_init(kill_zombie_closure, kill_zombie, elem);
+ grpc_iomgr_add_callback(kill_zombie_closure);
} else if (calld->state == PENDING) {
+ grpc_iomgr_closure *kill_zombie_closure =
+ gpr_malloc(sizeof(grpc_iomgr_closure));
call_list_remove(calld, PENDING_START);
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(
- grpc_iomgr_cb_create(kill_zombie, elem, 0 /* GPR_FALSE */));
+ grpc_iomgr_managed_closure_init(kill_zombie_closure, kill_zombie, elem);
+ grpc_iomgr_add_callback(kill_zombie_closure);
}
gpr_mu_unlock(&chand->server->mu);
break;
@@ -512,7 +517,6 @@ static void shutdown_channel(channel_data *chand) {
grpc_channel_internal_ref(chand->channel);
chand->finish_shutdown_channel_iocb.cb = finish_shutdown_channel;
chand->finish_shutdown_channel_iocb.cb_arg = chand;
- chand->finish_shutdown_channel_iocb.is_ext_managed = 1;
grpc_iomgr_add_callback(&chand->finish_shutdown_channel_iocb);
}
@@ -951,13 +955,17 @@ void grpc_server_destroy(grpc_server *server) {
while ((calld = call_list_remove_head(&server->lists[PENDING_START],
PENDING_START)) != NULL) {
+ /* TODO(dgq): If we knew the size of the call list (or an upper bound), we
+ * could allocate all the memory for the closures in advance in a single
+ * chunk */
+ grpc_iomgr_closure *kill_zombie_closure =
+ gpr_malloc(sizeof(grpc_iomgr_closure));
gpr_log(GPR_DEBUG, "server destroys call %p", calld->call);
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(
- grpc_iomgr_cb_create(
- kill_zombie,
- grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
- 0)); /* XXX */
+ grpc_iomgr_managed_closure_init(
+ kill_zombie_closure, kill_zombie,
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
+ grpc_iomgr_add_callback(kill_zombie_closure);
}
for (c = server->root_channel_data.next; c != &server->root_channel_data;
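A sketch of what the TODO in the grpc_server_destroy hunk points at, not part of the patch: if some upper bound n on the PENDING_START list were known, the zombie closures could share one allocation instead of one gpr_malloc per call. The names n, zombie_closures and i are mine, and the open question of who frees the chunk after the last kill_zombie runs is exactly what the per-closure managed variant answers today:

  grpc_iomgr_closure *zombie_closures =
      gpr_malloc(n * sizeof(grpc_iomgr_closure));
  size_t i = 0;
  while ((calld = call_list_remove_head(&server->lists[PENDING_START],
                                        PENDING_START)) != NULL) {
    gpr_log(GPR_DEBUG, "server destroys call %p", calld->call);
    calld->state = ZOMBIED;
    grpc_iomgr_closure_init(
        &zombie_closures[i], kill_zombie,
        grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
    grpc_iomgr_add_callback(&zombie_closures[i]);
    i++;
  }
  /* the chunk must outlive every queued kill_zombie, so something would still
   * have to free zombie_closures once the last callback has run */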
diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c
index e416c79425..0ccec435e6 100644
--- a/test/core/iomgr/alarm_test.c
+++ b/test/core/iomgr/alarm_test.c
@@ -55,7 +55,7 @@ void no_op_cb(void *arg, int success) {}
typedef struct {
gpr_cv cv;
gpr_mu mu;
- grpc_iomgr_closure *iocb;
+ grpc_iomgr_closure *followup_closure;
int counter;
int done_success_ctr;
int done_cancel_ctr;
@@ -82,10 +82,8 @@ static void alarm_cb(void *arg /* alarm_arg */, int success) {
a->success = success;
gpr_cv_signal(&a->cv);
gpr_mu_unlock(&a->mu);
- a->iocb->cb = followup_cb;
- a->iocb->cb_arg = &a->fcb_arg;
- a->iocb->is_ext_managed = 1;
- grpc_iomgr_add_callback(a->iocb);
+ grpc_iomgr_closure_init(a->followup_closure, followup_cb, &a->fcb_arg);
+ grpc_iomgr_add_callback(a->followup_closure);
}
/* Test grpc_alarm add and cancel. */
@@ -111,7 +109,7 @@ static void test_grpc_alarm(void) {
arg.done = 0;
gpr_mu_init(&arg.mu);
gpr_cv_init(&arg.cv);
- arg.iocb = gpr_malloc(sizeof(grpc_iomgr_closure));
+ arg.followup_closure = gpr_malloc(sizeof(grpc_iomgr_closure));
gpr_event_init(&arg.fcb_arg);
grpc_alarm_init(&alarm, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100), alarm_cb, &arg,
@@ -153,7 +151,7 @@ static void test_grpc_alarm(void) {
}
gpr_cv_destroy(&arg.cv);
gpr_mu_destroy(&arg.mu);
- gpr_free(arg.iocb);
+ gpr_free(arg.followup_closure);
arg2.counter = 0;
arg2.success = SUCCESS_NOT_SET;
@@ -162,7 +160,7 @@ static void test_grpc_alarm(void) {
arg2.done = 0;
gpr_mu_init(&arg2.mu);
gpr_cv_init(&arg2.cv);
- arg2.iocb = gpr_malloc(sizeof(grpc_iomgr_closure));
+ arg2.followup_closure = gpr_malloc(sizeof(grpc_iomgr_closure));
gpr_event_init(&arg2.fcb_arg);
@@ -214,7 +212,7 @@ static void test_grpc_alarm(void) {
}
gpr_cv_destroy(&arg2.cv);
gpr_mu_destroy(&arg2.mu);
- gpr_free(arg2.iocb);
+ gpr_free(arg2.followup_closure);
grpc_iomgr_shutdown();
}