aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-05-26 19:58:50 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-05-31 12:45:10 -0700
commit5f228f5001c7c1f85c0d1965640bd6258d7b9188 (patch)
tree15c767d330d3c302420ee4b45d6330f5693ffefa
parent2e378c89f89820941d1b30ad8c8b501fefa213b0 (diff)
Adapted the following to the new iomgr's cb API:
alarm_test, tcp_posix, fd_posix, pollset_posix, credentials, call, channel, server, child_channel
-rw-r--r--src/core/channel/child_channel.c13
-rw-r--r--src/core/iomgr/fd_posix.c29
-rw-r--r--src/core/iomgr/fd_posix.h10
-rw-r--r--src/core/iomgr/iomgr.c92
-rw-r--r--src/core/iomgr/iomgr.h13
-rw-r--r--src/core/iomgr/iomgr_internal.h4
-rw-r--r--src/core/iomgr/pollset_posix.c8
-rw-r--r--src/core/iomgr/socket_windows.c11
-rw-r--r--src/core/iomgr/socket_windows.h2
-rw-r--r--src/core/iomgr/tcp_posix.c8
-rw-r--r--src/core/security/credentials.c4
-rw-r--r--src/core/surface/call.c6
-rw-r--r--src/core/surface/channel.c6
-rw-r--r--src/core/surface/server.c25
-rw-r--r--test/core/iomgr/alarm_test.c11
15 files changed, 170 insertions, 72 deletions
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
index a2f3c54290..5679a8c81a 100644
--- a/src/core/channel/child_channel.c
+++ b/src/core/channel/child_channel.c
@@ -58,6 +58,9 @@ typedef struct {
gpr_uint8 sending_farewell;
/* have we sent farewell (goaway + disconnect) */
gpr_uint8 sent_farewell;
+
+ grpc_iomgr_closure finally_destroy_channel_iocb;
+ grpc_iomgr_closure send_farewells_iocb;
} lb_channel_data;
typedef struct { grpc_child_channel *channel; } lb_call_data;
@@ -213,12 +216,18 @@ static void maybe_destroy_channel(grpc_child_channel *channel) {
lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
if (chand->destroyed && chand->disconnected && chand->active_calls == 0 &&
!chand->sending_farewell && !chand->calling_back) {
- grpc_iomgr_add_callback(finally_destroy_channel, channel);
+ chand->finally_destroy_channel_iocb.cb = finally_destroy_channel;
+ chand->finally_destroy_channel_iocb.cb_arg = channel;
+ chand->finally_destroy_channel_iocb.is_ext_managed = 1; /* GPR_TRUE */
+ grpc_iomgr_add_callback(&chand->finally_destroy_channel_iocb);
} else if (chand->destroyed && !chand->disconnected &&
chand->active_calls == 0 && !chand->sending_farewell &&
!chand->sent_farewell) {
chand->sending_farewell = 1;
- grpc_iomgr_add_callback(send_farewells, channel);
+ chand->send_farewells_iocb.cb = send_farewells;
+ chand->send_farewells_iocb.cb_arg = channel;
+ chand->send_farewells_iocb.is_ext_managed = 1; /* GPR_TRUE */
+ grpc_iomgr_add_callback(&chand->send_farewells_iocb);
}
}
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index b697fcc64a..3509d021ee 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -91,6 +91,7 @@ static grpc_fd *alloc_fd(int fd) {
gpr_mu_init(&r->set_state_mu);
gpr_mu_init(&r->watcher_mu);
}
+
gpr_atm_rel_store(&r->refst, 1);
gpr_atm_rel_store(&r->readst, NOT_READY);
gpr_atm_rel_store(&r->writest, NOT_READY);
@@ -117,7 +118,10 @@ static void unref_by(grpc_fd *fd, int n) {
gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
close(fd->fd);
- grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
+ fd->on_done_iocb.cb = fd->on_done;
+ fd->on_done_iocb.cb_arg = fd->on_done_user_data;
+ fd->on_done_iocb.is_ext_managed = 1;
+ grpc_iomgr_add_callback(&fd->on_done_iocb);
freelist_fd(fd);
grpc_iomgr_unref();
} else {
@@ -196,20 +200,25 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
- int allow_synchronous_callback) {
+ int allow_synchronous_callback,
+ grpc_iomgr_closure *iocb) {
if (allow_synchronous_callback) {
cb(arg, success);
} else {
- grpc_iomgr_add_delayed_callback(cb, arg, success);
+ /* !iocb: allocate -> managed by iomgr
+ * iocb: "iocb" holds an instance managed by fd_posix */
+ iocb = grpc_iomgr_cb_create(cb, arg, !iocb /* is_ext_managed */);
+ grpc_iomgr_add_delayed_callback(iocb, success);
}
}
static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success,
- int allow_synchronous_callback) {
+ int allow_synchronous_callback,
+ grpc_iomgr_closure *iocbs) {
size_t i;
for (i = 0; i < n; i++) {
make_callback(callbacks[i].cb, callbacks[i].cb_arg, success,
- allow_synchronous_callback);
+ allow_synchronous_callback, iocbs + i);
}
}
@@ -238,7 +247,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
gpr_atm_rel_store(st, NOT_READY);
make_callback(closure->cb, closure->cb_arg,
!gpr_atm_acq_load(&fd->shutdown),
- allow_synchronous_callback);
+ allow_synchronous_callback, NULL);
return;
default: /* WAITING */
/* upcallptr was set to a different closure. This is an error! */
@@ -284,11 +293,14 @@ static void set_ready(grpc_fd *fd, gpr_atm *st,
int success;
grpc_iomgr_closure cb;
size_t ncb = 0;
+ grpc_iomgr_closure *ready_iocb;
gpr_mu_lock(&fd->set_state_mu);
set_ready_locked(st, &cb, &ncb);
gpr_mu_unlock(&fd->set_state_mu);
success = !gpr_atm_acq_load(&fd->shutdown);
- make_callbacks(&cb, ncb, success, allow_synchronous_callback);
+ assert(ncb <= 1);
+ ready_iocb = grpc_iomgr_cb_create(cb.cb, cb.cb_arg, 0);
+ make_callbacks(&cb, ncb, success, allow_synchronous_callback, ready_iocb);
}
void grpc_fd_shutdown(grpc_fd *fd) {
@@ -300,7 +312,8 @@ void grpc_fd_shutdown(grpc_fd *fd) {
set_ready_locked(&fd->readst, cb, &ncb);
set_ready_locked(&fd->writest, cb, &ncb);
gpr_mu_unlock(&fd->set_state_mu);
- make_callbacks(cb, ncb, 0, 0);
+ assert(ncb <= 2);
+ make_callbacks(cb, ncb, 0, 0, fd->shutdown_iocbs);
}
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index cfc533b7f5..59bd01ce0f 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -40,11 +40,6 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
-typedef struct {
- grpc_iomgr_cb_func cb;
- void *cb_arg;
-} grpc_iomgr_closure;
-
typedef struct grpc_fd grpc_fd;
typedef struct grpc_fd_watcher {
@@ -99,6 +94,11 @@ struct grpc_fd {
grpc_iomgr_cb_func on_done;
void *on_done_user_data;
struct grpc_fd *freelist_next;
+
+ grpc_iomgr_closure on_done_iocb;
+ /*grpc_iomgr_closure *ready_iocb; XXX: the only one we need to allocate on
+ * the spot*/
+ grpc_iomgr_closure shutdown_iocbs[2];
};
/* Create a wrapped file descriptor.
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index d22542fc91..e74c32b219 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -33,6 +33,7 @@
#include "src/core/iomgr/iomgr.h"
+#include <assert.h>
#include <stdlib.h>
#include "src/core/iomgr/iomgr_internal.h"
@@ -42,17 +43,10 @@
#include <grpc/support/thd.h>
#include <grpc/support/sync.h>
-typedef struct delayed_callback {
- grpc_iomgr_cb_func cb;
- void *cb_arg;
- int success;
- struct delayed_callback *next;
-} delayed_callback;
-
static gpr_mu g_mu;
static gpr_cv g_rcv;
-static delayed_callback *g_cbs_head = NULL;
-static delayed_callback *g_cbs_tail = NULL;
+static grpc_iomgr_closure *g_cbs_head = NULL;
+static grpc_iomgr_closure *g_cbs_tail = NULL;
static int g_shutdown;
static int g_refs;
static gpr_event g_background_callback_executor_done;
@@ -66,12 +60,18 @@ static void background_callback_executor(void *ignored) {
gpr_timespec short_deadline =
gpr_time_add(gpr_now(), gpr_time_from_millis(100));
if (g_cbs_head) {
- delayed_callback *cb = g_cbs_head;
- g_cbs_head = cb->next;
+ grpc_iomgr_closure *iocb = g_cbs_head;
+ int is_cb_ext_managed;
+ g_cbs_head = iocb->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
- cb->cb(cb->cb_arg, cb->success);
- gpr_free(cb);
+ /* capture the managed state, as the callback may deallocate itself */
+ is_cb_ext_managed = iocb->is_ext_managed;
+ assert(iocb->success >= 0);
+ iocb->cb(iocb->cb_arg, iocb->success);
+ if (!is_cb_ext_managed) {
+ gpr_free(iocb);
+ }
gpr_mu_lock(&g_mu);
} else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) {
} else {
@@ -103,7 +103,7 @@ void grpc_iomgr_init(void) {
}
void grpc_iomgr_shutdown(void) {
- delayed_callback *cb;
+ grpc_iomgr_closure *iocb;
gpr_timespec shutdown_deadline =
gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
@@ -114,13 +114,18 @@ void grpc_iomgr_shutdown(void) {
gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs,
g_cbs_head ? " and executing final callbacks" : "");
while (g_cbs_head) {
- cb = g_cbs_head;
- g_cbs_head = cb->next;
+ int is_cb_ext_managed;
+ iocb = g_cbs_head;
+ g_cbs_head = iocb->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
- cb->cb(cb->cb_arg, 0);
- gpr_free(cb);
+ /* capture the managed state, as the callback may deallocate itself */
+ is_cb_ext_managed = iocb->is_ext_managed;
+ iocb->cb(iocb->cb_arg, 0);
+ if (!is_cb_ext_managed) {
+ gpr_free(iocb);
+ }
gpr_mu_lock(&g_mu);
}
if (g_refs) {
@@ -167,42 +172,52 @@ void grpc_iomgr_unref(void) {
gpr_mu_unlock(&g_mu);
}
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
- int success) {
- delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback));
- dcb->cb = cb;
- dcb->cb_arg = cb_arg;
- dcb->success = success;
+grpc_iomgr_closure *grpc_iomgr_cb_create(grpc_iomgr_cb_func cb, void *cb_arg,
+ int is_ext_managed) {
+ grpc_iomgr_closure *iocb = gpr_malloc(sizeof(grpc_iomgr_closure));
+ iocb->cb = cb;
+ iocb->cb_arg = cb_arg;
+ iocb->is_ext_managed = is_ext_managed;
+ iocb->success = -1; /* uninitialized */
+ iocb->next = NULL;
+ return iocb;
+}
+
+void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success) {
+ iocb->success = success;
gpr_mu_lock(&g_mu);
- dcb->next = NULL;
+ iocb->next = NULL;
if (!g_cbs_tail) {
- g_cbs_head = g_cbs_tail = dcb;
+ g_cbs_head = g_cbs_tail = iocb;
} else {
- g_cbs_tail->next = dcb;
- g_cbs_tail = dcb;
+ g_cbs_tail->next = iocb;
+ g_cbs_tail = iocb;
}
gpr_mu_unlock(&g_mu);
}
-void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) {
- grpc_iomgr_add_delayed_callback(cb, cb_arg, 1);
+
+void grpc_iomgr_add_callback(grpc_iomgr_closure *iocb) {
+ grpc_iomgr_add_delayed_callback(iocb, 1);
}
+
int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
int n = 0;
gpr_mu *retake_mu = NULL;
- delayed_callback *cb;
+ grpc_iomgr_closure *iocb;
for (;;) {
+ int is_cb_ext_managed;
/* check for new work */
if (!gpr_mu_trylock(&g_mu)) {
break;
}
- cb = g_cbs_head;
- if (!cb) {
+ iocb = g_cbs_head;
+ if (!iocb) {
gpr_mu_unlock(&g_mu);
break;
}
- g_cbs_head = cb->next;
+ g_cbs_head = iocb->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
/* if we have a mutex to drop, do so before executing work */
@@ -211,8 +226,13 @@ int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
retake_mu = drop_mu;
drop_mu = NULL;
}
- cb->cb(cb->cb_arg, success && cb->success);
- gpr_free(cb);
+ /* capture the managed state, as the callback may deallocate itself */
+ is_cb_ext_managed = iocb->is_ext_managed;
+ assert(iocb->success >= 0);
+ iocb->cb(iocb->cb_arg, success && iocb->success);
+ if (!is_cb_ext_managed) {
+ gpr_free(iocb);
+ }
n++;
}
if (retake_mu) {
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index 1f5d23fdda..86bb5f3583 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -37,11 +37,22 @@
/* gRPC Callback definition */
typedef void (*grpc_iomgr_cb_func)(void *arg, int success);
+typedef struct grpc_iomgr_closure {
+ grpc_iomgr_cb_func cb;
+ void *cb_arg;
+ int success;
+ int is_ext_managed; /** is memory being managed externally? */
+ struct grpc_iomgr_closure *next; /** Do not touch */
+} grpc_iomgr_closure;
+
+grpc_iomgr_closure *grpc_iomgr_cb_create(grpc_iomgr_cb_func cb, void *cb_arg,
+ int is_ext_managed);
+
void grpc_iomgr_init(void);
void grpc_iomgr_shutdown(void);
/* This function is called from within a callback or from anywhere else
and causes the invocation of a callback at some point in the future */
-void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg);
+void grpc_iomgr_add_callback(grpc_iomgr_closure *iocb);
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */
diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h
index 07923258b9..25a731d58c 100644
--- a/src/core/iomgr/iomgr_internal.h
+++ b/src/core/iomgr/iomgr_internal.h
@@ -35,12 +35,10 @@
#define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H
#include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/sync.h>
int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success);
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
- int success);
+void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
void grpc_iomgr_ref(void);
void grpc_iomgr_unref(void);
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 826c792990..7a2ba4be06 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -257,6 +257,7 @@ typedef struct grpc_unary_promote_args {
const grpc_pollset_vtable *original_vtable;
grpc_pollset *pollset;
grpc_fd *fd;
+ grpc_iomgr_closure promotion_iocb;
} grpc_unary_promote_args;
static void unary_poll_do_promote(void *args, int success) {
@@ -279,7 +280,7 @@ static void unary_poll_do_promote(void *args, int success) {
/* First we need to ensure that nobody is polling concurrently */
while (pollset->counter != 0) {
grpc_pollset_kick(pollset);
- grpc_iomgr_add_callback(unary_poll_do_promote, up_args);
+ grpc_iomgr_add_callback(&up_args->promotion_iocb);
gpr_mu_unlock(&pollset->mu);
return;
}
@@ -363,7 +364,10 @@ static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
up_args->pollset = pollset;
up_args->fd = fd;
up_args->original_vtable = pollset->vtable;
- grpc_iomgr_add_callback(unary_poll_do_promote, up_args);
+ up_args->promotion_iocb.cb = unary_poll_do_promote;
+ up_args->promotion_iocb.cb_arg = up_args;
+ up_args->promotion_iocb.is_ext_managed = 1;
+ grpc_iomgr_add_callback(&up_args->promotion_iocb);
grpc_pollset_kick(pollset);
}
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index ee5150a696..92fcf7de2a 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -62,15 +62,18 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
int grpc_winsocket_shutdown(grpc_winsocket *socket) {
int callbacks_set = 0;
gpr_mu_lock(&socket->state_mu);
+ socket->shutdown_iocb.is_ext_managed = 1; /* GPR_TRUE */
if (socket->read_info.cb) {
callbacks_set++;
- grpc_iomgr_add_delayed_callback(socket->read_info.cb,
- socket->read_info.opaque, 0);
+ socket->shutdown_iocb.cb = socket->read_info.cb;
+ socket->shutdown_iocb.cb_arg = socket->read_info.opaque;
+ grpc_iomgr_add_delayed_callback(socket->shutdown_iocb, 0);
}
if (socket->write_info.cb) {
callbacks_set++;
- grpc_iomgr_add_delayed_callback(socket->write_info.cb,
- socket->write_info.opaque, 0);
+ socket->shutdown_iocb.cb = socket->write_info.cb;
+ socket->shutdown_iocb.cb_arg = socket->write_info.opaque;
+ grpc_iomgr_add_delayed_callback(socket->shutdown_iocb, 0);
}
gpr_mu_unlock(&socket->state_mu);
return callbacks_set;
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index b27eb14219..8fd8ae97b6 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -93,6 +93,8 @@ typedef struct grpc_winsocket {
there is a pending operation that the IO Completion Port will have to
wait for. The socket will be collected at that time. */
int orphan;
+
+ grpc_iomgr_closure shutdown_iocb;
} grpc_winsocket;
/* Create a wrapped windows handle. This takes ownership of it, meaning that
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index cd6b2ecae6..e2cda52733 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -280,6 +280,8 @@ typedef struct {
grpc_iomgr_closure read_closure;
grpc_iomgr_closure write_closure;
+
+ grpc_iomgr_closure handle_read_iocb;
} grpc_tcp;
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
@@ -443,7 +445,8 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
tcp->finished_edge = 0;
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} else {
- grpc_iomgr_add_callback(grpc_tcp_handle_read, tcp);
+ tcp->handle_read_iocb.cb_arg = tcp;
+ grpc_iomgr_add_callback(&tcp->handle_read_iocb);
}
}
@@ -592,6 +595,9 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
tcp->read_closure.cb_arg = tcp;
tcp->write_closure.cb = grpc_tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
+
+ tcp->handle_read_iocb.cb = grpc_tcp_handle_read;
+ tcp->handle_read_iocb.is_ext_managed = 1;
return &tcp->base;
}
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index ae22bf47a0..e9229f0694 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -832,8 +832,10 @@ static void fake_oauth2_get_request_metadata(grpc_credentials *creds,
if (c->is_async) {
grpc_iomgr_add_callback(
+ grpc_iomgr_cb_create(
on_simulated_token_fetch_done,
- grpc_credentials_metadata_request_create(creds, cb, user_data));
+ grpc_credentials_metadata_request_create(creds, cb, user_data),
+ 0 /*GPR_FALSE*/));
} else {
cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK);
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index e3995a407b..99e14e3e15 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -226,6 +226,7 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
+ grpc_iomgr_closure destroy_iocb;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -367,7 +368,10 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
if (allow_immediate_deletion) {
destroy_call(c, 1);
} else {
- grpc_iomgr_add_callback(destroy_call, c);
+ c->destroy_iocb.cb = destroy_call;
+ c->destroy_iocb.cb_arg = c;
+ c->destroy_iocb.is_ext_managed = 1; /* GPR_TRUE */
+ grpc_iomgr_add_callback(&c->destroy_iocb);
}
}
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index be9da2b7f9..8e593b8073 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -61,6 +61,7 @@ struct grpc_channel {
gpr_mu registered_call_mu;
registered_call *registered_calls;
+ grpc_iomgr_closure destroy_iocb;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
@@ -193,7 +194,10 @@ static void destroy_channel(void *p, int ok) {
void grpc_channel_internal_unref(grpc_channel *channel) {
if (gpr_unref(&channel->refs)) {
- grpc_iomgr_add_callback(destroy_channel, channel);
+ channel->destroy_iocb.cb = destroy_channel;
+ channel->destroy_iocb.cb_arg = channel;
+ channel->destroy_iocb.is_ext_managed = 1;
+ grpc_iomgr_add_callback(&channel->destroy_iocb);
}
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 60606c75e4..a10b921456 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -122,6 +122,8 @@ struct channel_data {
channel_registered_method *registered_methods;
gpr_uint32 registered_method_slots;
gpr_uint32 registered_method_max_probes;
+ grpc_iomgr_closure finish_shutdown_channel_iocb;
+ grpc_iomgr_closure finish_destroy_channel_iocb;
};
struct grpc_server {
@@ -304,7 +306,10 @@ static void destroy_channel(channel_data *chand) {
GPR_ASSERT(chand->server != NULL);
orphan_channel(chand);
server_ref(chand->server);
- grpc_iomgr_add_callback(finish_destroy_channel, chand);
+ chand->finish_destroy_channel_iocb.cb = finish_destroy_channel;
+ chand->finish_destroy_channel_iocb.cb_arg = chand;
+ chand->finish_destroy_channel_iocb.is_ext_managed = 1;
+ grpc_iomgr_add_callback(&chand->finish_destroy_channel_iocb);
}
static void finish_start_new_rpc_and_unlock(grpc_server *server,
@@ -416,7 +421,8 @@ static void server_on_recv(void *ptr, int success) {
gpr_mu_lock(&chand->server->mu);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
+ grpc_iomgr_add_callback(
+ grpc_iomgr_cb_create(kill_zombie, elem, 0 /* GPR_FALSE */));
}
gpr_mu_unlock(&chand->server->mu);
break;
@@ -424,11 +430,13 @@ static void server_on_recv(void *ptr, int success) {
gpr_mu_lock(&chand->server->mu);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
+ grpc_iomgr_add_callback(
+ grpc_iomgr_cb_create(kill_zombie, elem, 0 /* GPR_FALSE */));
} else if (calld->state == PENDING) {
call_list_remove(calld, PENDING_START);
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
+ grpc_iomgr_add_callback(
+ grpc_iomgr_cb_create(kill_zombie, elem, 0 /* GPR_FALSE */));
}
gpr_mu_unlock(&chand->server->mu);
break;
@@ -502,7 +510,10 @@ static void finish_shutdown_channel(void *cd, int success) {
static void shutdown_channel(channel_data *chand) {
grpc_channel_internal_ref(chand->channel);
- grpc_iomgr_add_callback(finish_shutdown_channel, chand);
+ chand->finish_shutdown_channel_iocb.cb = finish_shutdown_channel;
+ chand->finish_shutdown_channel_iocb.cb_arg = chand;
+ chand->finish_shutdown_channel_iocb.is_ext_managed = 1;
+ grpc_iomgr_add_callback(&chand->finish_shutdown_channel_iocb);
}
static void init_call_elem(grpc_call_element *elem,
@@ -943,8 +954,10 @@ void grpc_server_destroy(grpc_server *server) {
gpr_log(GPR_DEBUG, "server destroys call %p", calld->call);
calld->state = ZOMBIED;
grpc_iomgr_add_callback(
+ grpc_iomgr_cb_create(
kill_zombie,
- grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
+ 0)); /* XXX */
}
for (c = server->root_channel_data.next; c != &server->root_channel_data;
diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c
index e677ba30dd..e416c79425 100644
--- a/test/core/iomgr/alarm_test.c
+++ b/test/core/iomgr/alarm_test.c
@@ -55,6 +55,7 @@ void no_op_cb(void *arg, int success) {}
typedef struct {
gpr_cv cv;
gpr_mu mu;
+ grpc_iomgr_closure *iocb;
int counter;
int done_success_ctr;
int done_cancel_ctr;
@@ -81,7 +82,10 @@ static void alarm_cb(void *arg /* alarm_arg */, int success) {
a->success = success;
gpr_cv_signal(&a->cv);
gpr_mu_unlock(&a->mu);
- grpc_iomgr_add_callback(followup_cb, &a->fcb_arg);
+ a->iocb->cb = followup_cb;
+ a->iocb->cb_arg = &a->fcb_arg;
+ a->iocb->is_ext_managed = 1;
+ grpc_iomgr_add_callback(a->iocb);
}
/* Test grpc_alarm add and cancel. */
@@ -107,6 +111,7 @@ static void test_grpc_alarm(void) {
arg.done = 0;
gpr_mu_init(&arg.mu);
gpr_cv_init(&arg.cv);
+ arg.iocb = gpr_malloc(sizeof(grpc_iomgr_closure));
gpr_event_init(&arg.fcb_arg);
grpc_alarm_init(&alarm, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100), alarm_cb, &arg,
@@ -148,6 +153,7 @@ static void test_grpc_alarm(void) {
}
gpr_cv_destroy(&arg.cv);
gpr_mu_destroy(&arg.mu);
+ gpr_free(arg.iocb);
arg2.counter = 0;
arg2.success = SUCCESS_NOT_SET;
@@ -156,6 +162,8 @@ static void test_grpc_alarm(void) {
arg2.done = 0;
gpr_mu_init(&arg2.mu);
gpr_cv_init(&arg2.cv);
+ arg2.iocb = gpr_malloc(sizeof(grpc_iomgr_closure));
+
gpr_event_init(&arg2.fcb_arg);
grpc_alarm_init(&alarm_to_cancel, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100),
@@ -206,6 +214,7 @@ static void test_grpc_alarm(void) {
}
gpr_cv_destroy(&arg2.cv);
gpr_mu_destroy(&arg2.mu);
+ gpr_free(arg2.iocb);
grpc_iomgr_shutdown();
}