diff options
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_linux.c | 21 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue.h | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue_posix.c | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue_posix.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue_windows.c | 10 |
6 files changed, 17 insertions, 35 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index af23e35ca3..e5aab293bd 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -253,13 +253,14 @@ struct grpc_pollset_set { * Common helpers */ -static void append_error(grpc_error **composite, grpc_error *error, +static bool append_error(grpc_error **composite, grpc_error *error, const char *desc) { - if (error == GRPC_ERROR_NONE) return; + if (error == GRPC_ERROR_NONE) return true; if (*composite == GRPC_ERROR_NONE) { *composite = GRPC_ERROR_CREATE(desc); } *composite = grpc_error_add_child(*composite, error); + return false; } /******************************************************************************* @@ -490,16 +491,18 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error); if (initial_fd != NULL) { - /* Lock the polling island here just in case we got this structure from - the freelist and the polling island lock was not released yet (by the - code that adds the polling island to the freelist) */ - gpr_mu_lock(&pi->mu); polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); - gpr_mu_unlock(&pi->mu); } - append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue), - err_desc); + if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue), + err_desc) && + *error == GRPC_ERROR_NONE) { + polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true, + error); + GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL); + pi->workqueue->wakeup_read_fd->polling_island = pi; + PI_ADD_REF(pi, 1); + } done: if (*error != GRPC_ERROR_NONE) { diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index b5bac152fb..ec21e03944 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -284,7 +284,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } /* returns true if done, false if pending; if returning true, *error is set */ -#define MAX_WRITE_IOVEC 16 +#define MAX_WRITE_IOVEC 1024 static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 498b7a300a..064a551ed0 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -49,10 +49,6 @@ /* grpc_workqueue is forward declared in exec_ctx.h */ -/** Create a work queue */ -grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, - grpc_workqueue **workqueue); - void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); #define GRPC_WORKQUEUE_REFCOUNT_DEBUG @@ -72,11 +68,6 @@ void grpc_workqueue_ref(grpc_workqueue *workqueue); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); #endif -/** Bind this workqueue to a pollset */ -void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, - grpc_pollset *pollset); - /** Add a work item to a workqueue */ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error); diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 45e0f6063b..fef1709954 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -100,12 +100,6 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { } } -void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, - grpc_pollset *pollset) { - grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd); -} - void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { gpr_mu_lock(&workqueue->mu); grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h index dcb47e7b59..2e8aca1816 100644 --- a/src/core/lib/iomgr/workqueue_posix.h +++ b/src/core/lib/iomgr/workqueue_posix.h @@ -50,4 +50,8 @@ struct grpc_workqueue { grpc_closure read_closure; }; +/** Create a work queue */ +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue); + #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */ diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c index 52a3e47f37..23e2dea185 100644 --- a/src/core/lib/iomgr/workqueue_windows.c +++ b/src/core/lib/iomgr/workqueue_windows.c @@ -42,12 +42,6 @@ // context, which is at least correct, if not performant or in the spirit of // workqueues. -grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, - grpc_workqueue **workqueue) { - *workqueue = (grpc_workqueue *)1; - return GRPC_ERROR_NONE; -} - void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG @@ -60,10 +54,6 @@ void grpc_workqueue_ref(grpc_workqueue *workqueue) {} void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} #endif -void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, - grpc_pollset *pollset) {} - void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error) { grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); |