aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c21
-rw-r--r--src/core/lib/iomgr/tcp_posix.c2
-rw-r--r--src/core/lib/iomgr/workqueue.h9
-rw-r--r--src/core/lib/iomgr/workqueue_posix.c6
-rw-r--r--src/core/lib/iomgr/workqueue_posix.h4
-rw-r--r--src/core/lib/iomgr/workqueue_windows.c10
6 files changed, 17 insertions, 35 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index af23e35ca3..e5aab293bd 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -253,13 +253,14 @@ struct grpc_pollset_set {
* Common helpers
*/
-static void append_error(grpc_error **composite, grpc_error *error,
+static bool append_error(grpc_error **composite, grpc_error *error,
const char *desc) {
- if (error == GRPC_ERROR_NONE) return;
+ if (error == GRPC_ERROR_NONE) return true;
if (*composite == GRPC_ERROR_NONE) {
*composite = GRPC_ERROR_CREATE(desc);
}
*composite = grpc_error_add_child(*composite, error);
+ return false;
}
/*******************************************************************************
@@ -490,16 +491,18 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
if (initial_fd != NULL) {
- /* Lock the polling island here just in case we got this structure from
- the freelist and the polling island lock was not released yet (by the
- code that adds the polling island to the freelist) */
- gpr_mu_lock(&pi->mu);
polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
- gpr_mu_unlock(&pi->mu);
}
- append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
- err_desc);
+ if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
+ err_desc) &&
+ *error == GRPC_ERROR_NONE) {
+ polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true,
+ error);
+ GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
+ pi->workqueue->wakeup_read_fd->polling_island = pi;
+ PI_ADD_REF(pi, 1);
+ }
done:
if (*error != GRPC_ERROR_NONE) {
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index b5bac152fb..ec21e03944 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -284,7 +284,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
/* returns true if done, false if pending; if returning true, *error is set */
-#define MAX_WRITE_IOVEC 16
+#define MAX_WRITE_IOVEC 1024
static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 498b7a300a..064a551ed0 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -49,10 +49,6 @@
/* grpc_workqueue is forward declared in exec_ctx.h */
-/** Create a work queue */
-grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
- grpc_workqueue **workqueue);
-
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
@@ -72,11 +68,6 @@ void grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
-/** Bind this workqueue to a pollset */
-void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset *pollset);
-
/** Add a work item to a workqueue */
void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error);
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
index 45e0f6063b..fef1709954 100644
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ b/src/core/lib/iomgr/workqueue_posix.c
@@ -100,12 +100,6 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
}
}
-void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset *pollset) {
- grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd);
-}
-
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
gpr_mu_lock(&workqueue->mu);
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h
index dcb47e7b59..2e8aca1816 100644
--- a/src/core/lib/iomgr/workqueue_posix.h
+++ b/src/core/lib/iomgr/workqueue_posix.h
@@ -50,4 +50,8 @@ struct grpc_workqueue {
grpc_closure read_closure;
};
+/** Create a work queue */
+grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue **workqueue);
+
#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
index 52a3e47f37..23e2dea185 100644
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ b/src/core/lib/iomgr/workqueue_windows.c
@@ -42,12 +42,6 @@
// context, which is at least correct, if not performant or in the spirit of
// workqueues.
-grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
- grpc_workqueue **workqueue) {
- *workqueue = (grpc_workqueue *)1;
- return GRPC_ERROR_NONE;
-}
-
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
@@ -60,10 +54,6 @@ void grpc_workqueue_ref(grpc_workqueue *workqueue) {}
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
#endif
-void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset *pollset) {}
-
void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error) {
grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);