aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/endpoint.c4
-rw-r--r--src/core/iomgr/endpoint.h8
-rw-r--r--src/core/iomgr/fd_posix.c21
-rw-r--r--src/core/iomgr/fd_posix.h11
-rw-r--r--src/core/iomgr/iomgr.c19
-rw-r--r--src/core/iomgr/iomgr.h31
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c4
-rw-r--r--src/core/iomgr/pollset_posix.c16
-rw-r--r--src/core/iomgr/pollset_posix.h10
-rw-r--r--src/core/iomgr/socket_windows.h2
-rw-r--r--src/core/iomgr/tcp_client_posix.c2
-rw-r--r--src/core/iomgr/tcp_posix.c16
-rw-r--r--src/core/iomgr/tcp_server_posix.c4
-rw-r--r--src/core/iomgr/tcp_windows.c12
-rw-r--r--src/core/iomgr/udp_server.c4
-rw-r--r--src/core/iomgr/workqueue.h4
-rw-r--r--src/core/iomgr/workqueue_posix.c97
-rw-r--r--src/core/iomgr/workqueue_posix.h5
18 files changed, 100 insertions, 170 deletions
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index a7878e31dd..1955f74b9a 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -35,13 +35,13 @@
grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep,
gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
return ep->vtable->read(ep, slices, cb);
}
grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep,
gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
return ep->vtable->write(ep, slices, cb);
}
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index d14d52d561..79b7d6a78e 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -54,9 +54,9 @@ typedef enum grpc_endpoint_op_status {
struct grpc_endpoint_vtable {
grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb);
+ grpc_closure *cb);
grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb);
+ grpc_closure *cb);
void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset);
void (*shutdown)(grpc_endpoint *ep);
@@ -70,7 +70,7 @@ struct grpc_endpoint_vtable {
Valid slices may be placed into \a slices even on callback success == 0. */
grpc_endpoint_op_status grpc_endpoint_read(
grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
+ grpc_closure *cb) GRPC_MUST_USE_RESULT;
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
@@ -86,7 +86,7 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep);
*/
grpc_endpoint_op_status grpc_endpoint_write(
grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
+ grpc_closure *cb) GRPC_MUST_USE_RESULT;
/* Causes any pending read/write callbacks to run immediately with
success==0 */
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 5bdce0bfd8..dfebbcc2e1 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -218,8 +218,7 @@ static int has_watchers(grpc_fd *fd) {
fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}
-void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
- const char *reason) {
+void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason) {
fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR);
gpr_mu_lock(&fd->watcher_mu);
@@ -253,7 +252,7 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
-static void process_callback(grpc_iomgr_closure *closure, int success,
+static void process_callback(grpc_closure *closure, int success,
grpc_workqueue *optional_workqueue) {
if (optional_workqueue == NULL) {
closure->cb(closure->cb_arg, success);
@@ -262,15 +261,15 @@ static void process_callback(grpc_iomgr_closure *closure, int success,
}
}
-static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n,
- int success, grpc_workqueue *optional_workqueue) {
+static void process_callbacks(grpc_closure *callbacks, size_t n, int success,
+ grpc_workqueue *optional_workqueue) {
size_t i;
for (i = 0; i < n; i++) {
process_callback(callbacks + i, success, optional_workqueue);
}
}
-static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
+static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure,
int allow_synchronous_callback) {
switch (gpr_atm_acq_load(st)) {
case NOT_READY:
@@ -307,7 +306,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
abort();
}
-static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks,
+static void set_ready_locked(gpr_atm *st, grpc_closure **callbacks,
size_t *ncallbacks) {
gpr_intptr state = gpr_atm_acq_load(st);
@@ -327,7 +326,7 @@ static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks,
default: /* waiting */
GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY &&
gpr_atm_no_barrier_load(st) != NOT_READY);
- callbacks[(*ncallbacks)++] = (grpc_iomgr_closure *)state;
+ callbacks[(*ncallbacks)++] = (grpc_closure *)state;
gpr_atm_rel_store(st, NOT_READY);
return;
}
@@ -338,7 +337,7 @@ static void set_ready(grpc_fd *fd, gpr_atm *st,
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
int success;
- grpc_iomgr_closure *closure;
+ grpc_closure *closure;
size_t ncb = 0;
gpr_mu_lock(&fd->set_state_mu);
@@ -365,11 +364,11 @@ void grpc_fd_shutdown(grpc_fd *fd) {
0 /* GPR_FALSE */);
}
-void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {
+void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure) {
notify_on(fd, &fd->readst, closure, 0);
}
-void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) {
+void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure) {
notify_on(fd, &fd->writest, closure, 0);
}
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index e5157ad342..bb85b6c16e 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -96,8 +96,8 @@ struct grpc_fd {
struct grpc_fd *freelist_next;
- grpc_iomgr_closure *on_done_closure;
- grpc_iomgr_closure *shutdown_closures[2];
+ grpc_closure *on_done_closure;
+ grpc_closure *shutdown_closures[2];
grpc_iomgr_object iomgr_object;
};
@@ -113,8 +113,7 @@ grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name);
Requires: *fd initialized; no outstanding notify_on_read or
notify_on_write.
MUST NOT be called with a pollset lock taken */
-void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
- const char *reason);
+void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason);
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
@@ -153,10 +152,10 @@ void grpc_fd_shutdown(grpc_fd *fd);
underlying platform. This means that users must drain fd in read_cb before
calling notify_on_read again. Users are also expected to handle spurious
events, i.e read_cb is called while nothing can be readable from fd */
-void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure);
+void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure);
/* Exactly the same semantics as above, except based on writable events. */
-void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure);
+void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure);
/* Notification from the poller to an fd that it has become readable or
writable.
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index ba8f73fe08..dd76044913 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -151,15 +151,15 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
gpr_free(obj->name);
}
-void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
- void *cb_arg) {
+void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg) {
closure->cb = cb;
closure->cb_arg = cb_arg;
closure->next = NULL;
}
-void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list,
- grpc_iomgr_closure *closure, int success) {
+void grpc_call_list_add(grpc_call_list *call_list, grpc_closure *closure,
+ int success) {
if (!closure) return;
closure->next = NULL;
closure->success = success;
@@ -171,21 +171,20 @@ void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list,
call_list->tail = closure;
}
-void grpc_iomgr_call_list_run(grpc_iomgr_call_list call_list) {
- grpc_iomgr_closure *c = call_list.head;
+void grpc_call_list_run(grpc_call_list call_list) {
+ grpc_closure *c = call_list.head;
while (c) {
- grpc_iomgr_closure *next = c->next;
+ grpc_closure *next = c->next;
c->cb(c->cb_arg, c->success);
c = next;
}
}
-int grpc_iomgr_call_list_empty(grpc_iomgr_call_list call_list) {
+int grpc_call_list_empty(grpc_call_list call_list) {
return call_list.head == NULL;
}
-void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src,
- grpc_iomgr_call_list *dst) {
+void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst) {
if (dst->head == NULL) {
*dst = *src;
return;
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index 58c31763d2..56f0195be9 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -42,7 +42,7 @@
typedef void (*grpc_iomgr_cb_func)(void *arg, int success);
/** A closure over a grpc_iomgr_cb_func. */
-typedef struct grpc_iomgr_closure {
+typedef struct grpc_closure {
/** Bound callback. */
grpc_iomgr_cb_func cb;
@@ -55,27 +55,26 @@ typedef struct grpc_iomgr_closure {
int success;
/**< Internal. Do not touch */
- struct grpc_iomgr_closure *next;
-} grpc_iomgr_closure;
+ struct grpc_closure *next;
+} grpc_closure;
-typedef struct grpc_iomgr_call_list {
- grpc_iomgr_closure *head;
- grpc_iomgr_closure *tail;
-} grpc_iomgr_call_list;
+typedef struct grpc_call_list {
+ grpc_closure *head;
+ grpc_closure *tail;
+} grpc_call_list;
/** Initializes \a closure with \a cb and \a cb_arg. */
-void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
- void *cb_arg);
+void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg);
-#define GRPC_IOMGR_CALL_LIST_INIT \
+#define GRPC_CALL_LIST_INIT \
{ NULL, NULL }
-void grpc_iomgr_call_list_add(grpc_iomgr_call_list *list,
- grpc_iomgr_closure *closure, int success);
-void grpc_iomgr_call_list_run(grpc_iomgr_call_list list);
-void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src,
- grpc_iomgr_call_list *dst);
-int grpc_iomgr_call_list_empty(grpc_iomgr_call_list list);
+void grpc_call_list_add(grpc_call_list *list, grpc_closure *closure,
+ int success);
+void grpc_call_list_run(grpc_call_list list);
+void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst);
+int grpc_call_list_empty(grpc_call_list list);
/** Initializes the iomgr. */
void grpc_iomgr_init(void);
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 253f3190f1..5ca957b8e1 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -53,7 +53,7 @@ typedef struct wakeup_fd_hdl {
typedef struct {
grpc_pollset *pollset;
grpc_fd *fd;
- grpc_iomgr_closure closure;
+ grpc_closure closure;
} delayed_add;
typedef struct {
@@ -125,7 +125,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
da->pollset = pollset;
da->fd = fd;
GRPC_FD_REF(fd, "delayed_add");
- grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
+ grpc_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
grpc_pollset_add_unlock_job(pollset, &da->closure);
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 107ffb0b5e..0fe3f80d44 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -175,30 +175,28 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
}
-static void run_jobs(grpc_pollset *pollset, grpc_iomgr_closure **root) {
- grpc_iomgr_closure *exec = *root;
+static void run_jobs(grpc_pollset *pollset, grpc_closure **root) {
+ grpc_closure *exec = *root;
*root = NULL;
gpr_mu_unlock(&pollset->mu);
while (exec != NULL) {
- grpc_iomgr_closure *next = exec->next;
+ grpc_closure *next = exec->next;
exec->cb(exec->cb_arg, 1);
exec = next;
}
gpr_mu_lock(&pollset->mu);
}
-static void add_job(grpc_iomgr_closure **root, grpc_iomgr_closure *closure) {
+static void add_job(grpc_closure **root, grpc_closure *closure) {
closure->next = *root;
*root = closure;
}
-void grpc_pollset_add_idle_job(grpc_pollset *pollset,
- grpc_iomgr_closure *closure) {
+void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure) {
add_job(&pollset->idle_jobs, closure);
}
-void grpc_pollset_add_unlock_job(grpc_pollset *pollset,
- grpc_iomgr_closure *closure) {
+void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure) {
add_job(&pollset->unlock_jobs, closure);
}
@@ -316,7 +314,7 @@ typedef struct grpc_unary_promote_args {
const grpc_pollset_vtable *original_vtable;
grpc_pollset *pollset;
grpc_fd *fd;
- grpc_iomgr_closure promotion_closure;
+ grpc_closure promotion_closure;
} grpc_unary_promote_args;
static void basic_do_promote(void *args, int success) {
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 3f89095d40..4f09f870f7 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -67,8 +67,8 @@ typedef struct grpc_pollset {
int kicked_without_pollers;
void (*shutdown_done_cb)(void *arg);
void *shutdown_done_arg;
- grpc_iomgr_closure *unlock_jobs;
- grpc_iomgr_closure *idle_jobs;
+ grpc_closure *unlock_jobs;
+ grpc_closure *idle_jobs;
union {
int fd;
void *ptr;
@@ -128,10 +128,8 @@ typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
/** schedule a closure to be run next time there are no active workers */
-void grpc_pollset_add_idle_job(grpc_pollset *pollset,
- grpc_iomgr_closure *closure);
+void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure);
/** schedule a closure to be run next time the pollset is unlocked */
-void grpc_pollset_add_unlock_job(grpc_pollset *pollset,
- grpc_iomgr_closure *closure);
+void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index 498921e0fd..45bc657225 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -91,7 +91,7 @@ typedef struct grpc_winsocket {
This prevents that. */
int added_to_iocp;
- grpc_iomgr_closure shutdown_closure;
+ grpc_closure shutdown_closure;
/* A label for iomgr to track outstanding objects */
grpc_iomgr_object iomgr_object;
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 0df984c3e6..1ea2155060 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -64,7 +64,7 @@ typedef struct {
gpr_timespec deadline;
grpc_alarm alarm;
int refs;
- grpc_iomgr_closure write_closure;
+ grpc_closure write_closure;
grpc_pollset_set *interested_parties;
char *addr_str;
} async_connect;
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index c539cf2d34..374d2f3a40 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -85,11 +85,11 @@ typedef struct {
/** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
size_t outgoing_byte_idx;
- grpc_iomgr_closure *read_cb;
- grpc_iomgr_closure *write_cb;
+ grpc_closure *read_cb;
+ grpc_closure *write_cb;
- grpc_iomgr_closure read_closure;
- grpc_iomgr_closure write_closure;
+ grpc_closure read_closure;
+ grpc_closure write_closure;
char *peer_string;
} grpc_tcp;
@@ -145,7 +145,7 @@ static void tcp_destroy(grpc_endpoint *ep) {
}
static void call_read_cb(grpc_tcp *tcp, int success) {
- grpc_iomgr_closure *cb = tcp->read_cb;
+ grpc_closure *cb = tcp->read_cb;
if (grpc_tcp_trace) {
size_t i;
@@ -250,7 +250,7 @@ static void tcp_handle_read(void *arg /* grpc_tcp */, int success) {
static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
gpr_slice_buffer *incoming_buffer,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
@@ -350,7 +350,7 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
grpc_endpoint_op_status status;
- grpc_iomgr_closure *cb;
+ grpc_closure *cb;
if (!success) {
cb = tcp->write_cb;
@@ -375,7 +375,7 @@ static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
gpr_slice_buffer *buf,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_endpoint_op_status status;
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 0b79678e9d..213b2e1113 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -84,8 +84,8 @@ typedef struct {
struct sockaddr_un un;
} addr;
size_t addr_len;
- grpc_iomgr_closure read_closure;
- grpc_iomgr_closure destroyed_closure;
+ grpc_closure read_closure;
+ grpc_closure destroyed_closure;
} server_port;
static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index fe3673c607..e361d22dc3 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -82,8 +82,8 @@ typedef struct grpc_tcp {
/* Refcounting how many operations are in progress. */
gpr_refcount refcount;
- grpc_iomgr_closure *read_cb;
- grpc_iomgr_closure *write_cb;
+ grpc_closure *read_cb;
+ grpc_closure *write_cb;
gpr_slice read_slice;
gpr_slice_buffer *write_slices;
gpr_slice_buffer *read_slices;
@@ -169,7 +169,7 @@ static int on_read(grpc_tcp *tcp, int success) {
static void on_read_cb(void *tcpp, int from_iocp) {
grpc_tcp *tcp = tcpp;
- grpc_iomgr_closure *cb = tcp->read_cb;
+ grpc_closure *cb = tcp->read_cb;
int success = on_read(tcp, from_iocp);
tcp->read_cb = NULL;
TCP_UNREF(tcp, "read");
@@ -180,7 +180,7 @@ static void on_read_cb(void *tcpp, int from_iocp) {
static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
gpr_slice_buffer *read_slices,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->read_info;
@@ -241,7 +241,7 @@ static void on_write(void *tcpp, int success) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
- grpc_iomgr_closure *cb;
+ grpc_closure *cb;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
@@ -269,7 +269,7 @@ static void on_write(void *tcpp, int success) {
/* Initiates a write. */
static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *socket = tcp->socket;
grpc_winsocket_callback_info *info = &socket->write_info;
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index 30957f8dee..1a1d812050 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -79,8 +79,8 @@ typedef struct {
struct sockaddr_un un;
} addr;
size_t addr_len;
- grpc_iomgr_closure read_closure;
- grpc_iomgr_closure destroyed_closure;
+ grpc_closure read_closure;
+ grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
} server_port;
diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h
index 124f294a23..6f09399b55 100644
--- a/src/core/iomgr/workqueue.h
+++ b/src/core/iomgr/workqueue.h
@@ -52,7 +52,7 @@ typedef struct grpc_workqueue grpc_workqueue;
/** Create a work queue */
grpc_workqueue *grpc_workqueue_create(void);
-void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously);
+void grpc_workqueue_flush(grpc_workqueue *workqueue);
#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
@@ -76,7 +76,7 @@ void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,
grpc_pollset *pollset);
/** Add a work item to a workqueue */
-void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure,
+void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
int success);
#endif
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index ead8c50d8e..85b541f4d2 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -41,7 +41,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/thd.h>
+#include <grpc/support/useful.h>
#include "src/core/iomgr/fd_posix.h"
@@ -52,71 +52,19 @@ grpc_workqueue *grpc_workqueue_create(void) {
grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue));
gpr_ref_init(&workqueue->refs, 1);
gpr_mu_init(&workqueue->mu);
- workqueue->head.next = NULL;
- workqueue->tail = &workqueue->head;
+ workqueue->call_list.head = workqueue->call_list.tail = NULL;
grpc_wakeup_fd_init(&workqueue->wakeup_fd);
sprintf(name, "workqueue:%p", (void *)workqueue);
workqueue->wakeup_read_fd = NULL; /* inspected during grpc_fd_create below */
workqueue->wakeup_read_fd = grpc_fd_create(
GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), workqueue, name);
- grpc_iomgr_closure_init(&workqueue->read_closure, on_readable, workqueue);
+ grpc_closure_init(&workqueue->read_closure, on_readable, workqueue);
grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
return workqueue;
}
-static void shutdown_thread(void *arg) {
- grpc_iomgr_closure *todo = arg;
-
- while (todo) {
- grpc_iomgr_closure *next = todo->next;
- todo->cb(todo->cb_arg, todo->success);
- todo = next;
- }
-}
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-static size_t count_waiting(grpc_workqueue *workqueue) {
- size_t i = 0;
- grpc_iomgr_closure *c;
- for (c = workqueue->head.next; c; c = c->next) {
- i++;
- }
- return i;
-}
-#endif
-
-void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously) {
- grpc_iomgr_closure *todo;
- gpr_thd_id thd;
-
- gpr_mu_lock(&workqueue->mu);
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
- if (workqueue->head.next) {
- gpr_log(GPR_DEBUG, "WORKQUEUE:%p flush %d objects %s", workqueue,
- count_waiting(workqueue),
- asynchronously ? "asynchronously" : "synchronously");
- }
-#endif
- todo = workqueue->head.next;
- workqueue->head.next = NULL;
- workqueue->tail = &workqueue->head;
- gpr_mu_unlock(&workqueue->mu);
-
- if (todo != NULL) {
- if (asynchronously) {
- gpr_thd_new(&thd, shutdown_thread, todo, NULL);
- } else {
- while (todo) {
- grpc_iomgr_closure *next = todo->next;
- todo->cb(todo->cb_arg, todo->success);
- todo = next;
- }
- }
- }
-}
-
static void workqueue_destroy(grpc_workqueue *workqueue) {
- GPR_ASSERT(workqueue->tail == &workqueue->head);
+ GPR_ASSERT(grpc_call_list_empty(workqueue->call_list));
grpc_fd_shutdown(workqueue->wakeup_read_fd);
}
@@ -151,9 +99,16 @@ void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,
grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd);
}
+void grpc_workqueue_flush(grpc_workqueue *workqueue) {
+ grpc_call_list todo = GRPC_CALL_LIST_INIT;
+ gpr_mu_lock(&workqueue->mu);
+ GPR_SWAP(grpc_call_list, todo, workqueue->call_list);
+ gpr_mu_unlock(&workqueue->mu);
+ grpc_call_list_run(todo);
+}
+
static void on_readable(void *arg, int success) {
grpc_workqueue *workqueue = arg;
- grpc_iomgr_closure *todo;
if (!success) {
gpr_mu_destroy(&workqueue->mu);
@@ -162,42 +117,26 @@ static void on_readable(void *arg, int success) {
grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy");
gpr_free(workqueue);
- return;
} else {
+ grpc_call_list todo = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&workqueue->mu);
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
- gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue,
- count_waiting(workqueue));
-#endif
- todo = workqueue->head.next;
- workqueue->head.next = NULL;
- workqueue->tail = &workqueue->head;
+ GPR_SWAP(grpc_call_list, todo, workqueue->call_list);
grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
gpr_mu_unlock(&workqueue->mu);
grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
-
- while (todo) {
- grpc_iomgr_closure *next = todo->next;
- todo->cb(todo->cb_arg, todo->success);
- todo = next;
- }
+ grpc_call_list_run(todo);
}
}
-void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure,
+void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
int success) {
closure->success = success;
closure->next = NULL;
gpr_mu_lock(&workqueue->mu);
- if (workqueue->tail == &workqueue->head) {
+ if (grpc_call_list_empty(workqueue->call_list)) {
grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
}
- workqueue->tail->next = closure;
- workqueue->tail = closure;
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
- gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue,
- count_waiting(workqueue));
-#endif
+ grpc_call_list_add(&workqueue->call_list, closure, success);
gpr_mu_unlock(&workqueue->mu);
}
diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h
index 1b3a0e281b..22c48c4926 100644
--- a/src/core/iomgr/workqueue_posix.h
+++ b/src/core/iomgr/workqueue_posix.h
@@ -40,13 +40,12 @@ struct grpc_workqueue {
gpr_refcount refs;
gpr_mu mu;
- grpc_iomgr_closure head;
- grpc_iomgr_closure *tail;
+ grpc_call_list call_list;
grpc_wakeup_fd wakeup_fd;
struct grpc_fd *wakeup_read_fd;
- grpc_iomgr_closure read_closure;
+ grpc_closure read_closure;
};
#endif /* GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H */