aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-07-01 14:57:26 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-07-01 14:57:26 -0700
commiteea32bb24535c79ec4ee1bbad452858c131c8df0 (patch)
tree07589426e916a8d9b7fa7328b1aa2f5582e8f76b /src/core/lib/iomgr
parent0ab1ec894e80af1db485a8ee3706a9b709af1208 (diff)
parentf975f74c016d5850d10644c4865e76e05e4b7815 (diff)
Merge branch 'iq2-pi' into delayed-write
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c21
-rw-r--r--src/core/lib/iomgr/tcp_posix.c2
-rw-r--r--src/core/lib/iomgr/workqueue.h12
-rw-r--r--src/core/lib/iomgr/workqueue_posix.c12
-rw-r--r--src/core/lib/iomgr/workqueue_posix.h4
-rw-r--r--src/core/lib/iomgr/workqueue_windows.c10
6 files changed, 17 insertions, 44 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index af23e35ca3..e5aab293bd 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -253,13 +253,14 @@ struct grpc_pollset_set {
* Common helpers
*/
-static void append_error(grpc_error **composite, grpc_error *error,
+static bool append_error(grpc_error **composite, grpc_error *error,
const char *desc) {
- if (error == GRPC_ERROR_NONE) return;
+ if (error == GRPC_ERROR_NONE) return true;
if (*composite == GRPC_ERROR_NONE) {
*composite = GRPC_ERROR_CREATE(desc);
}
*composite = grpc_error_add_child(*composite, error);
+ return false;
}
/*******************************************************************************
@@ -490,16 +491,18 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
if (initial_fd != NULL) {
- /* Lock the polling island here just in case we got this structure from
- the freelist and the polling island lock was not released yet (by the
- code that adds the polling island to the freelist) */
- gpr_mu_lock(&pi->mu);
polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
- gpr_mu_unlock(&pi->mu);
}
- append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
- err_desc);
+ if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
+ err_desc) &&
+ *error == GRPC_ERROR_NONE) {
+ polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true,
+ error);
+ GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
+ pi->workqueue->wakeup_read_fd->polling_island = pi;
+ PI_ADD_REF(pi, 1);
+ }
done:
if (*error != GRPC_ERROR_NONE) {
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index b5bac152fb..ec21e03944 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -284,7 +284,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
/* returns true if done, false if pending; if returning true, *error is set */
-#define MAX_WRITE_IOVEC 16
+#define MAX_WRITE_IOVEC 1024
static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 88c2b513f4..b4bd5742dd 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -50,10 +50,6 @@
/* grpc_workqueue is forward declared in exec_ctx.h */
-/** Create a work queue */
-grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
- grpc_workqueue **workqueue);
-
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
/*#define GRPC_WORKQUEUE_REFCOUNT_DEBUG*/
@@ -73,14 +69,6 @@ void grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
-/** Bind this workqueue to a pollset */
-void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset *pollset);
-void grpc_workqueue_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset_set *pollset_set);
-
/** Add a work item to a workqueue */
void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error);
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
index bcbc2699be..e0d6dac230 100644
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ b/src/core/lib/iomgr/workqueue_posix.c
@@ -100,18 +100,6 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
}
}
-void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset *pollset) {
- grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd);
-}
-
-void grpc_workqueue_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset_set *pollset_set) {
- grpc_pollset_set_add_fd(exec_ctx, pollset_set, workqueue->wakeup_read_fd);
-}
-
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
gpr_mu_lock(&workqueue->mu);
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h
index dcb47e7b59..2e8aca1816 100644
--- a/src/core/lib/iomgr/workqueue_posix.h
+++ b/src/core/lib/iomgr/workqueue_posix.h
@@ -50,4 +50,8 @@ struct grpc_workqueue {
grpc_closure read_closure;
};
+/** Create a work queue */
+grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue **workqueue);
+
#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
index 52a3e47f37..23e2dea185 100644
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ b/src/core/lib/iomgr/workqueue_windows.c
@@ -42,12 +42,6 @@
// context, which is at least correct, if not performant or in the spirit of
// workqueues.
-grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
- grpc_workqueue **workqueue) {
- *workqueue = (grpc_workqueue *)1;
- return GRPC_ERROR_NONE;
-}
-
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
@@ -60,10 +54,6 @@ void grpc_workqueue_ref(grpc_workqueue *workqueue) {}
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
#endif
-void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset *pollset) {}
-
void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error) {
grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);