aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/alarm.c66
-rw-r--r--src/core/iomgr/alarm.h7
-rw-r--r--src/core/iomgr/alarm_internal.h6
-rw-r--r--src/core/iomgr/endpoint.c32
-rw-r--r--src/core/iomgr/endpoint.h44
-rw-r--r--src/core/iomgr/endpoint_pair.h3
-rw-r--r--src/core/iomgr/endpoint_pair_posix.c11
-rw-r--r--src/core/iomgr/fd_posix.c97
-rw-r--r--src/core/iomgr/fd_posix.h21
-rw-r--r--src/core/iomgr/iomgr.c25
-rw-r--r--src/core/iomgr/iomgr.h22
-rw-r--r--src/core/iomgr/pollset.h5
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c43
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c22
-rw-r--r--src/core/iomgr/pollset_posix.c138
-rw-r--r--src/core/iomgr/pollset_posix.h36
-rw-r--r--src/core/iomgr/pollset_set.h6
-rw-r--r--src/core/iomgr/pollset_set_posix.c16
-rw-r--r--src/core/iomgr/pollset_set_posix.h6
-rw-r--r--src/core/iomgr/resolve_address.h5
-rw-r--r--src/core/iomgr/resolve_address_posix.c8
-rw-r--r--src/core/iomgr/tcp_client.h7
-rw-r--r--src/core/iomgr/tcp_client_posix.c58
-rw-r--r--src/core/iomgr/tcp_posix.c128
-rw-r--r--src/core/iomgr/tcp_server.h10
-rw-r--r--src/core/iomgr/tcp_server_posix.c64
-rw-r--r--src/core/iomgr/udp_server.c55
-rw-r--r--src/core/iomgr/udp_server.h7
-rw-r--r--src/core/iomgr/workqueue.h16
-rw-r--r--src/core/iomgr/workqueue_posix.c49
30 files changed, 473 insertions, 540 deletions
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c
index 7b67fe3b1d..6e0d516f0c 100644
--- a/src/core/iomgr/alarm.c
+++ b/src/core/iomgr/alarm.c
@@ -44,7 +44,6 @@
#define LOG2_NUM_SHARDS 5
#define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
-#define MAX_ALARMS_PER_CHECK 128
#define ADD_DEADLINE_SCALE 0.33
#define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1
@@ -73,8 +72,8 @@ static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
static shard_type *g_shard_queue[NUM_SHARDS];
-static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
- gpr_timespec *next, int success);
+static int run_some_expired_alarms(gpr_timespec now, gpr_timespec *next,
+ int success, grpc_call_list *call_list);
static gpr_timespec compute_min_deadline(shard_type *shard) {
return grpc_alarm_heap_is_empty(&shard->heap)
@@ -103,10 +102,9 @@ void grpc_alarm_list_init(gpr_timespec now) {
}
}
-void grpc_alarm_list_shutdown(void) {
+void grpc_alarm_list_shutdown(grpc_call_list *call_list) {
int i;
- while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL, 0))
- ;
+ run_some_expired_alarms(gpr_inf_future(g_clock_type), NULL, 0, call_list);
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
gpr_mu_destroy(&shard->mu);
@@ -174,13 +172,12 @@ static void note_deadline_change(shard_type *shard) {
void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg,
- gpr_timespec now) {
+ gpr_timespec now, grpc_call_list *call_list) {
int is_first_alarm = 0;
shard_type *shard = &g_shards[shard_idx(alarm)];
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
- alarm->cb = alarm_cb;
- alarm->cb_arg = alarm_cb_arg;
+ grpc_closure_init(&alarm->closure, alarm_cb, alarm_cb_arg);
alarm->deadline = deadline;
alarm->triggered = 0;
@@ -223,12 +220,11 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
}
}
-void grpc_alarm_cancel(grpc_alarm *alarm) {
+void grpc_alarm_cancel(grpc_alarm *alarm, grpc_call_list *call_list) {
shard_type *shard = &g_shards[shard_idx(alarm)];
- int triggered = 0;
gpr_mu_lock(&shard->mu);
if (!alarm->triggered) {
- triggered = 1;
+ grpc_call_list_add(call_list, &alarm->closure, 1);
alarm->triggered = 1;
if (alarm->heap_index == INVALID_HEAP_INDEX) {
list_remove(alarm);
@@ -237,10 +233,6 @@ void grpc_alarm_cancel(grpc_alarm *alarm) {
}
}
gpr_mu_unlock(&shard->mu);
-
- if (triggered) {
- alarm->cb(alarm->cb_arg, 0);
- }
}
/* This is called when the queue is empty and "now" has reached the
@@ -292,39 +284,36 @@ static grpc_alarm *pop_one(shard_type *shard, gpr_timespec now) {
/* REQUIRES: shard->mu unlocked */
static size_t pop_alarms(shard_type *shard, gpr_timespec now,
- grpc_alarm **alarms, size_t max_alarms,
- gpr_timespec *new_min_deadline) {
+ gpr_timespec *new_min_deadline, int success,
+ grpc_call_list *call_list) {
size_t n = 0;
grpc_alarm *alarm;
gpr_mu_lock(&shard->mu);
- while (n < max_alarms && (alarm = pop_one(shard, now))) {
- alarms[n++] = alarm;
+ while ((alarm = pop_one(shard, now))) {
+ grpc_call_list_add(call_list, &alarm->closure, success);
}
*new_min_deadline = compute_min_deadline(shard);
gpr_mu_unlock(&shard->mu);
return n;
}
-static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
- gpr_timespec *next, int success) {
+static int run_some_expired_alarms(gpr_timespec now, gpr_timespec *next,
+ int success, grpc_call_list *call_list) {
size_t n = 0;
- size_t i;
- grpc_alarm *alarms[MAX_ALARMS_PER_CHECK];
/* TODO(ctiller): verify that there are any alarms (atomically) here */
if (gpr_mu_trylock(&g_checker_mu)) {
gpr_mu_lock(&g_mu);
- while (n < MAX_ALARMS_PER_CHECK &&
- gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
+ while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
gpr_timespec new_min_deadline;
/* For efficiency, we pop as many available alarms as we can from the
shard. This may violate perfect alarm deadline ordering, but that
shouldn't be a big deal because we don't make ordering guarantees. */
- n += pop_alarms(g_shard_queue[0], now, alarms + n,
- MAX_ALARMS_PER_CHECK - n, &new_min_deadline);
+ n += pop_alarms(g_shard_queue[0], now, &new_min_deadline, success,
+ call_list);
/* An grpc_alarm_init() on the shard could intervene here, adding a new
alarm that is earlier than new_min_deadline. However,
@@ -343,26 +332,15 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
gpr_mu_unlock(&g_checker_mu);
}
- if (n && drop_mu) {
- gpr_mu_unlock(drop_mu);
- }
-
- for (i = 0; i < n; i++) {
- alarms[i]->cb(alarms[i]->cb_arg, success);
- }
-
- if (n && drop_mu) {
- gpr_mu_lock(drop_mu);
- }
-
- return (int)n;
+ return n > 0;
}
-int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) {
+int grpc_alarm_check(gpr_timespec now, gpr_timespec *next,
+ grpc_call_list *call_list) {
GPR_ASSERT(now.clock_type == g_clock_type);
return run_some_expired_alarms(
- drop_mu, now, next,
- gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
+ now, next, gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0,
+ call_list);
}
gpr_timespec grpc_alarm_list_next_timeout(void) {
diff --git a/src/core/iomgr/alarm.h b/src/core/iomgr/alarm.h
index 4a13527e64..05ca7c27bb 100644
--- a/src/core/iomgr/alarm.h
+++ b/src/core/iomgr/alarm.h
@@ -44,8 +44,7 @@ typedef struct grpc_alarm {
int triggered;
struct grpc_alarm *next;
struct grpc_alarm *prev;
- grpc_iomgr_cb_func cb;
- void *cb_arg;
+ grpc_closure closure;
} grpc_alarm;
/* Initialize *alarm. When expired or canceled, alarm_cb will be called with
@@ -56,7 +55,7 @@ typedef struct grpc_alarm {
information about when to free up any user-level state. */
void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg,
- gpr_timespec now);
+ gpr_timespec now, grpc_call_list *call_list);
/* Note that there is no alarm destroy function. This is because the
alarm is a one-time occurrence with a guarantee that the callback will
@@ -84,6 +83,6 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
matches this aim.
Requires: cancel() must happen after add() on a given alarm */
-void grpc_alarm_cancel(grpc_alarm *alarm);
+void grpc_alarm_cancel(grpc_alarm *alarm, grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_H */
diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h
index e9f98a3444..a2e9946f4f 100644
--- a/src/core/iomgr/alarm_internal.h
+++ b/src/core/iomgr/alarm_internal.h
@@ -48,10 +48,10 @@
with high probability at least one thread in the system will see an update
at any time slice. */
-int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next);
-
+int grpc_alarm_check(gpr_timespec now, gpr_timespec *next,
+ grpc_call_list *call_list);
void grpc_alarm_list_init(gpr_timespec now);
-void grpc_alarm_list_shutdown(void);
+void grpc_alarm_list_shutdown(grpc_call_list *call_list);
gpr_timespec grpc_alarm_list_next_timeout(void);
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index 1955f74b9a..769f155a63 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -33,30 +33,34 @@
#include "src/core/iomgr/endpoint.h"
-grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep,
- gpr_slice_buffer *slices,
- grpc_closure *cb) {
- return ep->vtable->read(ep, slices, cb);
+void grpc_endpoint_read(grpc_endpoint *ep, gpr_slice_buffer *slices,
+ grpc_closure *cb, grpc_call_list *call_list) {
+ ep->vtable->read(ep, slices, cb, call_list);
}
-grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep,
- gpr_slice_buffer *slices,
- grpc_closure *cb) {
- return ep->vtable->write(ep, slices, cb);
+void grpc_endpoint_write(grpc_endpoint *ep, gpr_slice_buffer *slices,
+ grpc_closure *cb, grpc_call_list *call_list) {
+ ep->vtable->write(ep, slices, cb, call_list);
}
-void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
- ep->vtable->add_to_pollset(ep, pollset);
+void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset,
+ grpc_call_list *call_list) {
+ ep->vtable->add_to_pollset(ep, pollset, call_list);
}
void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep,
- grpc_pollset_set *pollset_set) {
- ep->vtable->add_to_pollset_set(ep, pollset_set);
+ grpc_pollset_set *pollset_set,
+ grpc_call_list *call_list) {
+ ep->vtable->add_to_pollset_set(ep, pollset_set, call_list);
}
-void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); }
+void grpc_endpoint_shutdown(grpc_endpoint *ep, grpc_call_list *call_list) {
+ ep->vtable->shutdown(ep, call_list);
+}
-void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); }
+void grpc_endpoint_destroy(grpc_endpoint *ep, grpc_call_list *call_list) {
+ ep->vtable->destroy(ep, call_list);
+}
char *grpc_endpoint_get_peer(grpc_endpoint *ep) {
return ep->vtable->get_peer(ep);
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index 79b7d6a78e..f9881684ff 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -46,21 +46,17 @@
typedef struct grpc_endpoint grpc_endpoint;
typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
-typedef enum grpc_endpoint_op_status {
- GRPC_ENDPOINT_DONE, /* completed immediately, cb won't be called */
- GRPC_ENDPOINT_PENDING, /* cb will be called when completed */
- GRPC_ENDPOINT_ERROR /* write errored out, cb won't be called */
-} grpc_endpoint_op_status;
-
struct grpc_endpoint_vtable {
- grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_closure *cb);
- grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_closure *cb);
- void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
- void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset);
- void (*shutdown)(grpc_endpoint *ep);
- void (*destroy)(grpc_endpoint *ep);
+ void (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices, grpc_closure *cb,
+ grpc_call_list *call_list);
+ void (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices, grpc_closure *cb,
+ grpc_call_list *call_list);
+ void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset,
+ grpc_call_list *call_list);
+ void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset,
+ grpc_call_list *call_list);
+ void (*shutdown)(grpc_endpoint *ep, grpc_call_list *call_list);
+ void (*destroy)(grpc_endpoint *ep, grpc_call_list *call_list);
char *(*get_peer)(grpc_endpoint *ep);
};
@@ -68,9 +64,8 @@ struct grpc_endpoint_vtable {
Callback success indicates that the endpoint can accept more reads, failure
indicates the endpoint is closed.
Valid slices may be placed into \a slices even on callback success == 0. */
-grpc_endpoint_op_status grpc_endpoint_read(
- grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_closure *cb) GRPC_MUST_USE_RESULT;
+void grpc_endpoint_read(grpc_endpoint *ep, gpr_slice_buffer *slices,
+ grpc_closure *cb, grpc_call_list *call_list);
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
@@ -84,20 +79,21 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep);
No guarantee is made to the content of slices after a write EXCEPT that
it is a valid slice buffer.
*/
-grpc_endpoint_op_status grpc_endpoint_write(
- grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_closure *cb) GRPC_MUST_USE_RESULT;
+void grpc_endpoint_write(grpc_endpoint *ep, gpr_slice_buffer *slices,
+ grpc_closure *cb, grpc_call_list *call_list);
/* Causes any pending read/write callbacks to run immediately with
success==0 */
-void grpc_endpoint_shutdown(grpc_endpoint *ep);
-void grpc_endpoint_destroy(grpc_endpoint *ep);
+void grpc_endpoint_shutdown(grpc_endpoint *ep, grpc_call_list *call_list);
+void grpc_endpoint_destroy(grpc_endpoint *ep, grpc_call_list *call_list);
/* Add an endpoint to a pollset, so that when the pollset is polled, events from
this endpoint are considered */
-void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset);
+void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset,
+ grpc_call_list *call_list);
void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep,
- grpc_pollset_set *pollset_set);
+ grpc_pollset_set *pollset_set,
+ grpc_call_list *call_list);
struct grpc_endpoint {
const grpc_endpoint_vtable *vtable;
diff --git a/src/core/iomgr/endpoint_pair.h b/src/core/iomgr/endpoint_pair.h
index 25ef1891fb..095ec5fcc9 100644
--- a/src/core/iomgr/endpoint_pair.h
+++ b/src/core/iomgr/endpoint_pair.h
@@ -42,7 +42,6 @@ typedef struct {
} grpc_endpoint_pair;
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
- size_t read_slice_size,
- grpc_workqueue *workqueue);
+ size_t read_slice_size);
#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */
diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c
index dc1f441b4b..deae9c6875 100644
--- a/src/core/iomgr/endpoint_pair_posix.c
+++ b/src/core/iomgr/endpoint_pair_posix.c
@@ -59,20 +59,19 @@ static void create_sockets(int sv[2]) {
}
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
- size_t read_slice_size,
- grpc_workqueue *workqueue) {
+ size_t read_slice_size) {
int sv[2];
grpc_endpoint_pair p;
char *final_name;
create_sockets(sv);
gpr_asprintf(&final_name, "%s:client", name);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1], workqueue, final_name),
- read_slice_size, "socketpair-server");
+ p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size,
+ "socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0], workqueue, final_name),
- read_slice_size, "socketpair-client");
+ p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size,
+ "socketpair-client");
gpr_free(final_name);
return p;
}
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index dfebbcc2e1..294cb70746 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -71,9 +71,6 @@ static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
static void freelist_fd(grpc_fd *fd) {
- if (fd->workqueue->wakeup_read_fd != fd) {
- GRPC_WORKQUEUE_UNREF(fd->workqueue, "fd");
- }
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
@@ -161,14 +158,8 @@ void grpc_fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name) {
+grpc_fd *grpc_fd_create(int fd, const char *name) {
grpc_fd *r = alloc_fd(fd);
- r->workqueue = workqueue;
- /* if the wakeup_read_fd is NULL, then the workqueue is under construction
- ==> this fd will be the wakeup_read_fd, and we shouldn't take a ref */
- if (workqueue->wakeup_read_fd != NULL) {
- GRPC_WORKQUEUE_REF(workqueue, "fd");
- }
grpc_iomgr_register_object(&r->iomgr_object, name);
return r;
}
@@ -218,7 +209,8 @@ static int has_watchers(grpc_fd *fd) {
fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}
-void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason) {
+void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason,
+ grpc_call_list *call_list) {
fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR);
gpr_mu_lock(&fd->watcher_mu);
@@ -226,9 +218,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason) {
if (!has_watchers(fd)) {
fd->closed = 1;
close(fd->fd);
- if (fd->on_done_closure) {
- grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1);
- }
+ grpc_call_list_add(call_list, fd->on_done_closure, 1);
} else {
wake_all_watchers_locked(fd);
}
@@ -252,25 +242,8 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
-static void process_callback(grpc_closure *closure, int success,
- grpc_workqueue *optional_workqueue) {
- if (optional_workqueue == NULL) {
- closure->cb(closure->cb_arg, success);
- } else {
- grpc_workqueue_push(optional_workqueue, closure, success);
- }
-}
-
-static void process_callbacks(grpc_closure *callbacks, size_t n, int success,
- grpc_workqueue *optional_workqueue) {
- size_t i;
- for (i = 0; i < n; i++) {
- process_callback(callbacks + i, success, optional_workqueue);
- }
-}
-
static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure,
- int allow_synchronous_callback) {
+ grpc_call_list *call_list) {
switch (gpr_atm_acq_load(st)) {
case NOT_READY:
/* There is no race if the descriptor is already ready, so we skip
@@ -292,8 +265,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure,
case READY:
GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
gpr_atm_rel_store(st, NOT_READY);
- process_callback(closure, !gpr_atm_acq_load(&fd->shutdown),
- allow_synchronous_callback ? NULL : fd->workqueue);
+ grpc_call_list_add(call_list, closure, !gpr_atm_acq_load(&fd->shutdown));
return;
default: /* WAITING */
/* upcallptr was set to a different closure. This is an error! */
@@ -306,8 +278,8 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure,
abort();
}
-static void set_ready_locked(gpr_atm *st, grpc_closure **callbacks,
- size_t *ncallbacks) {
+static void set_ready_locked(grpc_fd *fd, gpr_atm *st,
+ grpc_call_list *call_list) {
gpr_intptr state = gpr_atm_acq_load(st);
switch (state) {
@@ -326,50 +298,38 @@ static void set_ready_locked(gpr_atm *st, grpc_closure **callbacks,
default: /* waiting */
GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY &&
gpr_atm_no_barrier_load(st) != NOT_READY);
- callbacks[(*ncallbacks)++] = (grpc_closure *)state;
+ grpc_call_list_add(call_list, (grpc_closure *)state,
+ !gpr_atm_acq_load(&fd->shutdown));
gpr_atm_rel_store(st, NOT_READY);
return;
}
}
-static void set_ready(grpc_fd *fd, gpr_atm *st,
- int allow_synchronous_callback) {
+static void set_ready(grpc_fd *fd, gpr_atm *st, grpc_call_list *call_list) {
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
- int success;
- grpc_closure *closure;
- size_t ncb = 0;
-
gpr_mu_lock(&fd->set_state_mu);
- set_ready_locked(st, &closure, &ncb);
+ set_ready_locked(fd, st, call_list);
gpr_mu_unlock(&fd->set_state_mu);
- success = !gpr_atm_acq_load(&fd->shutdown);
- GPR_ASSERT(ncb <= 1);
- if (ncb > 0) {
- process_callbacks(closure, ncb, success,
- allow_synchronous_callback ? NULL : fd->workqueue);
- }
}
-void grpc_fd_shutdown(grpc_fd *fd) {
- size_t ncb = 0;
+void grpc_fd_shutdown(grpc_fd *fd, grpc_call_list *call_list) {
gpr_mu_lock(&fd->set_state_mu);
GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
gpr_atm_rel_store(&fd->shutdown, 1);
- set_ready_locked(&fd->readst, &fd->shutdown_closures[0], &ncb);
- set_ready_locked(&fd->writest, &fd->shutdown_closures[0], &ncb);
+ set_ready_locked(fd, &fd->readst, call_list);
+ set_ready_locked(fd, &fd->writest, call_list);
gpr_mu_unlock(&fd->set_state_mu);
- GPR_ASSERT(ncb <= 2);
- process_callbacks(fd->shutdown_closures[0], ncb, 0 /* GPR_FALSE */,
- 0 /* GPR_FALSE */);
}
-void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure) {
- notify_on(fd, &fd->readst, closure, 0);
+void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure,
+ grpc_call_list *call_list) {
+ notify_on(fd, &fd->readst, closure, call_list);
}
-void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure) {
- notify_on(fd, &fd->writest, closure, 0);
+void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure,
+ grpc_call_list *call_list) {
+ notify_on(fd, &fd->writest, closure, call_list);
}
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
@@ -415,7 +375,8 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
return mask;
}
-void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
+void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write,
+ grpc_call_list *call_list) {
int was_polling = 0;
int kick = 0;
grpc_fd *fd = watcher->fd;
@@ -448,21 +409,19 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
fd->closed = 1;
close(fd->fd);
- if (fd->on_done_closure != NULL) {
- grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1);
- }
+ grpc_call_list_add(call_list, fd->on_done_closure, 1);
}
gpr_mu_unlock(&fd->watcher_mu);
GRPC_FD_UNREF(fd, "poll");
}
-void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
- set_ready(fd, &fd->readst, allow_synchronous_callback);
+void grpc_fd_become_readable(grpc_fd *fd, grpc_call_list *call_list) {
+ set_ready(fd, &fd->readst, call_list);
}
-void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) {
- set_ready(fd, &fd->writest, allow_synchronous_callback);
+void grpc_fd_become_writable(grpc_fd *fd, grpc_call_list *call_list) {
+ set_ready(fd, &fd->writest, call_list);
}
#endif
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index bb85b6c16e..f435e2d3f9 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -58,7 +58,6 @@ struct grpc_fd {
meaning that mostly we ref by two to avoid altering the orphaned bit,
and just unref by 1 when we're ready to flag the object as orphaned */
gpr_atm refst;
- grpc_workqueue *workqueue;
gpr_mu set_state_mu;
gpr_atm shutdown;
@@ -105,7 +104,7 @@ struct grpc_fd {
/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
This takes ownership of closing fd. */
-grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name);
+grpc_fd *grpc_fd_create(int fd, const char *name);
/* Releases fd to be asynchronously destroyed.
on_done is called when the underlying file descriptor is definitely close()d.
@@ -113,7 +112,8 @@ grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name);
Requires: *fd initialized; no outstanding notify_on_read or
notify_on_write.
MUST NOT be called with a pollset lock taken */
-void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason);
+void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason,
+ grpc_call_list *call_list);
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
@@ -131,13 +131,14 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
grpc_fd_watcher *rec);
/* Complete polling previously started with grpc_fd_begin_poll
MUST NOT be called with a pollset lock taken */
-void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write);
+void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write,
+ grpc_call_list *call_list);
/* Return 1 if this fd is orphaned, 0 otherwise */
int grpc_fd_is_orphaned(grpc_fd *fd);
/* Cause any current callbacks to error out with GRPC_CALLBACK_CANCELLED. */
-void grpc_fd_shutdown(grpc_fd *fd);
+void grpc_fd_shutdown(grpc_fd *fd, grpc_call_list *call_list);
/* Register read interest, causing read_cb to be called once when fd becomes
readable, on deadline specified by deadline, or on shutdown triggered by
@@ -152,17 +153,19 @@ void grpc_fd_shutdown(grpc_fd *fd);
underlying platform. This means that users must drain fd in read_cb before
calling notify_on_read again. Users are also expected to handle spurious
events, i.e read_cb is called while nothing can be readable from fd */
-void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure);
+void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure,
+ grpc_call_list *call_list);
/* Exactly the same semantics as above, except based on writable events. */
-void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure);
+void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure,
+ grpc_call_list *call_list);
/* Notification from the poller to an fd that it has become readable or
writable.
If allow_synchronous_callback is 1, allow running the fd callback inline
in this callstack, otherwise register an asynchronous callback and return */
-void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback);
-void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback);
+void grpc_fd_become_readable(grpc_fd *fd, grpc_call_list *call_list);
+void grpc_fd_become_writable(grpc_fd *fd, grpc_call_list *call_list);
/* Reference counting for fds */
#ifdef GRPC_FD_REF_COUNT_DEBUG
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index dd76044913..029c689982 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -88,6 +88,7 @@ void grpc_iomgr_shutdown(void) {
gpr_timespec shutdown_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN));
gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&g_mu);
g_shutdown = 1;
@@ -101,7 +102,11 @@ void grpc_iomgr_shutdown(void) {
}
last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
}
- if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
+ if (grpc_alarm_check(gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL,
+ &call_list)) {
+ gpr_mu_unlock(&g_mu);
+ grpc_call_list_run(&call_list);
+ gpr_mu_lock(&g_mu);
continue;
}
if (g_root_object.next != &g_root_object) {
@@ -126,7 +131,8 @@ void grpc_iomgr_shutdown(void) {
}
gpr_mu_unlock(&g_mu);
- grpc_alarm_list_shutdown();
+ grpc_alarm_list_shutdown(&call_list);
+ grpc_call_list_run(&call_list);
grpc_iomgr_platform_shutdown();
gpr_mu_destroy(&g_mu);
@@ -171,12 +177,15 @@ void grpc_call_list_add(grpc_call_list *call_list, grpc_closure *closure,
call_list->tail = closure;
}
-void grpc_call_list_run(grpc_call_list call_list) {
- grpc_closure *c = call_list.head;
- while (c) {
- grpc_closure *next = c->next;
- c->cb(c->cb_arg, c->success);
- c = next;
+void grpc_call_list_run(grpc_call_list *call_list) {
+ while (!grpc_call_list_empty(*call_list)) {
+ grpc_closure *c = call_list->head;
+ call_list->head = call_list->tail = NULL;
+ while (c) {
+ grpc_closure *next = c->next;
+ c->cb(c->cb_arg, c->success, call_list);
+ c = next;
+ }
}
}
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index 56f0195be9..94f3912990 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -34,15 +34,24 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_H
#define GRPC_INTERNAL_CORE_IOMGR_IOMGR_H
+struct grpc_closure;
+typedef struct grpc_closure grpc_closure;
+
+typedef struct grpc_call_list {
+ grpc_closure *head;
+ grpc_closure *tail;
+} grpc_call_list;
+
/** gRPC Callback definition.
*
* \param arg Arbitrary input.
* \param success An indication on the state of the iomgr. On false, cleanup
* actions should be taken (eg, shutdown). */
-typedef void (*grpc_iomgr_cb_func)(void *arg, int success);
+typedef void (*grpc_iomgr_cb_func)(void *arg, int success,
+ grpc_call_list *call_list);
/** A closure over a grpc_iomgr_cb_func. */
-typedef struct grpc_closure {
+struct grpc_closure {
/** Bound callback. */
grpc_iomgr_cb_func cb;
@@ -56,12 +65,7 @@ typedef struct grpc_closure {
/**< Internal. Do not touch */
struct grpc_closure *next;
-} grpc_closure;
-
-typedef struct grpc_call_list {
- grpc_closure *head;
- grpc_closure *tail;
-} grpc_call_list;
+};
/** Initializes \a closure with \a cb and \a cb_arg. */
void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
@@ -72,7 +76,7 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void grpc_call_list_add(grpc_call_list *list, grpc_closure *closure,
int success);
-void grpc_call_list_run(grpc_call_list list);
+void grpc_call_list_run(grpc_call_list *list);
void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst);
int grpc_call_list_empty(grpc_call_list list);
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index 337596cb74..98653412bb 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -55,9 +55,8 @@
#endif
void grpc_pollset_init(grpc_pollset *pollset);
-void grpc_pollset_shutdown(grpc_pollset *pollset,
- void (*shutdown_done)(void *arg),
- void *shutdown_done_arg);
+void grpc_pollset_shutdown(grpc_pollset *pollset, grpc_closure *closure,
+ grpc_call_list *call_list);
void grpc_pollset_destroy(grpc_pollset *pollset);
/* Do some work on a pollset.
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 5ca957b8e1..e6e0f5cdd4 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -61,7 +61,8 @@ typedef struct {
wakeup_fd_hdl *free_wakeup_fds;
} pollset_hdr;
-static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
+static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd,
+ grpc_call_list *call_list) {
pollset_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
@@ -83,15 +84,15 @@ static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
}
}
}
- grpc_fd_end_poll(&watcher, 0, 0);
+ grpc_fd_end_poll(&watcher, 0, 0, call_list);
}
-static void perform_delayed_add(void *arg, int iomgr_status) {
+static void perform_delayed_add(void *arg, int iomgr_status,
+ grpc_call_list *call_list) {
delayed_add *da = arg;
- int do_shutdown_cb = 0;
if (!grpc_fd_is_orphaned(da->fd)) {
- finally_add_fd(da->pollset, da->fd);
+ finally_add_fd(da->pollset, da->fd, call_list);
}
gpr_mu_lock(&da->pollset->mu);
@@ -100,26 +101,23 @@ static void perform_delayed_add(void *arg, int iomgr_status) {
/* We don't care about this pollset anymore. */
if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
da->pollset->called_shutdown = 1;
- do_shutdown_cb = 1;
+ grpc_call_list_add(call_list, da->pollset->shutdown_done, 1);
}
}
gpr_mu_unlock(&da->pollset->mu);
GRPC_FD_UNREF(da->fd, "delayed_add");
- if (do_shutdown_cb) {
- da->pollset->shutdown_done_cb(da->pollset->shutdown_done_arg);
- }
-
gpr_free(da);
}
static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
grpc_fd *fd,
- int and_unlock_pollset) {
+ int and_unlock_pollset,
+ grpc_call_list *call_list) {
if (and_unlock_pollset) {
gpr_mu_unlock(&pollset->mu);
- finally_add_fd(pollset, fd);
+ finally_add_fd(pollset, fd, call_list);
} else {
delayed_add *da = gpr_malloc(sizeof(*da));
da->pollset = pollset;
@@ -127,13 +125,14 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
GRPC_FD_REF(fd, "delayed_add");
grpc_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
- grpc_pollset_add_unlock_job(pollset, &da->closure);
+ grpc_call_list_add(call_list, &da->closure, 1);
}
}
static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
grpc_fd *fd,
- int and_unlock_pollset) {
+ int and_unlock_pollset,
+ grpc_call_list *call_list) {
pollset_hdr *h = pollset->data.ptr;
int err;
@@ -153,9 +152,9 @@ static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
/* TODO(klempner): We probably want to turn this down a bit */
#define GRPC_EPOLL_MAX_EVENTS 1000
-static void multipoll_with_epoll_pollset_maybe_work(
+static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline,
- gpr_timespec now, int allow_synchronous_callback) {
+ gpr_timespec now, grpc_call_list *call_list) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int ep_rv;
int poll_rv;
@@ -209,18 +208,16 @@ static void multipoll_with_epoll_pollset_maybe_work(
int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
int write = ep_ev[i].events & EPOLLOUT;
if (read || cancel) {
- grpc_fd_become_readable(fd, allow_synchronous_callback);
+ grpc_fd_become_readable(fd, call_list);
}
if (write || cancel) {
- grpc_fd_become_writable(fd, allow_synchronous_callback);
+ grpc_fd_become_writable(fd, call_list);
}
}
}
} while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
}
}
-
- gpr_mu_lock(&pollset->mu);
}
static void multipoll_with_epoll_pollset_finish_shutdown(
@@ -234,12 +231,12 @@ static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd,
- multipoll_with_epoll_pollset_maybe_work,
+ multipoll_with_epoll_pollset_maybe_work_and_unlock,
multipoll_with_epoll_pollset_finish_shutdown,
multipoll_with_epoll_pollset_destroy};
static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
- size_t nfds) {
+ size_t nfds, grpc_call_list *call_list) {
size_t i;
pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
@@ -252,7 +249,7 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
abort();
}
for (i = 0; i < nfds; i++) {
- multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0);
+ multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0, call_list);
}
}
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index cae260cab0..03a28894f0 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -61,7 +61,8 @@ typedef struct {
static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset,
grpc_fd *fd,
- int and_unlock_pollset) {
+ int and_unlock_pollset,
+ grpc_call_list *call_list) {
size_t i;
pollset_hdr *h = pollset->data.ptr;
/* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
@@ -82,7 +83,8 @@ exit:
static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
grpc_fd *fd,
- int and_unlock_pollset) {
+ int and_unlock_pollset,
+ grpc_call_list *call_list) {
/* will get removed next poll cycle */
pollset_hdr *h = pollset->data.ptr;
if (h->del_count == h->del_capacity) {
@@ -96,9 +98,9 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
}
}
-static void multipoll_with_poll_pollset_maybe_work(
+static void multipoll_with_poll_pollset_maybe_work_and_unlock(
grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline,
- gpr_timespec now, int allow_synchronous_callback) {
+ gpr_timespec now, grpc_call_list *call_list) {
int timeout;
int r;
size_t i, j, fd_count;
@@ -149,7 +151,7 @@ static void multipoll_with_poll_pollset_maybe_work(
for (i = 1; i < pfd_count; i++) {
grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,
- pfds[i].revents & POLLOUT);
+ pfds[i].revents & POLLOUT, call_list);
}
if (r < 0) {
@@ -167,18 +169,16 @@ static void multipoll_with_poll_pollset_maybe_work(
continue;
}
if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
- grpc_fd_become_readable(watchers[i].fd, allow_synchronous_callback);
+ grpc_fd_become_readable(watchers[i].fd, call_list);
}
if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
- grpc_fd_become_writable(watchers[i].fd, allow_synchronous_callback);
+ grpc_fd_become_writable(watchers[i].fd, call_list);
}
}
}
gpr_free(pfds);
gpr_free(watchers);
-
- gpr_mu_lock(&pollset->mu);
}
static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
@@ -204,12 +204,12 @@ static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
static const grpc_pollset_vtable multipoll_with_poll_pollset = {
multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd,
- multipoll_with_poll_pollset_maybe_work,
+ multipoll_with_poll_pollset_maybe_work_and_unlock,
multipoll_with_poll_pollset_finish_shutdown,
multipoll_with_poll_pollset_destroy};
void grpc_poll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
- size_t nfds) {
+ size_t nfds, grpc_call_list *call_list) {
size_t i;
pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
pollset->vtable = &multipoll_with_poll_pollset;
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 0fe3f80d44..885cb29234 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -136,17 +136,14 @@ void grpc_pollset_init(grpc_pollset *pollset) {
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
- pollset->idle_jobs = NULL;
- pollset->unlock_jobs = NULL;
+ pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
become_basic_pollset(pollset, NULL);
}
-void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
- if (fd->workqueue->wakeup_read_fd != fd) {
- grpc_pollset_add_fd(pollset, fd->workqueue->wakeup_read_fd);
- }
+void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
+ grpc_call_list *call_list) {
gpr_mu_lock(&pollset->mu);
- pollset->vtable->add_fd(pollset, fd, 1);
+ pollset->vtable->add_fd(pollset, fd, 1, call_list);
/* the following (enabled only in debug) will reacquire and then release
our lock - meaning that if the unlocking flag passed to del_fd above is
not respected, the code will deadlock (in a way that we have a chance of
@@ -157,9 +154,10 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
#endif
}
-void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
+void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
+ grpc_call_list *call_list) {
gpr_mu_lock(&pollset->mu);
- pollset->vtable->del_fd(pollset, fd, 1);
+ pollset->vtable->del_fd(pollset, fd, 1, call_list);
/* the following (enabled only in debug) will reacquire and then release
our lock - meaning that if the unlocking flag passed to del_fd above is
not respected, the code will deadlock (in a way that we have a chance of
@@ -170,53 +168,27 @@ void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
#endif
}
-static void finish_shutdown(grpc_pollset *pollset) {
+static void finish_shutdown(grpc_pollset *pollset, grpc_call_list *call_list) {
pollset->vtable->finish_shutdown(pollset);
- pollset->shutdown_done_cb(pollset->shutdown_done_arg);
-}
-
-static void run_jobs(grpc_pollset *pollset, grpc_closure **root) {
- grpc_closure *exec = *root;
- *root = NULL;
- gpr_mu_unlock(&pollset->mu);
- while (exec != NULL) {
- grpc_closure *next = exec->next;
- exec->cb(exec->cb_arg, 1);
- exec = next;
- }
- gpr_mu_lock(&pollset->mu);
-}
-
-static void add_job(grpc_closure **root, grpc_closure *closure) {
- closure->next = *root;
- *root = closure;
-}
-
-void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure) {
- add_job(&pollset->idle_jobs, closure);
-}
-
-void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure) {
- add_job(&pollset->unlock_jobs, closure);
+ grpc_call_list_add(call_list, pollset->shutdown_done, 1);
}
void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec now, gpr_timespec deadline) {
/* pollset->mu already held */
int added_worker = 0;
+ int locked = 1;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
/* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
grpc_wakeup_fd_init(&worker->wakeup_fd);
- if (!grpc_pollset_has_workers(pollset) && pollset->idle_jobs != NULL) {
- run_jobs(pollset, &pollset->idle_jobs);
+ if (!grpc_pollset_has_workers(pollset) &&
+ !grpc_call_list_empty(pollset->idle_jobs)) {
+ grpc_call_list_move(&pollset->idle_jobs, &call_list);
goto done;
}
- if (pollset->unlock_jobs != NULL) {
- run_jobs(pollset, &pollset->unlock_jobs);
- goto done;
- }
- if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
+ if (grpc_alarm_check(now, &deadline, &call_list)) {
goto done;
}
if (pollset->shutting_down) {
@@ -225,19 +197,32 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
if (pollset->in_flight_cbs) {
/* Give do_promote priority so we don't starve it out */
gpr_mu_unlock(&pollset->mu);
- gpr_mu_lock(&pollset->mu);
+ locked = 0;
goto done;
}
if (!pollset->kicked_without_pollers) {
push_front_worker(pollset, worker);
added_worker = 1;
gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
- pollset->vtable->maybe_work(pollset, worker, deadline, now, 1);
+ pollset->vtable->maybe_work_and_unlock(pollset, worker, deadline, now,
+ NULL);
+ locked = 0;
gpr_tls_set(&g_current_thread_poller, 0);
} else {
pollset->kicked_without_pollers = 0;
}
done:
+ if (!grpc_call_list_empty(call_list)) {
+ if (locked) {
+ gpr_mu_unlock(&pollset->mu);
+ locked = 0;
+ }
+ grpc_call_list_run(&call_list);
+ }
+ if (!locked) {
+ gpr_mu_lock(&pollset->mu);
+ locked = 1;
+ }
grpc_wakeup_fd_destroy(&worker->wakeup_fd);
if (added_worker) {
remove_worker(pollset, worker);
@@ -248,7 +233,8 @@ done:
} else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
pollset->called_shutdown = 1;
gpr_mu_unlock(&pollset->mu);
- finish_shutdown(pollset);
+ finish_shutdown(pollset, &call_list);
+ grpc_call_list_run(&call_list);
/* Continuing to access pollset here is safe -- it is the caller's
* responsibility to not destroy when it has outstanding calls to
* grpc_pollset_work.
@@ -258,9 +244,8 @@ done:
}
}
-void grpc_pollset_shutdown(grpc_pollset *pollset,
- void (*shutdown_done)(void *arg),
- void *shutdown_done_arg) {
+void grpc_pollset_shutdown(grpc_pollset *pollset, grpc_closure *closure,
+ grpc_call_list *call_list) {
int call_shutdown = 0;
gpr_mu_lock(&pollset->mu);
GPR_ASSERT(!pollset->shutting_down);
@@ -270,13 +255,12 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
pollset->called_shutdown = 1;
call_shutdown = 1;
}
- pollset->shutdown_done_cb = shutdown_done;
- pollset->shutdown_done_arg = shutdown_done_arg;
+ pollset->shutdown_done = closure;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
gpr_mu_unlock(&pollset->mu);
if (call_shutdown) {
- finish_shutdown(pollset);
+ finish_shutdown(pollset, call_list);
}
}
@@ -317,12 +301,12 @@ typedef struct grpc_unary_promote_args {
grpc_closure promotion_closure;
} grpc_unary_promote_args;
-static void basic_do_promote(void *args, int success) {
+static void basic_do_promote(void *args, int success,
+ grpc_call_list *call_list) {
grpc_unary_promote_args *up_args = args;
const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
grpc_pollset *pollset = up_args->pollset;
grpc_fd *fd = up_args->fd;
- int do_shutdown_cb = 0;
/*
* This is quite tricky. There are a number of cases to keep in mind here:
@@ -349,19 +333,20 @@ static void basic_do_promote(void *args, int success) {
if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
GPR_ASSERT(!grpc_pollset_has_workers(pollset));
pollset->called_shutdown = 1;
- do_shutdown_cb = 1;
+ grpc_call_list_add(call_list, pollset->shutdown_done, 1);
}
} else if (grpc_fd_is_orphaned(fd)) {
/* Don't try to add it to anything, we'll drop our ref on it below */
} else if (pollset->vtable != original_vtable) {
- pollset->vtable->add_fd(pollset, fd, 0);
+ pollset->vtable->add_fd(pollset, fd, 0, call_list);
} else if (fd != pollset->data.ptr) {
grpc_fd *fds[2];
fds[0] = pollset->data.ptr;
fds[1] = fd;
if (fds[0] && !grpc_fd_is_orphaned(fds[0])) {
- grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds));
+ grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds),
+ call_list);
GRPC_FD_UNREF(fds[0], "basicpoll");
} else {
/* old fd is orphaned and we haven't cleaned it up until now, so remain a
@@ -376,16 +361,15 @@ static void basic_do_promote(void *args, int success) {
gpr_mu_unlock(&pollset->mu);
- if (do_shutdown_cb) {
- pollset->shutdown_done_cb(pollset->shutdown_done_arg);
- }
-
/* Matching ref in basic_pollset_add_fd */
GRPC_FD_UNREF(fd, "basicpoll_add");
+
+ grpc_call_list_run(call_list);
}
static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
- int and_unlock_pollset) {
+ int and_unlock_pollset,
+ grpc_call_list *call_list) {
grpc_unary_promote_args *up_args;
GPR_ASSERT(fd);
if (fd == pollset->data.ptr) goto exit;
@@ -402,7 +386,8 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
pollset->data.ptr = fd;
GRPC_FD_REF(fd, "basicpoll");
} else if (!grpc_fd_is_orphaned(fds[0])) {
- grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds));
+ grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds),
+ call_list);
GRPC_FD_UNREF(fds[0], "basicpoll");
} else {
/* old fd is orphaned and we haven't cleaned it up until now, so remain a
@@ -424,7 +409,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
up_args->promotion_closure.cb = basic_do_promote;
up_args->promotion_closure.cb_arg = up_args;
- grpc_pollset_add_idle_job(pollset, &up_args->promotion_closure);
+ grpc_call_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1);
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
exit:
@@ -434,7 +419,8 @@ exit:
}
static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
- int and_unlock_pollset) {
+ int and_unlock_pollset,
+ grpc_call_list *call_list) {
GPR_ASSERT(fd);
if (fd == pollset->data.ptr) {
GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
@@ -446,10 +432,11 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
}
}
-static void basic_pollset_maybe_work(grpc_pollset *pollset,
- grpc_pollset_worker *worker,
- gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
+static void basic_pollset_maybe_work_and_unlock(grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
+ gpr_timespec deadline,
+ gpr_timespec now,
+ grpc_call_list *call_list) {
struct pollfd pfd[2];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
@@ -487,7 +474,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
if (fd) {
grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN,
- pfd[1].revents & POLLOUT);
+ pfd[1].revents & POLLOUT, call_list);
}
if (r < 0) {
@@ -502,15 +489,13 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
}
if (nfds > 1) {
if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) {
- grpc_fd_become_readable(fd, allow_synchronous_callback);
+ grpc_fd_become_readable(fd, call_list);
}
if (pfd[1].revents & (POLLOUT | POLLHUP | POLLERR)) {
- grpc_fd_become_writable(fd, allow_synchronous_callback);
+ grpc_fd_become_writable(fd, call_list);
}
}
}
-
- gpr_mu_lock(&pollset->mu);
}
static void basic_pollset_destroy(grpc_pollset *pollset) {
@@ -521,8 +506,9 @@ static void basic_pollset_destroy(grpc_pollset *pollset) {
}
static const grpc_pollset_vtable basic_pollset = {
- basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
- basic_pollset_destroy, basic_pollset_destroy};
+ basic_pollset_add_fd, basic_pollset_del_fd,
+ basic_pollset_maybe_work_and_unlock, basic_pollset_destroy,
+ basic_pollset_destroy};
static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
pollset->vtable = &basic_pollset;
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 4f09f870f7..c65f25fd18 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -65,10 +65,8 @@ typedef struct grpc_pollset {
int shutting_down;
int called_shutdown;
int kicked_without_pollers;
- void (*shutdown_done_cb)(void *arg);
- void *shutdown_done_arg;
- grpc_closure *unlock_jobs;
- grpc_closure *idle_jobs;
+ grpc_closure *shutdown_done;
+ grpc_call_list idle_jobs;
union {
int fd;
void *ptr;
@@ -77,12 +75,13 @@ typedef struct grpc_pollset {
struct grpc_pollset_vtable {
void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd,
- int and_unlock_pollset);
+ int and_unlock_pollset, grpc_call_list *call_list);
void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd,
- int and_unlock_pollset);
- void (*maybe_work)(grpc_pollset *pollset, grpc_pollset_worker *worker,
- gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback);
+ int and_unlock_pollset, grpc_call_list *call_list);
+ void (*maybe_work_and_unlock)(grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
+ gpr_timespec deadline, gpr_timespec now,
+ grpc_call_list *call_list);
void (*finish_shutdown)(grpc_pollset *pollset);
void (*destroy)(grpc_pollset *pollset);
};
@@ -90,10 +89,12 @@ struct grpc_pollset_vtable {
#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
/* Add an fd to a pollset */
-void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd);
+void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd,
+ grpc_call_list *call_list);
/* Force remove an fd from a pollset (normally they are removed on the next
poll after an fd is orphaned) */
-void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd);
+void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd,
+ grpc_call_list *call_list);
/* Returns the fd to listen on for kicks */
int grpc_kick_read_fd(grpc_pollset *p);
@@ -111,13 +112,13 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now);
/* turn a pollset into a multipoller: platform specific */
-typedef void (*grpc_platform_become_multipoller_type)(grpc_pollset *pollset,
- struct grpc_fd **fds,
- size_t fd_count);
+typedef void (*grpc_platform_become_multipoller_type)(
+ grpc_pollset *pollset, struct grpc_fd **fds, size_t fd_count,
+ grpc_call_list *call_list);
extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller;
void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds,
- size_t fd_count);
+ size_t fd_count, grpc_call_list *call_list);
/* Return 1 if the pollset has active threads in grpc_pollset_work (pollset must
* be locked) */
@@ -127,9 +128,4 @@ int grpc_pollset_has_workers(grpc_pollset *pollset);
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
-/** schedule a closure to be run next time there are no active workers */
-void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure);
-/** schedule a closure to be run next time the pollset is unlocked */
-void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure);
-
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h
index 6d73951c70..ca667bf34e 100644
--- a/src/core/iomgr/pollset_set.h
+++ b/src/core/iomgr/pollset_set.h
@@ -52,8 +52,10 @@
void grpc_pollset_set_init(grpc_pollset_set *pollset_set);
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set);
void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set,
- grpc_pollset *pollset);
+ grpc_pollset *pollset,
+ grpc_call_list *call_list);
void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set,
- grpc_pollset *pollset);
+ grpc_pollset *pollset,
+ grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */
diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c
index 2076ac70ef..a2c1f47629 100644
--- a/src/core/iomgr/pollset_set_posix.c
+++ b/src/core/iomgr/pollset_set_posix.c
@@ -59,7 +59,8 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
}
void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set,
- grpc_pollset *pollset) {
+ grpc_pollset *pollset,
+ grpc_call_list *call_list) {
size_t i, j;
gpr_mu_lock(&pollset_set->mu);
if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
@@ -74,7 +75,7 @@ void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set,
if (grpc_fd_is_orphaned(pollset_set->fds[i])) {
GRPC_FD_UNREF(pollset_set->fds[i], "pollset");
} else {
- grpc_pollset_add_fd(pollset, pollset_set->fds[i]);
+ grpc_pollset_add_fd(pollset, pollset_set->fds[i], call_list);
pollset_set->fds[j++] = pollset_set->fds[i];
}
}
@@ -83,7 +84,8 @@ void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set,
}
void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set,
- grpc_pollset *pollset) {
+ grpc_pollset *pollset,
+ grpc_call_list *call_list) {
size_t i;
gpr_mu_lock(&pollset_set->mu);
for (i = 0; i < pollset_set->pollset_count; i++) {
@@ -97,7 +99,8 @@ void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set,
gpr_mu_unlock(&pollset_set->mu);
}
-void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd) {
+void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd,
+ grpc_call_list *call_list) {
size_t i;
gpr_mu_lock(&pollset_set->mu);
if (pollset_set->fd_count == pollset_set->fd_capacity) {
@@ -108,12 +111,13 @@ void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd) {
GRPC_FD_REF(fd, "pollset_set");
pollset_set->fds[pollset_set->fd_count++] = fd;
for (i = 0; i < pollset_set->pollset_count; i++) {
- grpc_pollset_add_fd(pollset_set->pollsets[i], fd);
+ grpc_pollset_add_fd(pollset_set->pollsets[i], fd, call_list);
}
gpr_mu_unlock(&pollset_set->mu);
}
-void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd) {
+void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd,
+ grpc_call_list *call_list) {
size_t i;
gpr_mu_lock(&pollset_set->mu);
for (i = 0; i < pollset_set->fd_count; i++) {
diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h
index e88740bde1..e9969d2c0f 100644
--- a/src/core/iomgr/pollset_set_posix.h
+++ b/src/core/iomgr/pollset_set_posix.h
@@ -49,7 +49,9 @@ typedef struct grpc_pollset_set {
grpc_fd **fds;
} grpc_pollset_set;
-void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd);
-void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd);
+void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd,
+ grpc_call_list *call_list);
+void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd,
+ grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */
diff --git a/src/core/iomgr/resolve_address.h b/src/core/iomgr/resolve_address.h
index 9f361cb892..ec8d83fffa 100644
--- a/src/core/iomgr/resolve_address.h
+++ b/src/core/iomgr/resolve_address.h
@@ -34,7 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H
#define GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H
-#include <stddef.h>
+#include "src/core/iomgr/iomgr.h"
#define GRPC_MAX_SOCKADDR_SIZE 128
@@ -52,7 +52,8 @@ typedef struct {
On success: addresses is the result, and the callee must call
grpc_resolved_addresses_destroy when it's done with them
On failure: addresses is NULL */
-typedef void (*grpc_resolve_cb)(void *arg, grpc_resolved_addresses *addresses);
+typedef void (*grpc_resolve_cb)(void *arg, grpc_resolved_addresses *addresses,
+ grpc_call_list *call_list);
/* Asynchronously resolve addr. Use default_port if a port isn't designated
in addr, otherwise use the port in addr. */
/* TODO(ctiller): add a timeout here */
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index ce6972b797..ea64f8176f 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -144,17 +144,19 @@ done:
}
/* Thread function to asynch-ify grpc_blocking_resolve_address */
-static void do_request(void *rp) {
+static void do_request_thread(void *rp) {
request *r = rp;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_resolved_addresses *resolved =
grpc_blocking_resolve_address(r->name, r->default_port);
void *arg = r->arg;
grpc_resolve_cb cb = r->cb;
gpr_free(r->name);
gpr_free(r->default_port);
- cb(arg, resolved);
+ cb(arg, resolved, &call_list);
grpc_iomgr_unregister_object(&r->iomgr_object);
gpr_free(r);
+ grpc_call_list_run(&call_list);
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
@@ -175,7 +177,7 @@ void grpc_resolve_address(const char *name, const char *default_port,
r->default_port = gpr_strdup(default_port);
r->cb = cb;
r->arg = arg;
- gpr_thd_new(&id, do_request, r, NULL);
+ gpr_thd_new(&id, do_request_thread, r, NULL);
}
#endif
diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h
index 57f80016c2..8fb5bfda1d 100644
--- a/src/core/iomgr/tcp_client.h
+++ b/src/core/iomgr/tcp_client.h
@@ -44,10 +44,9 @@
NULL on failure).
interested_parties points to a set of pollsets that would be interested
in this connection being established (in order to continue their work) */
-void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
- void *arg, grpc_pollset_set *interested_parties,
- grpc_workqueue *workqueue,
+void grpc_tcp_client_connect(grpc_closure *on_connect, grpc_endpoint **endpoint,
+ grpc_pollset_set *interested_parties,
const struct sockaddr *addr, size_t addr_len,
- gpr_timespec deadline);
+ gpr_timespec deadline, grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_CLIENT_H */
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 1ea2155060..a4828201c7 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -57,8 +57,6 @@
extern int grpc_tcp_trace;
typedef struct {
- void (*cb)(void *arg, grpc_endpoint *tcp);
- void *cb_arg;
gpr_mu mu;
grpc_fd *fd;
gpr_timespec deadline;
@@ -67,6 +65,8 @@ typedef struct {
grpc_closure write_closure;
grpc_pollset_set *interested_parties;
char *addr_str;
+ grpc_endpoint **ep;
+ grpc_closure *closure;
} async_connect;
static int prepare_socket(const struct sockaddr *addr, int fd) {
@@ -91,7 +91,7 @@ error:
return 0;
}
-static void tc_on_alarm(void *acp, int success) {
+static void tc_on_alarm(void *acp, int success, grpc_call_list *call_list) {
int done;
async_connect *ac = acp;
if (grpc_tcp_trace) {
@@ -100,7 +100,7 @@ static void tc_on_alarm(void *acp, int success) {
}
gpr_mu_lock(&ac->mu);
if (ac->fd != NULL) {
- grpc_fd_shutdown(ac->fd);
+ grpc_fd_shutdown(ac->fd, call_list);
}
done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
@@ -111,15 +111,14 @@ static void tc_on_alarm(void *acp, int success) {
}
}
-static void on_writable(void *acp, int success) {
+static void on_writable(void *acp, int success, grpc_call_list *call_list) {
async_connect *ac = acp;
int so_error = 0;
socklen_t so_error_size;
int err;
int done;
- grpc_endpoint *ep = NULL;
- void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
- void *cb_arg = ac->cb_arg;
+ grpc_endpoint **ep = ac->ep;
+ grpc_closure *closure = ac->closure;
grpc_fd *fd;
if (grpc_tcp_trace) {
@@ -133,7 +132,7 @@ static void on_writable(void *acp, int success) {
ac->fd = NULL;
gpr_mu_unlock(&ac->mu);
- grpc_alarm_cancel(&ac->alarm);
+ grpc_alarm_cancel(&ac->alarm, call_list);
gpr_mu_lock(&ac->mu);
if (success) {
@@ -162,7 +161,7 @@ static void on_writable(void *acp, int success) {
don't do that! */
gpr_log(GPR_ERROR, "kernel out of buffers");
gpr_mu_unlock(&ac->mu);
- grpc_fd_notify_on_write(fd, &ac->write_closure);
+ grpc_fd_notify_on_write(fd, &ac->write_closure, call_list);
return;
} else {
switch (so_error) {
@@ -176,8 +175,8 @@ static void on_writable(void *acp, int success) {
goto finish;
}
} else {
- grpc_pollset_set_del_fd(ac->interested_parties, fd);
- ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
+ grpc_pollset_set_del_fd(ac->interested_parties, fd, call_list);
+ *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
fd = NULL;
goto finish;
}
@@ -190,8 +189,8 @@ static void on_writable(void *acp, int success) {
finish:
if (fd != NULL) {
- grpc_pollset_set_del_fd(ac->interested_parties, fd);
- grpc_fd_orphan(fd, NULL, "tcp_client_orphan");
+ grpc_pollset_set_del_fd(ac->interested_parties, fd, call_list);
+ grpc_fd_orphan(fd, NULL, "tcp_client_orphan", call_list);
fd = NULL;
}
done = (--ac->refs == 0);
@@ -201,14 +200,14 @@ finish:
gpr_free(ac->addr_str);
gpr_free(ac);
}
- cb(cb_arg, ep);
+ grpc_call_list_add(call_list, closure, *ep != NULL);
}
-void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
- void *arg, grpc_pollset_set *interested_parties,
+void grpc_tcp_client_connect(grpc_closure *closure, grpc_endpoint **ep,
+ grpc_pollset_set *interested_parties,
grpc_workqueue *workqueue,
const struct sockaddr *addr, size_t addr_len,
- gpr_timespec deadline) {
+ gpr_timespec deadline, grpc_call_list *call_list) {
int fd;
grpc_dualstack_mode dsmode;
int err;
@@ -219,6 +218,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
char *name;
char *addr_str;
+ *ep = NULL;
+
/* Use dualstack sockets where available. */
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
addr = (const struct sockaddr *)&addr6_v4mapped;
@@ -236,7 +237,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
addr_len = sizeof(addr4_copy);
}
if (!prepare_socket(addr, fd)) {
- cb(arg, NULL);
+ grpc_call_list_add(call_list, closure, 0);
return;
}
@@ -248,25 +249,26 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
addr_str = grpc_sockaddr_to_uri(addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
- fdobj = grpc_fd_create(fd, workqueue, name);
+ fdobj = grpc_fd_create(fd, name);
if (err >= 0) {
- cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));
+ *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str);
+ grpc_call_list_add(call_list, closure, 1);
goto done;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno));
- grpc_fd_orphan(fdobj, NULL, "tcp_client_connect_error");
- cb(arg, NULL);
+ grpc_fd_orphan(fdobj, NULL, "tcp_client_connect_error", call_list);
+ grpc_call_list_add(call_list, closure, 0);
goto done;
}
- grpc_pollset_set_add_fd(interested_parties, fdobj);
+ grpc_pollset_set_add_fd(interested_parties, fdobj, call_list);
ac = gpr_malloc(sizeof(async_connect));
- ac->cb = cb;
- ac->cb_arg = arg;
+ ac->closure = closure;
+ ac->ep = ep;
ac->fd = fdobj;
ac->interested_parties = interested_parties;
ac->addr_str = addr_str;
@@ -284,8 +286,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
gpr_mu_lock(&ac->mu);
grpc_alarm_init(&ac->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC));
- grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
+ tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC), call_list);
+ grpc_fd_notify_on_write(ac->fd, &ac->write_closure, call_list);
gpr_mu_unlock(&ac->mu);
done:
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 374d2f3a40..89a85f26dc 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -94,30 +94,33 @@ typedef struct {
char *peer_string;
} grpc_tcp;
-static void tcp_handle_read(void *arg /* grpc_tcp */, int success);
-static void tcp_handle_write(void *arg /* grpc_tcp */, int success);
+static void tcp_handle_read(void *arg /* grpc_tcp */, int success,
+ grpc_call_list *call_list);
+static void tcp_handle_write(void *arg /* grpc_tcp */, int success,
+ grpc_call_list *call_list);
-static void tcp_shutdown(grpc_endpoint *ep) {
+static void tcp_shutdown(grpc_endpoint *ep, grpc_call_list *call_list) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_fd_shutdown(tcp->em_fd);
+ grpc_fd_shutdown(tcp->em_fd, call_list);
}
-static void tcp_free(grpc_tcp *tcp) {
- grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
+static void tcp_free(grpc_tcp *tcp, grpc_call_list *call_list) {
+ grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan", call_list);
gpr_free(tcp->peer_string);
gpr_free(tcp);
}
/*#define GRPC_TCP_REFCOUNT_DEBUG*/
#ifdef GRPC_TCP_REFCOUNT_DEBUG
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
+#define TCP_UNREF(tcp, reason, cl) \
+ tcp_unref((tcp), (cl), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
- int line) {
+static void tcp_unref(grpc_tcp *tcp, grpc_call_list *call_list,
+ const char *reason, const char *file, int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
reason, tcp->refcount.count, tcp->refcount.count - 1);
if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+ tcp_free(tcp, call_list);
}
}
@@ -128,23 +131,24 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
gpr_ref(&tcp->refcount);
}
#else
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
+#define TCP_UNREF(tcp, reason, cl) tcp_unref((tcp), (cl))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
-static void tcp_unref(grpc_tcp *tcp) {
+static void tcp_unref(grpc_tcp *tcp, grpc_call_list *call_list) {
if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+ tcp_free(tcp, call_list);
}
}
static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
-static void tcp_destroy(grpc_endpoint *ep) {
+static void tcp_destroy(grpc_endpoint *ep, grpc_call_list *call_list) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- TCP_UNREF(tcp, "destroy");
+ TCP_UNREF(tcp, "destroy", call_list);
}
-static void call_read_cb(grpc_tcp *tcp, int success) {
+static void call_read_cb(grpc_tcp *tcp, int success,
+ grpc_call_list *call_list) {
grpc_closure *cb = tcp->read_cb;
if (grpc_tcp_trace) {
@@ -160,11 +164,11 @@ static void call_read_cb(grpc_tcp *tcp, int success) {
tcp->read_cb = NULL;
tcp->incoming_buffer = NULL;
- cb->cb(cb->cb_arg, success);
+ cb->cb(cb->cb_arg, success, call_list);
}
#define MAX_READ_IOVEC 4
-static void tcp_continue_read(grpc_tcp *tcp) {
+static void tcp_continue_read(grpc_tcp *tcp, grpc_call_list *call_list) {
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;
@@ -206,18 +210,18 @@ static void tcp_continue_read(grpc_tcp *tcp) {
tcp->iov_size /= 2;
}
/* We've consumed the edge, request a new one */
- grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+ grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure, call_list);
} else {
/* TODO(klempner): Log interesting errors */
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(tcp, 0);
- TCP_UNREF(tcp, "read");
+ call_read_cb(tcp, 0, call_list);
+ TCP_UNREF(tcp, "read", call_list);
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(tcp, 0);
- TCP_UNREF(tcp, "read");
+ call_read_cb(tcp, 0, call_list);
+ TCP_UNREF(tcp, "read", call_list);
} else {
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
if ((size_t)read_bytes < tcp->incoming_buffer->length) {
@@ -228,29 +232,29 @@ static void tcp_continue_read(grpc_tcp *tcp) {
++tcp->iov_size;
}
GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
- call_read_cb(tcp, 1);
- TCP_UNREF(tcp, "read");
+ call_read_cb(tcp, 1, call_list);
+ TCP_UNREF(tcp, "read", call_list);
}
GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
}
-static void tcp_handle_read(void *arg /* grpc_tcp */, int success) {
+static void tcp_handle_read(void *arg /* grpc_tcp */, int success,
+ grpc_call_list *call_list) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
if (!success) {
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(tcp, 0);
- TCP_UNREF(tcp, "read");
+ call_read_cb(tcp, 0, call_list);
+ TCP_UNREF(tcp, "read", call_list);
} else {
- tcp_continue_read(tcp);
+ tcp_continue_read(tcp, call_list);
}
}
-static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
- gpr_slice_buffer *incoming_buffer,
- grpc_closure *cb) {
+static void tcp_read(grpc_endpoint *ep, gpr_slice_buffer *incoming_buffer,
+ grpc_closure *cb, grpc_call_list *call_list) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
@@ -259,16 +263,16 @@ static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
TCP_REF(tcp, "read");
if (tcp->finished_edge) {
tcp->finished_edge = 0;
- grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+ grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure, call_list);
} else {
- grpc_workqueue_push(tcp->em_fd->workqueue, &tcp->read_closure, 1);
+ grpc_call_list_add(call_list, &tcp->read_closure, 1);
}
- /* TODO(ctiller): immediate return */
- return GRPC_ENDPOINT_PENDING;
}
+typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result;
+
#define MAX_WRITE_IOVEC 16
-static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
+static flush_result tcp_flush(grpc_tcp *tcp) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
msg_iovlen_type iov_size;
@@ -318,10 +322,10 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
if (errno == EAGAIN) {
tcp->outgoing_slice_idx = unwind_slice_idx;
tcp->outgoing_byte_idx = unwind_byte_idx;
- return GRPC_ENDPOINT_PENDING;
+ return FLUSH_PENDING;
} else {
/* TODO(klempner): Log some of these */
- return GRPC_ENDPOINT_ERROR;
+ return FLUSH_ERROR;
}
}
@@ -342,42 +346,42 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
}
if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
- return GRPC_ENDPOINT_DONE;
+ return FLUSH_DONE;
}
};
}
-static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
+static void tcp_handle_write(void *arg /* grpc_tcp */, int success,
+ grpc_call_list *call_list) {
grpc_tcp *tcp = (grpc_tcp *)arg;
- grpc_endpoint_op_status status;
+ flush_result status;
grpc_closure *cb;
if (!success) {
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb->cb(cb->cb_arg, 0);
- TCP_UNREF(tcp, "write");
+ cb->cb(cb->cb_arg, 0, call_list);
+ TCP_UNREF(tcp, "write", call_list);
return;
}
GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0);
status = tcp_flush(tcp);
- if (status == GRPC_ENDPOINT_PENDING) {
- grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
+ if (status == FLUSH_PENDING) {
+ grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure, call_list);
} else {
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb->cb(cb->cb_arg, status == GRPC_ENDPOINT_DONE);
- TCP_UNREF(tcp, "write");
+ cb->cb(cb->cb_arg, status == FLUSH_DONE, call_list);
+ TCP_UNREF(tcp, "write", call_list);
}
GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0);
}
-static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
- gpr_slice_buffer *buf,
- grpc_closure *cb) {
+static void tcp_write(grpc_endpoint *ep, gpr_slice_buffer *buf,
+ grpc_closure *cb, grpc_call_list *call_list) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_endpoint_op_status status;
+ flush_result status;
if (grpc_tcp_trace) {
size_t i;
@@ -395,32 +399,36 @@ static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
if (buf->length == 0) {
GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
- return GRPC_ENDPOINT_DONE;
+ grpc_call_list_add(call_list, cb, 1);
+ return;
}
tcp->outgoing_buffer = buf;
tcp->outgoing_slice_idx = 0;
tcp->outgoing_byte_idx = 0;
status = tcp_flush(tcp);
- if (status == GRPC_ENDPOINT_PENDING) {
+ if (status == FLUSH_PENDING) {
TCP_REF(tcp, "write");
tcp->write_cb = cb;
- grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
+ grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure, call_list);
+ } else {
+ grpc_call_list_add(call_list, cb, status == FLUSH_DONE);
}
GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
- return status;
}
-static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
+static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset,
+ grpc_call_list *call_list) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_pollset_add_fd(pollset, tcp->em_fd);
+ grpc_pollset_add_fd(pollset, tcp->em_fd, call_list);
}
static void tcp_add_to_pollset_set(grpc_endpoint *ep,
- grpc_pollset_set *pollset_set) {
+ grpc_pollset_set *pollset_set,
+ grpc_call_list *call_list) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
+ grpc_pollset_set_add_fd(pollset_set, tcp->em_fd, call_list);
}
static char *tcp_get_peer(grpc_endpoint *ep) {
diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h
index 5165f5c5ca..19e58bc294 100644
--- a/src/core/iomgr/tcp_server.h
+++ b/src/core/iomgr/tcp_server.h
@@ -40,7 +40,8 @@
typedef struct grpc_tcp_server grpc_tcp_server;
/* New server callback: tcp is the newly connected tcp connection */
-typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep);
+typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep,
+ grpc_call_list *call_list);
/* Create a server, initially not bound to any ports */
grpc_tcp_server *grpc_tcp_server_create(void);
@@ -48,7 +49,7 @@ grpc_tcp_server *grpc_tcp_server_create(void);
/* Start listening to bound ports */
void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset **pollsets,
size_t pollset_count, grpc_tcp_server_cb cb,
- void *cb_arg);
+ void *cb_arg, grpc_call_list *call_list);
/* Add a port to the server, returning port number on success, or negative
on failure.
@@ -71,8 +72,7 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
up when grpc_tcp_server_destroy is called. */
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index);
-void grpc_tcp_server_destroy(grpc_tcp_server *server,
- void (*shutdown_done)(void *shutdown_done_arg),
- void *shutdown_done_arg);
+void grpc_tcp_server_destroy(grpc_tcp_server *server, grpc_closure *closure,
+ grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 213b2e1113..0c5e0053dd 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -117,16 +117,12 @@ struct grpc_tcp_server {
size_t port_capacity;
/* shutdown callback */
- void (*shutdown_complete)(void *);
- void *shutdown_complete_arg;
+ grpc_closure *shutdown_complete;
/* all pollsets interested in new connections */
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
-
- /** workqueue for interally created async work */
- grpc_workqueue *workqueue;
};
grpc_tcp_server *grpc_tcp_server_create(void) {
@@ -140,40 +136,37 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
s->nports = 0;
s->port_capacity = INIT_PORT_CAP;
- s->workqueue = grpc_workqueue_create();
return s;
}
-static void finish_shutdown(grpc_tcp_server *s) {
- s->shutdown_complete(s->shutdown_complete_arg);
- s->shutdown_complete = NULL;
+static void finish_shutdown(grpc_tcp_server *s, grpc_call_list *call_list) {
+ grpc_call_list_add(call_list, s->shutdown_complete, 1);
gpr_mu_destroy(&s->mu);
gpr_free(s->ports);
- GRPC_WORKQUEUE_UNREF(s->workqueue, "destroy");
gpr_free(s);
}
-static void destroyed_port(void *server, int success) {
+static void destroyed_port(void *server, int success,
+ grpc_call_list *call_list) {
grpc_tcp_server *s = server;
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
if (s->destroyed_ports == s->nports) {
gpr_mu_unlock(&s->mu);
- finish_shutdown(s);
+ finish_shutdown(s, call_list);
} else {
GPR_ASSERT(s->destroyed_ports < s->nports);
gpr_mu_unlock(&s->mu);
}
}
-static void dont_care_about_shutdown_completion(void *ignored) {}
-
/* called when all listening endpoints have been shutdown, so no further
events will be received on them - at this point it's safe to destroy
things */
-static void deactivated_all_ports(grpc_tcp_server *s) {
+static void deactivated_all_ports(grpc_tcp_server *s,
+ grpc_call_list *call_list) {
size_t i;
/* delete ALL the things */
@@ -192,38 +185,35 @@ static void deactivated_all_ports(grpc_tcp_server *s) {
}
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
- grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown");
+ grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown",
+ call_list);
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- finish_shutdown(s);
+ finish_shutdown(s, call_list);
}
}
-void grpc_tcp_server_destroy(
- grpc_tcp_server *s, void (*shutdown_complete)(void *shutdown_complete_arg),
- void *shutdown_complete_arg) {
+void grpc_tcp_server_destroy(grpc_tcp_server *s, grpc_closure *closure,
+ grpc_call_list *call_list) {
size_t i;
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
s->shutdown = 1;
- s->shutdown_complete = shutdown_complete
- ? shutdown_complete
- : dont_care_about_shutdown_completion;
- s->shutdown_complete_arg = shutdown_complete_arg;
+ s->shutdown_complete = closure;
/* shutdown all fd's */
if (s->active_ports) {
for (i = 0; i < s->nports; i++) {
- grpc_fd_shutdown(s->ports[i].emfd);
+ grpc_fd_shutdown(s->ports[i].emfd, call_list);
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- deactivated_all_ports(s);
+ deactivated_all_ports(s, call_list);
}
}
@@ -308,7 +298,7 @@ error:
}
/* event manager callback when reads are ready */
-static void on_read(void *arg, int success) {
+static void on_read(void *arg, int success, grpc_call_list *call_list) {
server_port *sp = arg;
grpc_fd *fdobj;
size_t i;
@@ -331,7 +321,7 @@ static void on_read(void *arg, int success) {
case EINTR:
continue;
case EAGAIN:
- grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure, call_list);
return;
default:
gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
@@ -348,16 +338,17 @@ static void on_read(void *arg, int success) {
gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
- fdobj = grpc_fd_create(fd, sp->server->workqueue, name);
+ fdobj = grpc_fd_create(fd, name);
/* TODO(ctiller): revise this when we have server-side sharding
of channels -- we certainly should not be automatically adding every
incoming channel to every pollset owned by the server */
for (i = 0; i < sp->server->pollset_count; i++) {
- grpc_pollset_add_fd(sp->server->pollsets[i], fdobj);
+ grpc_pollset_add_fd(sp->server->pollsets[i], fdobj, call_list);
}
sp->server->cb(
sp->server->cb_arg,
- grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));
+ grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
+ call_list);
gpr_free(name);
gpr_free(addr_str);
@@ -369,7 +360,7 @@ error:
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
gpr_mu_unlock(&sp->server->mu);
- deactivated_all_ports(sp->server);
+ deactivated_all_ports(sp->server, call_list);
} else {
gpr_mu_unlock(&sp->server->mu);
}
@@ -396,7 +387,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
sp = &s->ports[s->nports++];
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, s->workqueue, name);
+ sp->emfd = grpc_fd_create(fd, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
GPR_ASSERT(sp->emfd);
@@ -495,7 +486,7 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index) {
void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets,
size_t pollset_count, grpc_tcp_server_cb cb,
- void *cb_arg) {
+ void *cb_arg, grpc_call_list *call_list) {
size_t i, j;
GPR_ASSERT(cb);
gpr_mu_lock(&s->mu);
@@ -507,11 +498,12 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets,
s->pollset_count = pollset_count;
for (i = 0; i < s->nports; i++) {
for (j = 0; j < pollset_count; j++) {
- grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd);
+ grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd, call_list);
}
s->ports[i].read_closure.cb = on_read;
s->ports[i].read_closure.cb_arg = &s->ports[i];
- grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure);
+ grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure,
+ call_list);
s->active_ports++;
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index 1a1d812050..e1c2ae95fd 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -111,15 +111,12 @@ struct grpc_udp_server {
size_t port_capacity;
/* shutdown callback */
- void (*shutdown_complete)(void *);
- void *shutdown_complete_arg;
+ grpc_closure *shutdown_complete;
/* all pollsets interested in new connections */
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
-
- grpc_workqueue *workqueue;
};
grpc_udp_server *grpc_udp_server_create(void) {
@@ -132,40 +129,38 @@ grpc_udp_server *grpc_udp_server_create(void) {
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
s->nports = 0;
s->port_capacity = INIT_PORT_CAP;
- s->workqueue = grpc_workqueue_create();
return s;
}
-static void finish_shutdown(grpc_udp_server *s) {
- s->shutdown_complete(s->shutdown_complete_arg);
+static void finish_shutdown(grpc_udp_server *s, grpc_call_list *call_list) {
+ grpc_call_list_add(call_list, s->shutdown_complete, 1);
gpr_mu_destroy(&s->mu);
gpr_cv_destroy(&s->cv);
gpr_free(s->ports);
- GRPC_WORKQUEUE_UNREF(s->workqueue, "workqueue");
gpr_free(s);
}
-static void destroyed_port(void *server, int success) {
+static void destroyed_port(void *server, int success,
+ grpc_call_list *call_list) {
grpc_udp_server *s = server;
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
if (s->destroyed_ports == s->nports) {
gpr_mu_unlock(&s->mu);
- finish_shutdown(s);
+ finish_shutdown(s, call_list);
} else {
gpr_mu_unlock(&s->mu);
}
}
-static void dont_care_about_shutdown_completion(void *ignored) {}
-
/* called when all listening endpoints have been shutdown, so no further
events will be received on them - at this point it's safe to destroy
things */
-static void deactivated_all_ports(grpc_udp_server *s) {
+static void deactivated_all_ports(grpc_udp_server *s,
+ grpc_call_list *call_list) {
size_t i;
/* delete ALL the things */
@@ -184,38 +179,35 @@ static void deactivated_all_ports(grpc_udp_server *s) {
}
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
- grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "udp_listener_shutdown");
+ grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "udp_listener_shutdown",
+ call_list);
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- finish_shutdown(s);
+ finish_shutdown(s, call_list);
}
}
-void grpc_udp_server_destroy(
- grpc_udp_server *s, void (*shutdown_complete)(void *shutdown_complete_arg),
- void *shutdown_complete_arg) {
+void grpc_udp_server_destroy(grpc_udp_server *s, grpc_closure *on_done,
+ grpc_call_list *call_list) {
size_t i;
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
s->shutdown = 1;
- s->shutdown_complete = shutdown_complete
- ? shutdown_complete
- : dont_care_about_shutdown_completion;
- s->shutdown_complete_arg = shutdown_complete_arg;
+ s->shutdown_complete = on_done;
/* shutdown all fd's */
if (s->active_ports) {
for (i = 0; i < s->nports; i++) {
- grpc_fd_shutdown(s->ports[i].emfd);
+ grpc_fd_shutdown(s->ports[i].emfd, call_list);
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- deactivated_all_ports(s);
+ deactivated_all_ports(s, call_list);
}
}
@@ -270,14 +262,14 @@ error:
}
/* event manager callback when reads are ready */
-static void on_read(void *arg, int success) {
+static void on_read(void *arg, int success, grpc_call_list *call_list) {
server_port *sp = arg;
if (success == 0) {
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
gpr_mu_unlock(&sp->server->mu);
- deactivated_all_ports(sp->server);
+ deactivated_all_ports(sp->server, call_list);
} else {
gpr_mu_unlock(&sp->server->mu);
}
@@ -289,7 +281,7 @@ static void on_read(void *arg, int success) {
sp->read_cb(sp->fd);
/* Re-arm the notification event so we get another chance to read. */
- grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure, call_list);
}
static int add_socket_to_server(grpc_udp_server *s, int fd,
@@ -313,7 +305,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
sp = &s->ports[s->nports++];
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, s->workqueue, name);
+ sp->emfd = grpc_fd_create(fd, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
sp->read_cb = read_cb;
@@ -410,18 +402,19 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) {
}
void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets,
- size_t pollset_count) {
+ size_t pollset_count, grpc_call_list *call_list) {
size_t i, j;
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->active_ports == 0);
s->pollsets = pollsets;
for (i = 0; i < s->nports; i++) {
for (j = 0; j < pollset_count; j++) {
- grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd);
+ grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd, call_list);
}
s->ports[i].read_closure.cb = on_read;
s->ports[i].read_closure.cb_arg = &s->ports[i];
- grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure);
+ grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure,
+ call_list);
s->active_ports++;
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h
index c930e81cbc..fa4d2147b4 100644
--- a/src/core/iomgr/udp_server.h
+++ b/src/core/iomgr/udp_server.h
@@ -47,7 +47,7 @@ grpc_udp_server *grpc_udp_server_create(void);
/* Start listening to bound ports */
void grpc_udp_server_start(grpc_udp_server *udp_server, grpc_pollset **pollsets,
- size_t pollset_count);
+ size_t pollset_count, grpc_call_list *call_list);
int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);
@@ -64,9 +64,8 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
size_t addr_len, grpc_udp_server_read_cb read_cb);
-void grpc_udp_server_destroy(grpc_udp_server *server,
- void (*shutdown_done)(void *shutdown_done_arg),
- void *shutdown_done_arg);
+void grpc_udp_server_destroy(grpc_udp_server *server, grpc_closure *on_done,
+ grpc_call_list *call_list);
/* Write the contents of buffer to the underlying UDP socket. */
/*
diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h
index 6f09399b55..b9d2a87dca 100644
--- a/src/core/iomgr/workqueue.h
+++ b/src/core/iomgr/workqueue.h
@@ -50,25 +50,25 @@ struct grpc_workqueue;
typedef struct grpc_workqueue grpc_workqueue;
/** Create a work queue */
-grpc_workqueue *grpc_workqueue_create(void);
+grpc_workqueue *grpc_workqueue_create(grpc_call_list *call_list);
-void grpc_workqueue_flush(grpc_workqueue *workqueue);
+void grpc_workqueue_flush(grpc_workqueue *workqueue, grpc_call_list *call_list);
#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define GRPC_WORKQUEUE_REF(p, r) \
grpc_workqueue_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_WORKQUEUE_UNREF(p, r) \
- grpc_workqueue_unref((p), __FILE__, __LINE__, (r))
+#define GRPC_WORKQUEUE_UNREF(p, r, cl) \
+ grpc_workqueue_unref((p), (cl), __FILE__, __LINE__, (r))
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason);
-void grpc_workqueue_unref(grpc_workqueue *workqueue, const char *file, int line,
- const char *reason);
+void grpc_workqueue_unref(grpc_workqueue *workqueue, grpc_call_list *call_list,
+ const char *file, int line, const char *reason);
#else
#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p))
-#define GRPC_WORKQUEUE_UNREF(p, r) grpc_workqueue_unref((p))
+#define GRPC_WORKQUEUE_UNREF(p, r, cl) grpc_workqueue_unref((p), (cl))
void grpc_workqueue_ref(grpc_workqueue *workqueue);
-void grpc_workqueue_unref(grpc_workqueue *workqueue);
+void grpc_workqueue_unref(grpc_workqueue *workqueue, grpc_call_list *call_list);
#endif
/** Bind this workqueue to a pollset */
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index 85b541f4d2..83249c583c 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -45,9 +45,9 @@
#include "src/core/iomgr/fd_posix.h"
-static void on_readable(void *arg, int success);
+static void on_readable(void *arg, int success, grpc_call_list *call_list);
-grpc_workqueue *grpc_workqueue_create(void) {
+grpc_workqueue *grpc_workqueue_create(grpc_call_list *call_list) {
char name[32];
grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue));
gpr_ref_init(&workqueue->refs, 1);
@@ -55,17 +55,18 @@ grpc_workqueue *grpc_workqueue_create(void) {
workqueue->call_list.head = workqueue->call_list.tail = NULL;
grpc_wakeup_fd_init(&workqueue->wakeup_fd);
sprintf(name, "workqueue:%p", (void *)workqueue);
- workqueue->wakeup_read_fd = NULL; /* inspected during grpc_fd_create below */
- workqueue->wakeup_read_fd = grpc_fd_create(
- GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), workqueue, name);
+ workqueue->wakeup_read_fd =
+ grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name);
grpc_closure_init(&workqueue->read_closure, on_readable, workqueue);
- grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
+ grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure,
+ call_list);
return workqueue;
}
-static void workqueue_destroy(grpc_workqueue *workqueue) {
+static void workqueue_destroy(grpc_workqueue *workqueue,
+ grpc_call_list *call_list) {
GPR_ASSERT(grpc_call_list_empty(workqueue->call_list));
- grpc_fd_shutdown(workqueue->wakeup_read_fd);
+ grpc_fd_shutdown(workqueue->wakeup_read_fd, call_list);
}
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
@@ -81,33 +82,34 @@ void grpc_workqueue_ref(grpc_workqueue *workqueue) {
}
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-void grpc_workqueue_unref(grpc_workqueue *workqueue, const char *file, int line,
- const char *reason) {
+void grpc_workqueue_unref(grpc_workqueue *workqueue, grpc_call_list *call_list,
+ const char *file, int line, const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s",
workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1,
reason);
#else
-void grpc_workqueue_unref(grpc_workqueue *workqueue) {
+void grpc_workqueue_unref(grpc_workqueue *workqueue,
+ grpc_call_list *call_list) {
#endif
if (gpr_unref(&workqueue->refs)) {
- workqueue_destroy(workqueue);
+ workqueue_destroy(workqueue, call_list);
}
}
void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,
- grpc_pollset *pollset) {
- grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd);
+ grpc_pollset *pollset,
+ grpc_call_list *call_list) {
+ grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd, call_list);
}
-void grpc_workqueue_flush(grpc_workqueue *workqueue) {
- grpc_call_list todo = GRPC_CALL_LIST_INIT;
+void grpc_workqueue_flush(grpc_workqueue *workqueue,
+ grpc_call_list *call_list) {
gpr_mu_lock(&workqueue->mu);
- GPR_SWAP(grpc_call_list, todo, workqueue->call_list);
+ grpc_call_list_move(&workqueue->call_list, call_list);
gpr_mu_unlock(&workqueue->mu);
- grpc_call_list_run(todo);
}
-static void on_readable(void *arg, int success) {
+static void on_readable(void *arg, int success, grpc_call_list *call_list) {
grpc_workqueue *workqueue = arg;
if (!success) {
@@ -115,16 +117,15 @@ static void on_readable(void *arg, int success) {
/* HACK: let wakeup_fd code know that we stole the fd */
workqueue->wakeup_fd.read_fd = 0;
grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
- grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy");
+ grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy", call_list);
gpr_free(workqueue);
} else {
- grpc_call_list todo = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&workqueue->mu);
- GPR_SWAP(grpc_call_list, todo, workqueue->call_list);
+ grpc_call_list_move(&workqueue->call_list, call_list);
grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
gpr_mu_unlock(&workqueue->mu);
- grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
- grpc_call_list_run(todo);
+ grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure,
+ call_list);
}
}