diff options
-rw-r--r-- | src/core/channel/child_channel.c | 13 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.c | 29 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 10 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 92 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.h | 13 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_internal.h | 4 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 8 | ||||
-rw-r--r-- | src/core/iomgr/socket_windows.c | 11 | ||||
-rw-r--r-- | src/core/iomgr/socket_windows.h | 2 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 8 | ||||
-rw-r--r-- | src/core/security/credentials.c | 4 | ||||
-rw-r--r-- | src/core/surface/call.c | 6 | ||||
-rw-r--r-- | src/core/surface/channel.c | 6 | ||||
-rw-r--r-- | src/core/surface/server.c | 25 | ||||
-rw-r--r-- | test/core/iomgr/alarm_test.c | 11 |
15 files changed, 170 insertions, 72 deletions
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c index a2f3c54290..5679a8c81a 100644 --- a/src/core/channel/child_channel.c +++ b/src/core/channel/child_channel.c @@ -58,6 +58,9 @@ typedef struct { gpr_uint8 sending_farewell; /* have we sent farewell (goaway + disconnect) */ gpr_uint8 sent_farewell; + + grpc_iomgr_closure finally_destroy_channel_iocb; + grpc_iomgr_closure send_farewells_iocb; } lb_channel_data; typedef struct { grpc_child_channel *channel; } lb_call_data; @@ -213,12 +216,18 @@ static void maybe_destroy_channel(grpc_child_channel *channel) { lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data; if (chand->destroyed && chand->disconnected && chand->active_calls == 0 && !chand->sending_farewell && !chand->calling_back) { - grpc_iomgr_add_callback(finally_destroy_channel, channel); + chand->finally_destroy_channel_iocb.cb = finally_destroy_channel; + chand->finally_destroy_channel_iocb.cb_arg = channel; + chand->finally_destroy_channel_iocb.is_ext_managed = 1; /* GPR_TRUE */ + grpc_iomgr_add_callback(&chand->finally_destroy_channel_iocb); } else if (chand->destroyed && !chand->disconnected && chand->active_calls == 0 && !chand->sending_farewell && !chand->sent_farewell) { chand->sending_farewell = 1; - grpc_iomgr_add_callback(send_farewells, channel); + chand->send_farewells_iocb.cb = send_farewells; + chand->send_farewells_iocb.cb_arg = channel; + chand->send_farewells_iocb.is_ext_managed = 1; /* GPR_TRUE */ + grpc_iomgr_add_callback(&chand->send_farewells_iocb); } } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index b697fcc64a..3509d021ee 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -91,6 +91,7 @@ static grpc_fd *alloc_fd(int fd) { gpr_mu_init(&r->set_state_mu); gpr_mu_init(&r->watcher_mu); } + gpr_atm_rel_store(&r->refst, 1); gpr_atm_rel_store(&r->readst, NOT_READY); gpr_atm_rel_store(&r->writest, NOT_READY); @@ -117,7 +118,10 @@ static void unref_by(grpc_fd *fd, int n) { gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { close(fd->fd); - grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data); + fd->on_done_iocb.cb = fd->on_done; + fd->on_done_iocb.cb_arg = fd->on_done_user_data; + fd->on_done_iocb.is_ext_managed = 1; + grpc_iomgr_add_callback(&fd->on_done_iocb); freelist_fd(fd); grpc_iomgr_unref(); } else { @@ -196,20 +200,25 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, - int allow_synchronous_callback) { + int allow_synchronous_callback, + grpc_iomgr_closure *iocb) { if (allow_synchronous_callback) { cb(arg, success); } else { - grpc_iomgr_add_delayed_callback(cb, arg, success); + /* !iocb: allocate -> managed by iomgr + * iocb: "iocb" holds an instance managed by fd_posix */ + iocb = grpc_iomgr_cb_create(cb, arg, !iocb /* is_ext_managed */); + grpc_iomgr_add_delayed_callback(iocb, success); } } static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success, - int allow_synchronous_callback) { + int allow_synchronous_callback, + grpc_iomgr_closure *iocbs) { size_t i; for (i = 0; i < n; i++) { make_callback(callbacks[i].cb, callbacks[i].cb_arg, success, - allow_synchronous_callback); + allow_synchronous_callback, iocbs + i); } } @@ -238,7 +247,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, gpr_atm_rel_store(st, NOT_READY); make_callback(closure->cb, closure->cb_arg, !gpr_atm_acq_load(&fd->shutdown), - allow_synchronous_callback); + allow_synchronous_callback, NULL); return; default: /* WAITING */ /* upcallptr was set to a different closure. This is an error! */ @@ -284,11 +293,14 @@ static void set_ready(grpc_fd *fd, gpr_atm *st, int success; grpc_iomgr_closure cb; size_t ncb = 0; + grpc_iomgr_closure *ready_iocb; gpr_mu_lock(&fd->set_state_mu); set_ready_locked(st, &cb, &ncb); gpr_mu_unlock(&fd->set_state_mu); success = !gpr_atm_acq_load(&fd->shutdown); - make_callbacks(&cb, ncb, success, allow_synchronous_callback); + assert(ncb <= 1); + ready_iocb = grpc_iomgr_cb_create(cb.cb, cb.cb_arg, 0); + make_callbacks(&cb, ncb, success, allow_synchronous_callback, ready_iocb); } void grpc_fd_shutdown(grpc_fd *fd) { @@ -300,7 +312,8 @@ void grpc_fd_shutdown(grpc_fd *fd) { set_ready_locked(&fd->readst, cb, &ncb); set_ready_locked(&fd->writest, cb, &ncb); gpr_mu_unlock(&fd->set_state_mu); - make_callbacks(cb, ncb, 0, 0); + assert(ncb <= 2); + make_callbacks(cb, ncb, 0, 0, fd->shutdown_iocbs); } void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) { diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index cfc533b7f5..59bd01ce0f 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -40,11 +40,6 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> -typedef struct { - grpc_iomgr_cb_func cb; - void *cb_arg; -} grpc_iomgr_closure; - typedef struct grpc_fd grpc_fd; typedef struct grpc_fd_watcher { @@ -99,6 +94,11 @@ struct grpc_fd { grpc_iomgr_cb_func on_done; void *on_done_user_data; struct grpc_fd *freelist_next; + + grpc_iomgr_closure on_done_iocb; + /*grpc_iomgr_closure *ready_iocb; XXX: the only one we need to allocate on + * the spot*/ + grpc_iomgr_closure shutdown_iocbs[2]; }; /* Create a wrapped file descriptor. diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index d22542fc91..e74c32b219 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -33,6 +33,7 @@ #include "src/core/iomgr/iomgr.h" +#include <assert.h> #include <stdlib.h> #include "src/core/iomgr/iomgr_internal.h" @@ -42,17 +43,10 @@ #include <grpc/support/thd.h> #include <grpc/support/sync.h> -typedef struct delayed_callback { - grpc_iomgr_cb_func cb; - void *cb_arg; - int success; - struct delayed_callback *next; -} delayed_callback; - static gpr_mu g_mu; static gpr_cv g_rcv; -static delayed_callback *g_cbs_head = NULL; -static delayed_callback *g_cbs_tail = NULL; +static grpc_iomgr_closure *g_cbs_head = NULL; +static grpc_iomgr_closure *g_cbs_tail = NULL; static int g_shutdown; static int g_refs; static gpr_event g_background_callback_executor_done; @@ -66,12 +60,18 @@ static void background_callback_executor(void *ignored) { gpr_timespec short_deadline = gpr_time_add(gpr_now(), gpr_time_from_millis(100)); if (g_cbs_head) { - delayed_callback *cb = g_cbs_head; - g_cbs_head = cb->next; + grpc_iomgr_closure *iocb = g_cbs_head; + int is_cb_ext_managed; + g_cbs_head = iocb->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); - cb->cb(cb->cb_arg, cb->success); - gpr_free(cb); + /* capture the managed state, as the callback may deallocate itself */ + is_cb_ext_managed = iocb->is_ext_managed; + assert(iocb->success >= 0); + iocb->cb(iocb->cb_arg, iocb->success); + if (!is_cb_ext_managed) { + gpr_free(iocb); + } gpr_mu_lock(&g_mu); } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) { } else { @@ -103,7 +103,7 @@ void grpc_iomgr_init(void) { } void grpc_iomgr_shutdown(void) { - delayed_callback *cb; + grpc_iomgr_closure *iocb; gpr_timespec shutdown_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); @@ -114,13 +114,18 @@ void grpc_iomgr_shutdown(void) { gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs, g_cbs_head ? " and executing final callbacks" : ""); while (g_cbs_head) { - cb = g_cbs_head; - g_cbs_head = cb->next; + int is_cb_ext_managed; + iocb = g_cbs_head; + g_cbs_head = iocb->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); - cb->cb(cb->cb_arg, 0); - gpr_free(cb); + /* capture the managed state, as the callback may deallocate itself */ + is_cb_ext_managed = iocb->is_ext_managed; + iocb->cb(iocb->cb_arg, 0); + if (!is_cb_ext_managed) { + gpr_free(iocb); + } gpr_mu_lock(&g_mu); } if (g_refs) { @@ -167,42 +172,52 @@ void grpc_iomgr_unref(void) { gpr_mu_unlock(&g_mu); } -void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, - int success) { - delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback)); - dcb->cb = cb; - dcb->cb_arg = cb_arg; - dcb->success = success; +grpc_iomgr_closure *grpc_iomgr_cb_create(grpc_iomgr_cb_func cb, void *cb_arg, + int is_ext_managed) { + grpc_iomgr_closure *iocb = gpr_malloc(sizeof(grpc_iomgr_closure)); + iocb->cb = cb; + iocb->cb_arg = cb_arg; + iocb->is_ext_managed = is_ext_managed; + iocb->success = -1; /* uninitialized */ + iocb->next = NULL; + return iocb; +} + +void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success) { + iocb->success = success; gpr_mu_lock(&g_mu); - dcb->next = NULL; + iocb->next = NULL; if (!g_cbs_tail) { - g_cbs_head = g_cbs_tail = dcb; + g_cbs_head = g_cbs_tail = iocb; } else { - g_cbs_tail->next = dcb; - g_cbs_tail = dcb; + g_cbs_tail->next = iocb; + g_cbs_tail = iocb; } gpr_mu_unlock(&g_mu); } -void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) { - grpc_iomgr_add_delayed_callback(cb, cb_arg, 1); + +void grpc_iomgr_add_callback(grpc_iomgr_closure *iocb) { + grpc_iomgr_add_delayed_callback(iocb, 1); } + int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { int n = 0; gpr_mu *retake_mu = NULL; - delayed_callback *cb; + grpc_iomgr_closure *iocb; for (;;) { + int is_cb_ext_managed; /* check for new work */ if (!gpr_mu_trylock(&g_mu)) { break; } - cb = g_cbs_head; - if (!cb) { + iocb = g_cbs_head; + if (!iocb) { gpr_mu_unlock(&g_mu); break; } - g_cbs_head = cb->next; + g_cbs_head = iocb->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); /* if we have a mutex to drop, do so before executing work */ @@ -211,8 +226,13 @@ int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { retake_mu = drop_mu; drop_mu = NULL; } - cb->cb(cb->cb_arg, success && cb->success); - gpr_free(cb); + /* capture the managed state, as the callback may deallocate itself */ + is_cb_ext_managed = iocb->is_ext_managed; + assert(iocb->success >= 0); + iocb->cb(iocb->cb_arg, success && iocb->success); + if (!is_cb_ext_managed) { + gpr_free(iocb); + } n++; } if (retake_mu) { diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index 1f5d23fdda..86bb5f3583 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -37,11 +37,22 @@ /* gRPC Callback definition */ typedef void (*grpc_iomgr_cb_func)(void *arg, int success); +typedef struct grpc_iomgr_closure { + grpc_iomgr_cb_func cb; + void *cb_arg; + int success; + int is_ext_managed; /** is memory being managed externally? */ + struct grpc_iomgr_closure *next; /** Do not touch */ +} grpc_iomgr_closure; + +grpc_iomgr_closure *grpc_iomgr_cb_create(grpc_iomgr_cb_func cb, void *cb_arg, + int is_ext_managed); + void grpc_iomgr_init(void); void grpc_iomgr_shutdown(void); /* This function is called from within a callback or from anywhere else and causes the invocation of a callback at some point in the future */ -void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg); +void grpc_iomgr_add_callback(grpc_iomgr_closure *iocb); #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */ diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index 07923258b9..25a731d58c 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -35,12 +35,10 @@ #define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H #include "src/core/iomgr/iomgr.h" -#include "src/core/iomgr/iomgr_internal.h" #include <grpc/support/sync.h> int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success); -void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, - int success); +void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success); void grpc_iomgr_ref(void); void grpc_iomgr_unref(void); diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 826c792990..7a2ba4be06 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -257,6 +257,7 @@ typedef struct grpc_unary_promote_args { const grpc_pollset_vtable *original_vtable; grpc_pollset *pollset; grpc_fd *fd; + grpc_iomgr_closure promotion_iocb; } grpc_unary_promote_args; static void unary_poll_do_promote(void *args, int success) { @@ -279,7 +280,7 @@ static void unary_poll_do_promote(void *args, int success) { /* First we need to ensure that nobody is polling concurrently */ while (pollset->counter != 0) { grpc_pollset_kick(pollset); - grpc_iomgr_add_callback(unary_poll_do_promote, up_args); + grpc_iomgr_add_callback(&up_args->promotion_iocb); gpr_mu_unlock(&pollset->mu); return; } @@ -363,7 +364,10 @@ static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { up_args->pollset = pollset; up_args->fd = fd; up_args->original_vtable = pollset->vtable; - grpc_iomgr_add_callback(unary_poll_do_promote, up_args); + up_args->promotion_iocb.cb = unary_poll_do_promote; + up_args->promotion_iocb.cb_arg = up_args; + up_args->promotion_iocb.is_ext_managed = 1; + grpc_iomgr_add_callback(&up_args->promotion_iocb); grpc_pollset_kick(pollset); } diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index ee5150a696..92fcf7de2a 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -62,15 +62,18 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) { int grpc_winsocket_shutdown(grpc_winsocket *socket) { int callbacks_set = 0; gpr_mu_lock(&socket->state_mu); + socket->shutdown_iocb.is_ext_managed = 1; /* GPR_TRUE */ if (socket->read_info.cb) { callbacks_set++; - grpc_iomgr_add_delayed_callback(socket->read_info.cb, - socket->read_info.opaque, 0); + socket->shutdown_iocb.cb = socket->read_info.cb; + socket->shutdown_iocb.cb_arg = socket->read_info.opaque; + grpc_iomgr_add_delayed_callback(socket->shutdown_iocb, 0); } if (socket->write_info.cb) { callbacks_set++; - grpc_iomgr_add_delayed_callback(socket->write_info.cb, - socket->write_info.opaque, 0); + socket->shutdown_iocb.cb = socket->write_info.cb; + socket->shutdown_iocb.cb_arg = socket->write_info.opaque; + grpc_iomgr_add_delayed_callback(socket->shutdown_iocb, 0); } gpr_mu_unlock(&socket->state_mu); return callbacks_set; diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h index b27eb14219..8fd8ae97b6 100644 --- a/src/core/iomgr/socket_windows.h +++ b/src/core/iomgr/socket_windows.h @@ -93,6 +93,8 @@ typedef struct grpc_winsocket { there is a pending operation that the IO Completion Port will have to wait for. The socket will be collected at that time. */ int orphan; + + grpc_iomgr_closure shutdown_iocb; } grpc_winsocket; /* Create a wrapped windows handle. This takes ownership of it, meaning that diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index cd6b2ecae6..e2cda52733 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -280,6 +280,8 @@ typedef struct { grpc_iomgr_closure read_closure; grpc_iomgr_closure write_closure; + + grpc_iomgr_closure handle_read_iocb; } grpc_tcp; static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); @@ -443,7 +445,8 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, tcp->finished_edge = 0; grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } else { - grpc_iomgr_add_callback(grpc_tcp_handle_read, tcp); + tcp->handle_read_iocb.cb_arg = tcp; + grpc_iomgr_add_callback(&tcp->handle_read_iocb); } } @@ -592,6 +595,9 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { tcp->read_closure.cb_arg = tcp; tcp->write_closure.cb = grpc_tcp_handle_write; tcp->write_closure.cb_arg = tcp; + + tcp->handle_read_iocb.cb = grpc_tcp_handle_read; + tcp->handle_read_iocb.is_ext_managed = 1; return &tcp->base; } diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index ae22bf47a0..e9229f0694 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -832,8 +832,10 @@ static void fake_oauth2_get_request_metadata(grpc_credentials *creds, if (c->is_async) { grpc_iomgr_add_callback( + grpc_iomgr_cb_create( on_simulated_token_fetch_done, - grpc_credentials_metadata_request_create(creds, cb, user_data)); + grpc_credentials_metadata_request_create(creds, cb, user_data), + 0 /*GPR_FALSE*/)); } else { cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK); } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index e3995a407b..99e14e3e15 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -226,6 +226,7 @@ struct grpc_call { gpr_slice_buffer incoming_message; gpr_uint32 incoming_message_length; + grpc_iomgr_closure destroy_iocb; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -367,7 +368,10 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) { if (allow_immediate_deletion) { destroy_call(c, 1); } else { - grpc_iomgr_add_callback(destroy_call, c); + c->destroy_iocb.cb = destroy_call; + c->destroy_iocb.cb_arg = c; + c->destroy_iocb.is_ext_managed = 1; /* GPR_TRUE */ + grpc_iomgr_add_callback(&c->destroy_iocb); } } } diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index be9da2b7f9..8e593b8073 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -61,6 +61,7 @@ struct grpc_channel { gpr_mu registered_call_mu; registered_call *registered_calls; + grpc_iomgr_closure destroy_iocb; }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) @@ -193,7 +194,10 @@ static void destroy_channel(void *p, int ok) { void grpc_channel_internal_unref(grpc_channel *channel) { if (gpr_unref(&channel->refs)) { - grpc_iomgr_add_callback(destroy_channel, channel); + channel->destroy_iocb.cb = destroy_channel; + channel->destroy_iocb.cb_arg = channel; + channel->destroy_iocb.is_ext_managed = 1; + grpc_iomgr_add_callback(&channel->destroy_iocb); } } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 60606c75e4..a10b921456 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -122,6 +122,8 @@ struct channel_data { channel_registered_method *registered_methods; gpr_uint32 registered_method_slots; gpr_uint32 registered_method_max_probes; + grpc_iomgr_closure finish_shutdown_channel_iocb; + grpc_iomgr_closure finish_destroy_channel_iocb; }; struct grpc_server { @@ -304,7 +306,10 @@ static void destroy_channel(channel_data *chand) { GPR_ASSERT(chand->server != NULL); orphan_channel(chand); server_ref(chand->server); - grpc_iomgr_add_callback(finish_destroy_channel, chand); + chand->finish_destroy_channel_iocb.cb = finish_destroy_channel; + chand->finish_destroy_channel_iocb.cb_arg = chand; + chand->finish_destroy_channel_iocb.is_ext_managed = 1; + grpc_iomgr_add_callback(&chand->finish_destroy_channel_iocb); } static void finish_start_new_rpc_and_unlock(grpc_server *server, @@ -416,7 +421,8 @@ static void server_on_recv(void *ptr, int success) { gpr_mu_lock(&chand->server->mu); if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); + grpc_iomgr_add_callback( + grpc_iomgr_cb_create(kill_zombie, elem, 0 /* GPR_FALSE */)); } gpr_mu_unlock(&chand->server->mu); break; @@ -424,11 +430,13 @@ static void server_on_recv(void *ptr, int success) { gpr_mu_lock(&chand->server->mu); if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); + grpc_iomgr_add_callback( + grpc_iomgr_cb_create(kill_zombie, elem, 0 /* GPR_FALSE */)); } else if (calld->state == PENDING) { call_list_remove(calld, PENDING_START); calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); + grpc_iomgr_add_callback( + grpc_iomgr_cb_create(kill_zombie, elem, 0 /* GPR_FALSE */)); } gpr_mu_unlock(&chand->server->mu); break; @@ -502,7 +510,10 @@ static void finish_shutdown_channel(void *cd, int success) { static void shutdown_channel(channel_data *chand) { grpc_channel_internal_ref(chand->channel); - grpc_iomgr_add_callback(finish_shutdown_channel, chand); + chand->finish_shutdown_channel_iocb.cb = finish_shutdown_channel; + chand->finish_shutdown_channel_iocb.cb_arg = chand; + chand->finish_shutdown_channel_iocb.is_ext_managed = 1; + grpc_iomgr_add_callback(&chand->finish_shutdown_channel_iocb); } static void init_call_elem(grpc_call_element *elem, @@ -943,8 +954,10 @@ void grpc_server_destroy(grpc_server *server) { gpr_log(GPR_DEBUG, "server destroys call %p", calld->call); calld->state = ZOMBIED; grpc_iomgr_add_callback( + grpc_iomgr_cb_create( kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + 0)); /* XXX */ } for (c = server->root_channel_data.next; c != &server->root_channel_data; diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c index e677ba30dd..e416c79425 100644 --- a/test/core/iomgr/alarm_test.c +++ b/test/core/iomgr/alarm_test.c @@ -55,6 +55,7 @@ void no_op_cb(void *arg, int success) {} typedef struct { gpr_cv cv; gpr_mu mu; + grpc_iomgr_closure *iocb; int counter; int done_success_ctr; int done_cancel_ctr; @@ -81,7 +82,10 @@ static void alarm_cb(void *arg /* alarm_arg */, int success) { a->success = success; gpr_cv_signal(&a->cv); gpr_mu_unlock(&a->mu); - grpc_iomgr_add_callback(followup_cb, &a->fcb_arg); + a->iocb->cb = followup_cb; + a->iocb->cb_arg = &a->fcb_arg; + a->iocb->is_ext_managed = 1; + grpc_iomgr_add_callback(a->iocb); } /* Test grpc_alarm add and cancel. */ @@ -107,6 +111,7 @@ static void test_grpc_alarm(void) { arg.done = 0; gpr_mu_init(&arg.mu); gpr_cv_init(&arg.cv); + arg.iocb = gpr_malloc(sizeof(grpc_iomgr_closure)); gpr_event_init(&arg.fcb_arg); grpc_alarm_init(&alarm, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100), alarm_cb, &arg, @@ -148,6 +153,7 @@ static void test_grpc_alarm(void) { } gpr_cv_destroy(&arg.cv); gpr_mu_destroy(&arg.mu); + gpr_free(arg.iocb); arg2.counter = 0; arg2.success = SUCCESS_NOT_SET; @@ -156,6 +162,8 @@ static void test_grpc_alarm(void) { arg2.done = 0; gpr_mu_init(&arg2.mu); gpr_cv_init(&arg2.cv); + arg2.iocb = gpr_malloc(sizeof(grpc_iomgr_closure)); + gpr_event_init(&arg2.fcb_arg); grpc_alarm_init(&alarm_to_cancel, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100), @@ -206,6 +214,7 @@ static void test_grpc_alarm(void) { } gpr_cv_destroy(&arg2.cv); gpr_mu_destroy(&arg2.mu); + gpr_free(arg2.iocb); grpc_iomgr_shutdown(); } |