diff options
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/endpoint.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/endpoint.h | 8 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.c | 21 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 11 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 19 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.h | 31 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 16 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.h | 10 | ||||
-rw-r--r-- | src/core/iomgr/socket_windows.h | 2 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_posix.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 16 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/tcp_windows.c | 12 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/workqueue.h | 4 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.c | 97 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.h | 5 |
18 files changed, 100 insertions, 170 deletions
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index a7878e31dd..1955f74b9a 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -35,13 +35,13 @@ grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { return ep->vtable->read(ep, slices, cb); } grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { return ep->vtable->write(ep, slices, cb); } diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index d14d52d561..79b7d6a78e 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -54,9 +54,9 @@ typedef enum grpc_endpoint_op_status { struct grpc_endpoint_vtable { grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb); + grpc_closure *cb); grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb); + grpc_closure *cb); void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset); void (*shutdown)(grpc_endpoint *ep); @@ -70,7 +70,7 @@ struct grpc_endpoint_vtable { Valid slices may be placed into \a slices even on callback success == 0. */ grpc_endpoint_op_status grpc_endpoint_read( grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT; + grpc_closure *cb) GRPC_MUST_USE_RESULT; char *grpc_endpoint_get_peer(grpc_endpoint *ep); @@ -86,7 +86,7 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep); */ grpc_endpoint_op_status grpc_endpoint_write( grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT; + grpc_closure *cb) GRPC_MUST_USE_RESULT; /* Causes any pending read/write callbacks to run immediately with success==0 */ diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 5bdce0bfd8..dfebbcc2e1 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -218,8 +218,7 @@ static int has_watchers(grpc_fd *fd) { fd->inactive_watcher_root.next != &fd->inactive_watcher_root; } -void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, - const char *reason) { +void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason) { fd->on_done_closure = on_done; shutdown(fd->fd, SHUT_RDWR); gpr_mu_lock(&fd->watcher_mu); @@ -253,7 +252,7 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif -static void process_callback(grpc_iomgr_closure *closure, int success, +static void process_callback(grpc_closure *closure, int success, grpc_workqueue *optional_workqueue) { if (optional_workqueue == NULL) { closure->cb(closure->cb_arg, success); @@ -262,15 +261,15 @@ static void process_callback(grpc_iomgr_closure *closure, int success, } } -static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n, - int success, grpc_workqueue *optional_workqueue) { +static void process_callbacks(grpc_closure *callbacks, size_t n, int success, + grpc_workqueue *optional_workqueue) { size_t i; for (i = 0; i < n; i++) { process_callback(callbacks + i, success, optional_workqueue); } } -static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, +static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure, int allow_synchronous_callback) { switch (gpr_atm_acq_load(st)) { case NOT_READY: @@ -307,7 +306,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, abort(); } -static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks, +static void set_ready_locked(gpr_atm *st, grpc_closure **callbacks, size_t *ncallbacks) { gpr_intptr state = gpr_atm_acq_load(st); @@ -327,7 +326,7 @@ static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks, default: /* waiting */ GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY && gpr_atm_no_barrier_load(st) != NOT_READY); - callbacks[(*ncallbacks)++] = (grpc_iomgr_closure *)state; + callbacks[(*ncallbacks)++] = (grpc_closure *)state; gpr_atm_rel_store(st, NOT_READY); return; } @@ -338,7 +337,7 @@ static void set_ready(grpc_fd *fd, gpr_atm *st, /* only one set_ready can be active at once (but there may be a racing notify_on) */ int success; - grpc_iomgr_closure *closure; + grpc_closure *closure; size_t ncb = 0; gpr_mu_lock(&fd->set_state_mu); @@ -365,11 +364,11 @@ void grpc_fd_shutdown(grpc_fd *fd) { 0 /* GPR_FALSE */); } -void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) { +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure) { notify_on(fd, &fd->readst, closure, 0); } -void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) { +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure) { notify_on(fd, &fd->writest, closure, 0); } diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index e5157ad342..bb85b6c16e 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -96,8 +96,8 @@ struct grpc_fd { struct grpc_fd *freelist_next; - grpc_iomgr_closure *on_done_closure; - grpc_iomgr_closure *shutdown_closures[2]; + grpc_closure *on_done_closure; + grpc_closure *shutdown_closures[2]; grpc_iomgr_object iomgr_object; }; @@ -113,8 +113,7 @@ grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name); Requires: *fd initialized; no outstanding notify_on_read or notify_on_write. MUST NOT be called with a pollset lock taken */ -void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, - const char *reason); +void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason); /* Begin polling on an fd. Registers that the given pollset is interested in this fd - so that if read @@ -153,10 +152,10 @@ void grpc_fd_shutdown(grpc_fd *fd); underlying platform. This means that users must drain fd in read_cb before calling notify_on_read again. Users are also expected to handle spurious events, i.e read_cb is called while nothing can be readable from fd */ -void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure); +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure); /* Exactly the same semantics as above, except based on writable events. */ -void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure); +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure); /* Notification from the poller to an fd that it has become readable or writable. diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index ba8f73fe08..dd76044913 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -151,15 +151,15 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { gpr_free(obj->name); } -void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, - void *cb_arg) { +void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg) { closure->cb = cb; closure->cb_arg = cb_arg; closure->next = NULL; } -void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list, - grpc_iomgr_closure *closure, int success) { +void grpc_call_list_add(grpc_call_list *call_list, grpc_closure *closure, + int success) { if (!closure) return; closure->next = NULL; closure->success = success; @@ -171,21 +171,20 @@ void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list, call_list->tail = closure; } -void grpc_iomgr_call_list_run(grpc_iomgr_call_list call_list) { - grpc_iomgr_closure *c = call_list.head; +void grpc_call_list_run(grpc_call_list call_list) { + grpc_closure *c = call_list.head; while (c) { - grpc_iomgr_closure *next = c->next; + grpc_closure *next = c->next; c->cb(c->cb_arg, c->success); c = next; } } -int grpc_iomgr_call_list_empty(grpc_iomgr_call_list call_list) { +int grpc_call_list_empty(grpc_call_list call_list) { return call_list.head == NULL; } -void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src, - grpc_iomgr_call_list *dst) { +void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst) { if (dst->head == NULL) { *dst = *src; return; diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index 58c31763d2..56f0195be9 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -42,7 +42,7 @@ typedef void (*grpc_iomgr_cb_func)(void *arg, int success); /** A closure over a grpc_iomgr_cb_func. */ -typedef struct grpc_iomgr_closure { +typedef struct grpc_closure { /** Bound callback. */ grpc_iomgr_cb_func cb; @@ -55,27 +55,26 @@ typedef struct grpc_iomgr_closure { int success; /**< Internal. Do not touch */ - struct grpc_iomgr_closure *next; -} grpc_iomgr_closure; + struct grpc_closure *next; +} grpc_closure; -typedef struct grpc_iomgr_call_list { - grpc_iomgr_closure *head; - grpc_iomgr_closure *tail; -} grpc_iomgr_call_list; +typedef struct grpc_call_list { + grpc_closure *head; + grpc_closure *tail; +} grpc_call_list; /** Initializes \a closure with \a cb and \a cb_arg. */ -void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, - void *cb_arg); +void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg); -#define GRPC_IOMGR_CALL_LIST_INIT \ +#define GRPC_CALL_LIST_INIT \ { NULL, NULL } -void grpc_iomgr_call_list_add(grpc_iomgr_call_list *list, - grpc_iomgr_closure *closure, int success); -void grpc_iomgr_call_list_run(grpc_iomgr_call_list list); -void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src, - grpc_iomgr_call_list *dst); -int grpc_iomgr_call_list_empty(grpc_iomgr_call_list list); +void grpc_call_list_add(grpc_call_list *list, grpc_closure *closure, + int success); +void grpc_call_list_run(grpc_call_list list); +void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst); +int grpc_call_list_empty(grpc_call_list list); /** Initializes the iomgr. */ void grpc_iomgr_init(void); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 253f3190f1..5ca957b8e1 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -53,7 +53,7 @@ typedef struct wakeup_fd_hdl { typedef struct { grpc_pollset *pollset; grpc_fd *fd; - grpc_iomgr_closure closure; + grpc_closure closure; } delayed_add; typedef struct { @@ -125,7 +125,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, da->pollset = pollset; da->fd = fd; GRPC_FD_REF(fd, "delayed_add"); - grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da); + grpc_closure_init(&da->closure, perform_delayed_add, da); pollset->in_flight_cbs++; grpc_pollset_add_unlock_job(pollset, &da->closure); } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 107ffb0b5e..0fe3f80d44 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -175,30 +175,28 @@ static void finish_shutdown(grpc_pollset *pollset) { pollset->shutdown_done_cb(pollset->shutdown_done_arg); } -static void run_jobs(grpc_pollset *pollset, grpc_iomgr_closure **root) { - grpc_iomgr_closure *exec = *root; +static void run_jobs(grpc_pollset *pollset, grpc_closure **root) { + grpc_closure *exec = *root; *root = NULL; gpr_mu_unlock(&pollset->mu); while (exec != NULL) { - grpc_iomgr_closure *next = exec->next; + grpc_closure *next = exec->next; exec->cb(exec->cb_arg, 1); exec = next; } gpr_mu_lock(&pollset->mu); } -static void add_job(grpc_iomgr_closure **root, grpc_iomgr_closure *closure) { +static void add_job(grpc_closure **root, grpc_closure *closure) { closure->next = *root; *root = closure; } -void grpc_pollset_add_idle_job(grpc_pollset *pollset, - grpc_iomgr_closure *closure) { +void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure) { add_job(&pollset->idle_jobs, closure); } -void grpc_pollset_add_unlock_job(grpc_pollset *pollset, - grpc_iomgr_closure *closure) { +void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure) { add_job(&pollset->unlock_jobs, closure); } @@ -316,7 +314,7 @@ typedef struct grpc_unary_promote_args { const grpc_pollset_vtable *original_vtable; grpc_pollset *pollset; grpc_fd *fd; - grpc_iomgr_closure promotion_closure; + grpc_closure promotion_closure; } grpc_unary_promote_args; static void basic_do_promote(void *args, int success) { diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 3f89095d40..4f09f870f7 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -67,8 +67,8 @@ typedef struct grpc_pollset { int kicked_without_pollers; void (*shutdown_done_cb)(void *arg); void *shutdown_done_arg; - grpc_iomgr_closure *unlock_jobs; - grpc_iomgr_closure *idle_jobs; + grpc_closure *unlock_jobs; + grpc_closure *idle_jobs; union { int fd; void *ptr; @@ -128,10 +128,8 @@ typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; /** schedule a closure to be run next time there are no active workers */ -void grpc_pollset_add_idle_job(grpc_pollset *pollset, - grpc_iomgr_closure *closure); +void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure); /** schedule a closure to be run next time the pollset is unlocked */ -void grpc_pollset_add_unlock_job(grpc_pollset *pollset, - grpc_iomgr_closure *closure); +void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */ diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h index 498921e0fd..45bc657225 100644 --- a/src/core/iomgr/socket_windows.h +++ b/src/core/iomgr/socket_windows.h @@ -91,7 +91,7 @@ typedef struct grpc_winsocket { This prevents that. */ int added_to_iocp; - grpc_iomgr_closure shutdown_closure; + grpc_closure shutdown_closure; /* A label for iomgr to track outstanding objects */ grpc_iomgr_object iomgr_object; diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 0df984c3e6..1ea2155060 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -64,7 +64,7 @@ typedef struct { gpr_timespec deadline; grpc_alarm alarm; int refs; - grpc_iomgr_closure write_closure; + grpc_closure write_closure; grpc_pollset_set *interested_parties; char *addr_str; } async_connect; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index c539cf2d34..374d2f3a40 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -85,11 +85,11 @@ typedef struct { /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */ size_t outgoing_byte_idx; - grpc_iomgr_closure *read_cb; - grpc_iomgr_closure *write_cb; + grpc_closure *read_cb; + grpc_closure *write_cb; - grpc_iomgr_closure read_closure; - grpc_iomgr_closure write_closure; + grpc_closure read_closure; + grpc_closure write_closure; char *peer_string; } grpc_tcp; @@ -145,7 +145,7 @@ static void tcp_destroy(grpc_endpoint *ep) { } static void call_read_cb(grpc_tcp *tcp, int success) { - grpc_iomgr_closure *cb = tcp->read_cb; + grpc_closure *cb = tcp->read_cb; if (grpc_tcp_trace) { size_t i; @@ -250,7 +250,7 @@ static void tcp_handle_read(void *arg /* grpc_tcp */, int success) { static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep, gpr_slice_buffer *incoming_buffer, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; @@ -350,7 +350,7 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) { static void tcp_handle_write(void *arg /* grpc_tcp */, int success) { grpc_tcp *tcp = (grpc_tcp *)arg; grpc_endpoint_op_status status; - grpc_iomgr_closure *cb; + grpc_closure *cb; if (!success) { cb = tcp->write_cb; @@ -375,7 +375,7 @@ static void tcp_handle_write(void *arg /* grpc_tcp */, int success) { static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep, gpr_slice_buffer *buf, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_endpoint_op_status status; diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 0b79678e9d..213b2e1113 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -84,8 +84,8 @@ typedef struct { struct sockaddr_un un; } addr; size_t addr_len; - grpc_iomgr_closure read_closure; - grpc_iomgr_closure destroyed_closure; + grpc_closure read_closure; + grpc_closure destroyed_closure; } server_port; static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index fe3673c607..e361d22dc3 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -82,8 +82,8 @@ typedef struct grpc_tcp { /* Refcounting how many operations are in progress. */ gpr_refcount refcount; - grpc_iomgr_closure *read_cb; - grpc_iomgr_closure *write_cb; + grpc_closure *read_cb; + grpc_closure *write_cb; gpr_slice read_slice; gpr_slice_buffer *write_slices; gpr_slice_buffer *read_slices; @@ -169,7 +169,7 @@ static int on_read(grpc_tcp *tcp, int success) { static void on_read_cb(void *tcpp, int from_iocp) { grpc_tcp *tcp = tcpp; - grpc_iomgr_closure *cb = tcp->read_cb; + grpc_closure *cb = tcp->read_cb; int success = on_read(tcp, from_iocp); tcp->read_cb = NULL; TCP_UNREF(tcp, "read"); @@ -180,7 +180,7 @@ static void on_read_cb(void *tcpp, int from_iocp) { static grpc_endpoint_op_status win_read(grpc_endpoint *ep, gpr_slice_buffer *read_slices, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->read_info; @@ -241,7 +241,7 @@ static void on_write(void *tcpp, int success) { grpc_tcp *tcp = (grpc_tcp *)tcpp; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->write_info; - grpc_iomgr_closure *cb; + grpc_closure *cb; int do_abort = 0; gpr_mu_lock(&tcp->mu); @@ -269,7 +269,7 @@ static void on_write(void *tcpp, int success) { /* Initiates a write. */ static grpc_endpoint_op_status win_write(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_iomgr_closure *cb) { + grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *socket = tcp->socket; grpc_winsocket_callback_info *info = &socket->write_info; diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index 30957f8dee..1a1d812050 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -79,8 +79,8 @@ typedef struct { struct sockaddr_un un; } addr; size_t addr_len; - grpc_iomgr_closure read_closure; - grpc_iomgr_closure destroyed_closure; + grpc_closure read_closure; + grpc_closure destroyed_closure; grpc_udp_server_read_cb read_cb; } server_port; diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h index 124f294a23..6f09399b55 100644 --- a/src/core/iomgr/workqueue.h +++ b/src/core/iomgr/workqueue.h @@ -52,7 +52,7 @@ typedef struct grpc_workqueue grpc_workqueue; /** Create a work queue */ grpc_workqueue *grpc_workqueue_create(void); -void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously); +void grpc_workqueue_flush(grpc_workqueue *workqueue); #define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG @@ -76,7 +76,7 @@ void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue, grpc_pollset *pollset); /** Add a work item to a workqueue */ -void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure, +void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, int success); #endif diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index ead8c50d8e..85b541f4d2 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -41,7 +41,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/thd.h> +#include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" @@ -52,71 +52,19 @@ grpc_workqueue *grpc_workqueue_create(void) { grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue)); gpr_ref_init(&workqueue->refs, 1); gpr_mu_init(&workqueue->mu); - workqueue->head.next = NULL; - workqueue->tail = &workqueue->head; + workqueue->call_list.head = workqueue->call_list.tail = NULL; grpc_wakeup_fd_init(&workqueue->wakeup_fd); sprintf(name, "workqueue:%p", (void *)workqueue); workqueue->wakeup_read_fd = NULL; /* inspected during grpc_fd_create below */ workqueue->wakeup_read_fd = grpc_fd_create( GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), workqueue, name); - grpc_iomgr_closure_init(&workqueue->read_closure, on_readable, workqueue); + grpc_closure_init(&workqueue->read_closure, on_readable, workqueue); grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure); return workqueue; } -static void shutdown_thread(void *arg) { - grpc_iomgr_closure *todo = arg; - - while (todo) { - grpc_iomgr_closure *next = todo->next; - todo->cb(todo->cb_arg, todo->success); - todo = next; - } -} - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -static size_t count_waiting(grpc_workqueue *workqueue) { - size_t i = 0; - grpc_iomgr_closure *c; - for (c = workqueue->head.next; c; c = c->next) { - i++; - } - return i; -} -#endif - -void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously) { - grpc_iomgr_closure *todo; - gpr_thd_id thd; - - gpr_mu_lock(&workqueue->mu); -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG - if (workqueue->head.next) { - gpr_log(GPR_DEBUG, "WORKQUEUE:%p flush %d objects %s", workqueue, - count_waiting(workqueue), - asynchronously ? "asynchronously" : "synchronously"); - } -#endif - todo = workqueue->head.next; - workqueue->head.next = NULL; - workqueue->tail = &workqueue->head; - gpr_mu_unlock(&workqueue->mu); - - if (todo != NULL) { - if (asynchronously) { - gpr_thd_new(&thd, shutdown_thread, todo, NULL); - } else { - while (todo) { - grpc_iomgr_closure *next = todo->next; - todo->cb(todo->cb_arg, todo->success); - todo = next; - } - } - } -} - static void workqueue_destroy(grpc_workqueue *workqueue) { - GPR_ASSERT(workqueue->tail == &workqueue->head); + GPR_ASSERT(grpc_call_list_empty(workqueue->call_list)); grpc_fd_shutdown(workqueue->wakeup_read_fd); } @@ -151,9 +99,16 @@ void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue, grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd); } +void grpc_workqueue_flush(grpc_workqueue *workqueue) { + grpc_call_list todo = GRPC_CALL_LIST_INIT; + gpr_mu_lock(&workqueue->mu); + GPR_SWAP(grpc_call_list, todo, workqueue->call_list); + gpr_mu_unlock(&workqueue->mu); + grpc_call_list_run(todo); +} + static void on_readable(void *arg, int success) { grpc_workqueue *workqueue = arg; - grpc_iomgr_closure *todo; if (!success) { gpr_mu_destroy(&workqueue->mu); @@ -162,42 +117,26 @@ static void on_readable(void *arg, int success) { grpc_wakeup_fd_destroy(&workqueue->wakeup_fd); grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy"); gpr_free(workqueue); - return; } else { + grpc_call_list todo = GRPC_CALL_LIST_INIT; gpr_mu_lock(&workqueue->mu); -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG - gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue, - count_waiting(workqueue)); -#endif - todo = workqueue->head.next; - workqueue->head.next = NULL; - workqueue->tail = &workqueue->head; + GPR_SWAP(grpc_call_list, todo, workqueue->call_list); grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); gpr_mu_unlock(&workqueue->mu); grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure); - - while (todo) { - grpc_iomgr_closure *next = todo->next; - todo->cb(todo->cb_arg, todo->success); - todo = next; - } + grpc_call_list_run(todo); } } -void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure, +void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, int success) { closure->success = success; closure->next = NULL; gpr_mu_lock(&workqueue->mu); - if (workqueue->tail == &workqueue->head) { + if (grpc_call_list_empty(workqueue->call_list)) { grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } - workqueue->tail->next = closure; - workqueue->tail = closure; -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG - gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue, - count_waiting(workqueue)); -#endif + grpc_call_list_add(&workqueue->call_list, closure, success); gpr_mu_unlock(&workqueue->mu); } diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h index 1b3a0e281b..22c48c4926 100644 --- a/src/core/iomgr/workqueue_posix.h +++ b/src/core/iomgr/workqueue_posix.h @@ -40,13 +40,12 @@ struct grpc_workqueue { gpr_refcount refs; gpr_mu mu; - grpc_iomgr_closure head; - grpc_iomgr_closure *tail; + grpc_call_list call_list; grpc_wakeup_fd wakeup_fd; struct grpc_fd *wakeup_read_fd; - grpc_iomgr_closure read_closure; + grpc_closure read_closure; }; #endif /* GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H */ |