aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-08-13 10:53:06 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-08-13 10:53:06 -0700
commit556775d7c78125e051c6571d4c528de214024c23 (patch)
tree77d3e0effdb58b01e9e257a8dcd03a024acedf31 /src/core/lib/iomgr
parent0ec6973b743e8e1f463bdcc8691e1869097c92f0 (diff)
parentba7ca9742eb7484009b75c268dba7e56b8feaebb (diff)
Merge master
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/call_combiner.h5
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc22
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc31
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc38
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc67
-rw-r--r--src/core/lib/iomgr/ev_posix.cc6
-rw-r--r--src/core/lib/iomgr/ev_posix.h20
-rw-r--r--src/core/lib/iomgr/executor.cc220
-rw-r--r--src/core/lib/iomgr/executor.h45
-rw-r--r--src/core/lib/iomgr/iocp_windows.cc13
-rw-r--r--src/core/lib/iomgr/iomgr_posix_cfstream.cc75
-rw-r--r--src/core/lib/iomgr/lockfree_event.cc6
-rw-r--r--src/core/lib/iomgr/port.h5
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.cc5
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.cc5
-rw-r--r--src/core/lib/iomgr/socket_windows.cc33
-rw-r--r--src/core/lib/iomgr/socket_windows.h6
-rw-r--r--src/core/lib/iomgr/tcp_client_cfstream.cc2
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc13
-rw-r--r--src/core/lib/iomgr/tcp_windows.cc4
-rw-r--r--src/core/lib/iomgr/tcp_windows.h2
21 files changed, 418 insertions, 205 deletions
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index 641fa18082..6f7ddd4043 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -102,7 +102,10 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner,
/// If \a closure is NULL, then no closure will be invoked on
/// cancellation; this effectively unregisters the previously set closure.
/// However, most filters will not need to explicitly unregister their
-/// callbacks, as this is done automatically when the call is destroyed.
+/// callbacks, as this is done automatically when the call is destroyed. Filters
+/// that schedule the cancellation closure on ExecCtx do not need to take a ref
+/// on the call stack to guarantee closure liveness. This is done by explicitly
+/// flushing ExecCtx after the unregistration during call destruction.
void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
grpc_closure* closure);
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 86a0243d2e..66e0f1fd6d 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -140,10 +140,6 @@ struct grpc_fd {
struct grpc_fd* freelist_next;
- /* The pollset that last noticed that the fd is readable. The actual type
- * stored in this is (grpc_pollset *) */
- gpr_atm read_notifier_pollset;
-
grpc_iomgr_object iomgr_object;
};
@@ -293,7 +289,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
new_fd->error_closure->InitEvent();
- gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -376,11 +371,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
gpr_mu_unlock(&fd_freelist_mu);
}
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
- gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
- return (grpc_pollset*)notifier;
-}
-
static bool fd_is_shutdown(grpc_fd* fd) {
return fd->read_closure->IsShutdown();
}
@@ -397,11 +387,7 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
fd->error_closure->NotifyOn(closure);
}
-static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
- fd->read_closure->SetReady();
- /* Use release store to match with acquire load in fd_get_read_notifier */
- gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
-}
+static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
@@ -642,7 +628,7 @@ static grpc_error* process_epoll_events(grpc_pollset* pollset) {
}
if (read_ev || cancel || err_fallback) {
- fd_become_readable(fd, pollset);
+ fd_become_readable(fd);
}
if (write_ev || cancel || err_fallback) {
@@ -1217,8 +1203,10 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_become_readable,
+ fd_become_writable,
+ fd_has_errors,
fd_is_shutdown,
- fd_get_read_notifier_pollset,
pollset_init,
pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 7b368410cf..96eae30345 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -135,7 +135,7 @@ struct pollable {
// underlying epoll set (i.e whenever fd_orphan() is called).
//
// Implementing (2) above (i.e removing fds from cache on fd_orphan) adds a
- // lot of complexity since an fd can be present in multiple pollalbles. So our
+ // lot of complexity since an fd can be present in multiple pollables. So our
// implementation ONLY DOES (1) and NOT (2).
//
// The cache_fd.salt variable helps here to maintain correctness (it serves as
@@ -220,10 +220,6 @@ struct grpc_fd {
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
- // The pollset that last noticed that the fd is readable. The actual type
- // stored in this is (grpc_pollset *)
- gpr_atm read_notifier_pollset;
-
grpc_iomgr_object iomgr_object;
// Do we need to track EPOLLERR events separately?
@@ -353,7 +349,6 @@ static void invalidate_fd(grpc_fd* fd) {
memset(&fd->pollable_mu, -1, sizeof(fd->pollable_mu));
fd->pollable_obj = nullptr;
fd->on_done_closure = nullptr;
- gpr_atm_no_barrier_store(&fd->read_notifier_pollset, 0);
memset(&fd->iomgr_object, -1, sizeof(fd->iomgr_object));
fd->track_err = false;
}
@@ -445,7 +440,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
new_fd->error_closure->InitEvent();
new_fd->freelist_next = nullptr;
new_fd->on_done_closure = nullptr;
- gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
char* fd_name;
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
@@ -514,11 +508,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
UNREF_BY(fd, 2, reason); /* Drop the reference */
}
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
- gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
- return (grpc_pollset*)notifier;
-}
-
static bool fd_is_shutdown(grpc_fd* fd) {
return fd->read_closure->IsShutdown();
}
@@ -875,17 +864,7 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) {
return static_cast<int>(delta);
}
-static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
- fd->read_closure->SetReady();
-
- /* Note, it is possible that fd_become_readable might be called twice with
- different 'notifier's when an fd becomes readable and it is in two epoll
- sets (This can happen briefly during polling island merges). In such cases
- it does not really matter which notifer is set as the read_notifier_pollset
- (They would both point to the same polling island anyway) */
- /* Use release store to match with acquire load in fd_get_read_notifier */
- gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
-}
+static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
@@ -983,7 +962,7 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset,
fd_has_errors(fd);
}
if (read_ev || cancel || err_fallback) {
- fd_become_readable(fd, pollset);
+ fd_become_readable(fd);
}
if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
@@ -1636,8 +1615,10 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_become_readable,
+ fd_become_writable,
+ fd_has_errors,
fd_is_shutdown,
- fd_get_read_notifier_pollset,
pollset_init,
pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 2189801c18..5695ac795d 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -137,10 +137,6 @@ struct grpc_fd {
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
- /* The pollset that last noticed that the fd is readable. The actual type
- * stored in this is (grpc_pollset *) */
- gpr_atm read_notifier_pollset;
-
grpc_iomgr_object iomgr_object;
/* Do we need to track EPOLLERR events separately? */
@@ -845,7 +841,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
new_fd->write_closure->InitEvent();
new_fd->error_closure->InitEvent();
new_fd->track_err = track_err;
- gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
new_fd->on_done_closure = nullptr;
@@ -927,11 +922,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
GRPC_ERROR_UNREF(error);
}
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
- gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
- return (grpc_pollset*)notifier;
-}
-
static bool fd_is_shutdown(grpc_fd* fd) {
return fd->read_closure->IsShutdown();
}
@@ -958,6 +948,12 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
fd->error_closure->NotifyOn(closure);
}
+static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
+
+static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -1115,22 +1111,6 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) {
return static_cast<int>(delta);
}
-static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
- fd->read_closure->SetReady();
-
- /* Note, it is possible that fd_become_readable might be called twice with
- different 'notifier's when an fd becomes readable and it is in two epoll
- sets (This can happen briefly during polling island merges). In such cases
- it does not really matter which notifer is set as the read_notifier_pollset
- (They would both point to the same polling island anyway) */
- /* Use release store to match with acquire load in fd_get_read_notifier */
- gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
-}
-
-static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
-
-static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
-
static void pollset_release_polling_island(grpc_pollset* ps,
const char* reason) {
if (ps->po.pi != nullptr) {
@@ -1283,7 +1263,7 @@ static void pollset_work_and_unlock(grpc_pollset* pollset,
fd_has_errors(fd);
}
if (read_ev || cancel || err_fallback) {
- fd_become_readable(fd, pollset);
+ fd_become_readable(fd);
}
if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
@@ -1667,8 +1647,10 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_become_readable,
+ fd_become_writable,
+ fd_has_errors,
fd_is_shutdown,
- fd_get_read_notifier_pollset,
pollset_init,
pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index c9c09881a2..fb4c71ef71 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -108,9 +108,6 @@ struct grpc_fd {
grpc_closure* on_done_closure;
grpc_iomgr_object iomgr_object;
-
- /* The pollset that last noticed and notified that the fd is readable */
- grpc_pollset* read_notifier_pollset;
};
/* Begin polling on an fd.
@@ -131,8 +128,7 @@ static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
MUST NOT be called with a pollset lock taken
if got_read or got_write are 1, also does the become_{readable,writable} as
appropriate. */
-static void fd_end_poll(grpc_fd_watcher* rec, int got_read, int got_write,
- grpc_pollset* read_notifier_pollset);
+static void fd_end_poll(grpc_fd_watcher* rec, int got_read, int got_write);
/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd* fd);
@@ -346,7 +342,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
r->closed = 0;
r->released = 0;
gpr_atm_no_barrier_store(&r->pollhup, 0);
- r->read_notifier_pollset = nullptr;
char* name2;
gpr_asprintf(&name2, "%s fd=%d", name, fd);
@@ -359,17 +354,6 @@ static bool fd_is_orphaned(grpc_fd* fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
-/* Return the read-notifier pollset */
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
- grpc_pollset* notifier = nullptr;
-
- gpr_mu_lock(&fd->mu);
- notifier = fd->read_notifier_pollset;
- gpr_mu_unlock(&fd->mu);
-
- return notifier;
-}
-
static grpc_error* pollset_kick_locked(grpc_fd_watcher* watcher) {
gpr_mu_lock(&watcher->pollset->mu);
GPR_ASSERT(watcher->worker);
@@ -512,11 +496,6 @@ static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
}
}
-static void set_read_notifier_pollset_locked(
- grpc_fd* fd, grpc_pollset* read_notifier_pollset) {
- fd->read_notifier_pollset = read_notifier_pollset;
-}
-
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
gpr_mu_lock(&fd->mu);
/* only shutdown once */
@@ -553,8 +532,28 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
}
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
- gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
- abort();
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
+ }
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
+}
+
+static void fd_set_readable(grpc_fd* fd) {
+ gpr_mu_lock(&fd->mu);
+ set_ready_locked(fd, &fd->read_closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_set_writable(grpc_fd* fd) {
+ gpr_mu_lock(&fd->mu);
+ set_ready_locked(fd, &fd->write_closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_set_error(grpc_fd* fd) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
+ }
}
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
@@ -608,8 +607,7 @@ static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
return mask;
}
-static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write,
- grpc_pollset* read_notifier_pollset) {
+static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
int was_polling = 0;
int kick = 0;
grpc_fd* fd = watcher->fd;
@@ -645,9 +643,6 @@ static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write,
if (set_ready_locked(fd, &fd->read_closure)) {
kick = 1;
}
- if (read_notifier_pollset != nullptr) {
- set_read_notifier_pollset_locked(fd, read_notifier_pollset);
- }
}
if (got_write) {
if (set_ready_locked(fd, &fd->write_closure)) {
@@ -997,16 +992,16 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
for (i = 1; i < pfd_count; i++) {
if (watchers[i].fd == nullptr) {
- fd_end_poll(&watchers[i], 0, 0, nullptr);
+ fd_end_poll(&watchers[i], 0, 0);
} else {
// Wake up all the file descriptors, if we have an invalid one
// we can identify it on the next pollset_work()
- fd_end_poll(&watchers[i], 1, 1, pollset);
+ fd_end_poll(&watchers[i], 1, 1);
}
}
} else if (r == 0) {
for (i = 1; i < pfd_count; i++) {
- fd_end_poll(&watchers[i], 0, 0, nullptr);
+ fd_end_poll(&watchers[i], 0, 0);
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
@@ -1018,7 +1013,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
}
for (i = 1; i < pfd_count; i++) {
if (watchers[i].fd == nullptr) {
- fd_end_poll(&watchers[i], 0, 0, nullptr);
+ fd_end_poll(&watchers[i], 0, 0);
} else {
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_INFO, "%p got_event: %d r:%d w:%d [%d]", pollset,
@@ -1032,7 +1027,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
}
fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
- pfds[i].revents & POLLOUT_CHECK, pollset);
+ pfds[i].revents & POLLOUT_CHECK);
}
}
}
@@ -1723,8 +1718,10 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_set_readable,
+ fd_set_writable,
+ fd_set_error,
fd_is_shutdown,
- fd_get_read_notifier_pollset,
pollset_init,
pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 6ca985cb22..2ce9b0f97d 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -239,6 +239,12 @@ void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
g_event_engine->fd_notify_on_error(fd, closure);
}
+void grpc_fd_set_readable(grpc_fd* fd) { g_event_engine->fd_set_readable(fd); }
+
+void grpc_fd_set_writable(grpc_fd* fd) { g_event_engine->fd_set_writable(fd); }
+
+void grpc_fd_set_error(grpc_fd* fd) { g_event_engine->fd_set_error(fd); }
+
static size_t pollset_size(void) { return g_event_engine->pollset_size; }
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index b4c17fc80d..8d0bcc0710 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -51,8 +51,10 @@ typedef struct grpc_event_engine_vtable {
void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_error)(grpc_fd* fd, grpc_closure* closure);
+ void (*fd_set_readable)(grpc_fd* fd);
+ void (*fd_set_writable)(grpc_fd* fd);
+ void (*fd_set_error)(grpc_fd* fd);
bool (*fd_is_shutdown)(grpc_fd* fd);
- grpc_pollset* (*fd_get_read_notifier_pollset)(grpc_fd* fd);
void (*pollset_init)(grpc_pollset* pollset, gpr_mu** mu);
void (*pollset_shutdown)(grpc_pollset* pollset, grpc_closure* closure);
@@ -142,8 +144,20 @@ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure);
* needs to have been set on grpc_fd_create */
void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure);
-/* Return the read notifier pollset from the fd */
-grpc_pollset* grpc_fd_get_read_notifier_pollset(grpc_fd* fd);
+/* Forcibly set the fd to be readable, resulting in the closure registered with
+ * grpc_fd_notify_on_read being invoked.
+ */
+void grpc_fd_set_readable(grpc_fd* fd);
+
+/* Forcibly set the fd to be writable, resulting in the closure registered with
+ * grpc_fd_notify_on_write being invoked.
+ */
+void grpc_fd_set_writable(grpc_fd* fd);
+
+/* Forcibly set the fd to have errored, resulting in the closure registered with
+ * grpc_fd_notify_on_error being invoked.
+ */
+void grpc_fd_set_error(grpc_fd* fd);
/* pollset_posix functions */
diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc
index 1ad13b831d..45d96b80eb 100644
--- a/src/core/lib/iomgr/executor.cc
+++ b/src/core/lib/iomgr/executor.cc
@@ -40,19 +40,25 @@
gpr_log(GPR_INFO, "EXECUTOR " format, __VA_ARGS__); \
}
+#define EXECUTOR_TRACE0(str) \
+ if (executor_trace.enabled()) { \
+ gpr_log(GPR_INFO, "EXECUTOR " str); \
+ }
+
grpc_core::TraceFlag executor_trace(false, "executor");
GPR_TLS_DECL(g_this_thread_state);
-GrpcExecutor::GrpcExecutor(const char* executor_name) : name_(executor_name) {
+GrpcExecutor::GrpcExecutor(const char* name) : name_(name) {
adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
- gpr_atm_no_barrier_store(&num_threads_, 0);
+ gpr_atm_rel_store(&num_threads_, 0);
max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores());
}
void GrpcExecutor::Init() { SetThreading(true); }
-size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
+size_t GrpcExecutor::RunClosures(const char* executor_name,
+ grpc_closure_list list) {
size_t n = 0;
grpc_closure* c = list.head;
@@ -60,11 +66,11 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
grpc_closure* next = c->next_data.next;
grpc_error* error = c->error_data.error;
#ifndef NDEBUG
- EXECUTOR_TRACE("run %p [created by %s:%d]", c, c->file_created,
- c->line_created);
+ EXECUTOR_TRACE("(%s) run %p [created by %s:%d]", executor_name, c,
+ c->file_created, c->line_created);
c->scheduled = false;
#else
- EXECUTOR_TRACE("run %p", c);
+ EXECUTOR_TRACE("(%s) run %p", executor_name, c);
#endif
c->cb(c->cb_arg, error);
GRPC_ERROR_UNREF(error);
@@ -77,17 +83,21 @@ size_t GrpcExecutor::RunClosures(grpc_closure_list list) {
}
bool GrpcExecutor::IsThreaded() const {
- return gpr_atm_no_barrier_load(&num_threads_) > 0;
+ return gpr_atm_acq_load(&num_threads_) > 0;
}
void GrpcExecutor::SetThreading(bool threading) {
- gpr_atm curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
+ gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_);
+ EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading);
if (threading) {
- if (curr_num_threads > 0) return;
+ if (curr_num_threads > 0) {
+ EXECUTOR_TRACE("(%s) SetThreading(true). curr_num_threads == 0", name_);
+ return;
+ }
GPR_ASSERT(num_threads_ == 0);
- gpr_atm_no_barrier_store(&num_threads_, 1);
+ gpr_atm_rel_store(&num_threads_, 1);
gpr_tls_init(&g_this_thread_state);
thd_state_ = static_cast<ThreadState*>(
gpr_zalloc(sizeof(ThreadState) * max_threads_));
@@ -96,6 +106,7 @@ void GrpcExecutor::SetThreading(bool threading) {
gpr_mu_init(&thd_state_[i].mu);
gpr_cv_init(&thd_state_[i].cv);
thd_state_[i].id = i;
+ thd_state_[i].name = name_;
thd_state_[i].thd = grpc_core::Thread();
thd_state_[i].elems = GRPC_CLOSURE_LIST_INIT;
}
@@ -104,7 +115,10 @@ void GrpcExecutor::SetThreading(bool threading) {
grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]);
thd_state_[0].thd.Start();
} else { // !threading
- if (curr_num_threads == 0) return;
+ if (curr_num_threads == 0) {
+ EXECUTOR_TRACE("(%s) SetThreading(false). curr_num_threads == 0", name_);
+ return;
+ }
for (size_t i = 0; i < max_threads_; i++) {
gpr_mu_lock(&thd_state_[i].mu);
@@ -121,20 +135,22 @@ void GrpcExecutor::SetThreading(bool threading) {
curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
for (gpr_atm i = 0; i < curr_num_threads; i++) {
thd_state_[i].thd.Join();
- EXECUTOR_TRACE(" Thread %" PRIdPTR " of %" PRIdPTR " joined", i,
- curr_num_threads);
+ EXECUTOR_TRACE("(%s) Thread %" PRIdPTR " of %" PRIdPTR " joined", name_,
+ i + 1, curr_num_threads);
}
- gpr_atm_no_barrier_store(&num_threads_, 0);
+ gpr_atm_rel_store(&num_threads_, 0);
for (size_t i = 0; i < max_threads_; i++) {
gpr_mu_destroy(&thd_state_[i].mu);
gpr_cv_destroy(&thd_state_[i].cv);
- RunClosures(thd_state_[i].elems);
+ RunClosures(thd_state_[i].name, thd_state_[i].elems);
}
gpr_free(thd_state_);
gpr_tls_destroy(&g_this_thread_state);
}
+
+ EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading);
}
void GrpcExecutor::Shutdown() { SetThreading(false); }
@@ -147,8 +163,8 @@ void GrpcExecutor::ThreadMain(void* arg) {
size_t subtract_depth = 0;
for (;;) {
- EXECUTOR_TRACE("[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")", ts->id,
- subtract_depth);
+ EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")",
+ ts->name, ts->id, subtract_depth);
gpr_mu_lock(&ts->mu);
ts->depth -= subtract_depth;
@@ -159,7 +175,7 @@ void GrpcExecutor::ThreadMain(void* arg) {
}
if (ts->shutdown) {
- EXECUTOR_TRACE("[%" PRIdPTR "]: shutdown", ts->id);
+ EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: shutdown", ts->name, ts->id);
gpr_mu_unlock(&ts->mu);
break;
}
@@ -169,10 +185,10 @@ void GrpcExecutor::ThreadMain(void* arg) {
ts->elems = GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu);
- EXECUTOR_TRACE("[%" PRIdPTR "]: execute", ts->id);
+ EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: execute", ts->name, ts->id);
grpc_core::ExecCtx::Get()->InvalidateNow();
- subtract_depth = RunClosures(closures);
+ subtract_depth = RunClosures(ts->name, closures);
}
}
@@ -188,16 +204,16 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
do {
retry_push = false;
size_t cur_thread_count =
- static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_));
+ static_cast<size_t>(gpr_atm_acq_load(&num_threads_));
// If the number of threads is zero(i.e either the executor is not threaded
// or already shutdown), then queue the closure on the exec context itself
if (cur_thread_count == 0) {
#ifndef NDEBUG
- EXECUTOR_TRACE("schedule %p (created %s:%d) inline", closure,
+ EXECUTOR_TRACE("(%s) schedule %p (created %s:%d) inline", name_, closure,
closure->file_created, closure->line_created);
#else
- EXECUTOR_TRACE("schedule %p inline", closure);
+ EXECUTOR_TRACE("(%s) schedule %p inline", name_, closure);
#endif
grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
closure, error);
@@ -213,18 +229,18 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
}
ThreadState* orig_ts = ts;
-
bool try_new_thread = false;
+
for (;;) {
#ifndef NDEBUG
EXECUTOR_TRACE(
- "try to schedule %p (%s) (created %s:%d) to thread "
+ "(%s) try to schedule %p (%s) (created %s:%d) to thread "
"%" PRIdPTR,
- closure, is_short ? "short" : "long", closure->file_created,
+ name_, closure, is_short ? "short" : "long", closure->file_created,
closure->line_created, ts->id);
#else
- EXECUTOR_TRACE("try to schedule %p (%s) to thread %" PRIdPTR, closure,
- is_short ? "short" : "long", ts->id);
+ EXECUTOR_TRACE("(%s) try to schedule %p (%s) to thread %" PRIdPTR, name_,
+ closure, is_short ? "short" : "long", ts->id);
#endif
gpr_mu_lock(&ts->mu);
@@ -236,18 +252,22 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
size_t idx = ts->id;
ts = &thd_state_[(idx + 1) % cur_thread_count];
if (ts == orig_ts) {
- // We cycled through all the threads. Retry enqueue again (by creating
- // a new thread)
+ // We cycled through all the threads. Retry enqueue again by creating
+ // a new thread
+ //
+ // TODO (sreek): There is a potential issue here. We are
+ // unconditionally setting try_new_thread to true here. What if the
+ // executor is shutdown OR if cur_thread_count is already equal to
+ // max_threads ?
+ // (Fortunately, this is not an issue yet (as of july 2018) because
+ // there is only one instance of long job in gRPC and hence we will
+ // not hit this code path)
retry_push = true;
- // TODO (sreek): What if the executor is shutdown OR if
- // cur_thread_count is already equal to max_threads ? (currently - as
- // of July 2018, we do not run in to this issue because there is only
- // one instance of long job in gRPC. This has to be fixed soon)
try_new_thread = true;
break;
}
- continue;
+ continue; // Try the next thread-state
}
// == Found the thread state (i.e thread) to enqueue this closure! ==
@@ -277,13 +297,11 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
}
if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock_)) {
- cur_thread_count =
- static_cast<size_t>(gpr_atm_no_barrier_load(&num_threads_));
+ cur_thread_count = static_cast<size_t>(gpr_atm_acq_load(&num_threads_));
if (cur_thread_count < max_threads_) {
- // Increment num_threads (Safe to do a no_barrier_store instead of a
- // cas because we always increment num_threads under the
- // 'adding_thread_lock')
- gpr_atm_no_barrier_store(&num_threads_, cur_thread_count + 1);
+ // Increment num_threads (safe to do a store instead of a cas because we
+ // always increment num_threads under the 'adding_thread_lock')
+ gpr_atm_rel_store(&num_threads_, cur_thread_count + 1);
thd_state_[cur_thread_count].thd = grpc_core::Thread(
name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]);
@@ -298,60 +316,118 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
} while (retry_push);
}
-static GrpcExecutor* global_executor;
+static GrpcExecutor* executors[GRPC_NUM_EXECUTORS];
-void enqueue_long(grpc_closure* closure, grpc_error* error) {
- global_executor->Enqueue(closure, error, false /* is_short */);
+void default_enqueue_short(grpc_closure* closure, grpc_error* error) {
+ executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
+ true /* is_short */);
}
-void enqueue_short(grpc_closure* closure, grpc_error* error) {
- global_executor->Enqueue(closure, error, true /* is_short */);
+void default_enqueue_long(grpc_closure* closure, grpc_error* error) {
+ executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
+ false /* is_short */);
}
-// Short-Job executor scheduler
-static const grpc_closure_scheduler_vtable global_executor_vtable_short = {
- enqueue_short, enqueue_short, "executor-short"};
-static grpc_closure_scheduler global_scheduler_short = {
- &global_executor_vtable_short};
+void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) {
+ executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
+ true /* is_short */);
+}
-// Long-job executor scheduler
-static const grpc_closure_scheduler_vtable global_executor_vtable_long = {
- enqueue_long, enqueue_long, "executor-long"};
-static grpc_closure_scheduler global_scheduler_long = {
- &global_executor_vtable_long};
+void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) {
+ executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
+ false /* is_short */);
+}
+
+static const grpc_closure_scheduler_vtable
+ vtables_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
+ {{&default_enqueue_short, &default_enqueue_short, "def-ex-short"},
+ {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}},
+ {{&resolver_enqueue_short, &resolver_enqueue_short, "res-ex-short"},
+ {&resolver_enqueue_long, &resolver_enqueue_long, "res-ex-long"}}};
+
+static grpc_closure_scheduler
+ schedulers_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
+ {{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_SHORT]},
+ {&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_LONG]}},
+ {{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_SHORT]},
+ {&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_LONG]}}};
// grpc_executor_init() and grpc_executor_shutdown() functions are called in the
// the grpc_init() and grpc_shutdown() code paths which are protected by a
// global mutex. So it is okay to assume that these functions are thread-safe
void grpc_executor_init() {
- if (global_executor != nullptr) {
- // grpc_executor_init() already called once (and grpc_executor_shutdown()
- // wasn't called)
+ EXECUTOR_TRACE0("grpc_executor_init() enter");
+
+ // Return if grpc_executor_init() is already called earlier
+ if (executors[GRPC_DEFAULT_EXECUTOR] != nullptr) {
+ GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] != nullptr);
return;
}
- global_executor = grpc_core::New<GrpcExecutor>("global-executor");
- global_executor->Init();
+ executors[GRPC_DEFAULT_EXECUTOR] =
+ grpc_core::New<GrpcExecutor>("default-executor");
+ executors[GRPC_RESOLVER_EXECUTOR] =
+ grpc_core::New<GrpcExecutor>("resolver-executor");
+
+ executors[GRPC_DEFAULT_EXECUTOR]->Init();
+ executors[GRPC_RESOLVER_EXECUTOR]->Init();
+
+ EXECUTOR_TRACE0("grpc_executor_init() done");
+}
+
+grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type,
+ GrpcExecutorJobType job_type) {
+ return &schedulers_[executor_type][job_type];
+}
+
+grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) {
+ return grpc_executor_scheduler(GRPC_DEFAULT_EXECUTOR, job_type);
}
void grpc_executor_shutdown() {
- // Shutdown already called
- if (global_executor == nullptr) {
+ EXECUTOR_TRACE0("grpc_executor_shutdown() enter");
+
+ // Return if grpc_executor_shutdown() is already called earlier
+ if (executors[GRPC_DEFAULT_EXECUTOR] == nullptr) {
+ GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] == nullptr);
return;
}
- global_executor->Shutdown();
- grpc_core::Delete<GrpcExecutor>(global_executor);
- global_executor = nullptr;
+ executors[GRPC_DEFAULT_EXECUTOR]->Shutdown();
+ executors[GRPC_RESOLVER_EXECUTOR]->Shutdown();
+
+ // Delete the executor objects.
+ //
+ // NOTE: It is important to call Shutdown() on all executors first before
+ // calling Delete() because it is possible for one executor (that is not
+ // shutdown yet) to call Enqueue() on a different executor which is already
+ // shutdown. This is legal and in such cases, the Enqueue() operation
+ // effectively "fails" and enqueues that closure on the calling thread's
+ // exec_ctx.
+ //
+ // By ensuring that all executors are shutdown first, we are also ensuring
+ // that no thread is active across all executors.
+
+ grpc_core::Delete<GrpcExecutor>(executors[GRPC_DEFAULT_EXECUTOR]);
+ grpc_core::Delete<GrpcExecutor>(executors[GRPC_RESOLVER_EXECUTOR]);
+ executors[GRPC_DEFAULT_EXECUTOR] = nullptr;
+ executors[GRPC_RESOLVER_EXECUTOR] = nullptr;
+
+ EXECUTOR_TRACE0("grpc_executor_shutdown() done");
}
-bool grpc_executor_is_threaded() { return global_executor->IsThreaded(); }
+bool grpc_executor_is_threaded(GrpcExecutorType executor_type) {
+ GPR_ASSERT(executor_type < GRPC_NUM_EXECUTORS);
+ return executors[executor_type]->IsThreaded();
+}
-void grpc_executor_set_threading(bool enable) {
- global_executor->SetThreading(enable);
+bool grpc_executor_is_threaded() {
+ return grpc_executor_is_threaded(GRPC_DEFAULT_EXECUTOR);
}
-grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) {
- return job_type == GRPC_EXECUTOR_SHORT ? &global_scheduler_short
- : &global_scheduler_long;
+void grpc_executor_set_threading(bool enable) {
+ EXECUTOR_TRACE("grpc_executor_set_threading(%d) called", enable);
+ for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) {
+ executors[i]->SetThreading(enable);
+ }
}
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index 395fc52863..8829138c5f 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -27,7 +27,8 @@
typedef struct {
gpr_mu mu;
- size_t id; // For debugging purposes
+ size_t id; // For debugging purposes
+ const char* name; // Thread state name
gpr_cv cv;
grpc_closure_list elems;
size_t depth; // Number of closures in the closure list
@@ -36,7 +37,11 @@ typedef struct {
grpc_core::Thread thd;
} ThreadState;
-typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG } GrpcExecutorJobType;
+typedef enum {
+ GRPC_EXECUTOR_SHORT = 0,
+ GRPC_EXECUTOR_LONG,
+ GRPC_NUM_EXECUTOR_JOB_TYPES // Add new values above this
+} GrpcExecutorJobType;
class GrpcExecutor {
public:
@@ -58,7 +63,7 @@ class GrpcExecutor {
void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short);
private:
- static size_t RunClosures(grpc_closure_list list);
+ static size_t RunClosures(const char* executor_name, grpc_closure_list list);
static void ThreadMain(void* arg);
const char* name_;
@@ -70,14 +75,42 @@ class GrpcExecutor {
// == Global executor functions ==
+typedef enum {
+ GRPC_DEFAULT_EXECUTOR = 0,
+ GRPC_RESOLVER_EXECUTOR,
+
+ GRPC_NUM_EXECUTORS // Add new values above this
+} GrpcExecutorType;
+
+// TODO(sreek): Currently we have two executors (available globally): The
+// default executor and the resolver executor.
+//
+// Some of the functions below operate on the DEFAULT executor only while some
+// operate of ALL the executors. This is a bit confusing and should be cleaned
+// up in future (where we make all the following functions take executor_type
+// and/or job_type)
+
+// Initialize ALL the executors
void grpc_executor_init();
+// Shutdown ALL the executors
+void grpc_executor_shutdown();
+
+// Set the threading mode for ALL the executors
+void grpc_executor_set_threading(bool enable);
+
+// Get the DEFAULT executor scheduler for the given job_type
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type);
-void grpc_executor_shutdown();
+// Get the executor scheduler for a given executor_type and a job_type
+grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type,
+ GrpcExecutorJobType job_type);
-bool grpc_executor_is_threaded();
+// Return if a given executor is running in threaded mode (i.e if
+// grpc_executor_set_threading(true) was called previously on that executor)
+bool grpc_executor_is_threaded(GrpcExecutorType executor_type);
-void grpc_executor_set_threading(bool enable);
+// Return if the DEFAULT executor is threaded
+bool grpc_executor_is_threaded();
#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc
index ce77231036..ad325fe215 100644
--- a/src/core/lib/iomgr/iocp_windows.cc
+++ b/src/core/lib/iomgr/iocp_windows.cc
@@ -89,10 +89,15 @@ grpc_iocp_work_status grpc_iocp_work(grpc_millis deadline) {
} else {
abort();
}
- success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
- FALSE, &flags);
- info->bytes_transfered = bytes;
- info->wsa_error = success ? 0 : WSAGetLastError();
+ if (socket->shutdown_called) {
+ info->bytes_transfered = 0;
+ info->wsa_error = WSA_OPERATION_ABORTED;
+ } else {
+ success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
+ FALSE, &flags);
+ info->bytes_transfered = bytes;
+ info->wsa_error = success ? 0 : WSAGetLastError();
+ }
GPR_ASSERT(overlapped == &info->overlapped);
grpc_socket_become_ready(socket, info);
return GRPC_IOCP_WORK_WORK;
diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc
new file mode 100644
index 0000000000..235a9e0712
--- /dev/null
+++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc
@@ -0,0 +1,75 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM_IOMGR
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/iomgr_posix.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/tcp_posix.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+#include "src/core/lib/iomgr/timer.h"
+
+static const char* grpc_cfstream_env_var = "grpc_cfstream";
+
+extern grpc_tcp_server_vtable grpc_posix_tcp_server_vtable;
+extern grpc_tcp_client_vtable grpc_posix_tcp_client_vtable;
+extern grpc_tcp_client_vtable grpc_cfstream_client_vtable;
+extern grpc_timer_vtable grpc_generic_timer_vtable;
+extern grpc_pollset_vtable grpc_posix_pollset_vtable;
+extern grpc_pollset_set_vtable grpc_posix_pollset_set_vtable;
+extern grpc_address_resolver_vtable grpc_posix_resolver_vtable;
+
+static void iomgr_platform_init(void) {
+ grpc_wakeup_fd_global_init();
+ grpc_event_engine_init();
+}
+
+static void iomgr_platform_flush(void) {}
+
+static void iomgr_platform_shutdown(void) {
+ grpc_event_engine_shutdown();
+ grpc_wakeup_fd_global_destroy();
+}
+
+static grpc_iomgr_platform_vtable vtable = {
+ iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown};
+
+void grpc_set_default_iomgr_platform() {
+ char* enable_cfstream = getenv(grpc_cfstream_env_var);
+ grpc_tcp_client_vtable* client_vtable = &grpc_posix_tcp_client_vtable;
+ if (enable_cfstream != nullptr && enable_cfstream[0] == '1') {
+ client_vtable = &grpc_cfstream_client_vtable;
+ }
+ grpc_set_tcp_client_impl(client_vtable);
+ grpc_set_tcp_server_impl(&grpc_posix_tcp_server_vtable);
+ grpc_set_timer_impl(&grpc_generic_timer_vtable);
+ grpc_set_pollset_vtable(&grpc_posix_pollset_vtable);
+ grpc_set_pollset_set_vtable(&grpc_posix_pollset_set_vtable);
+ grpc_set_resolver_impl(&grpc_posix_resolver_vtable);
+ grpc_set_iomgr_platform_vtable(&vtable);
+}
+
+#endif /* GRPC_CFSTREAM_IOMGR */
diff --git a/src/core/lib/iomgr/lockfree_event.cc b/src/core/lib/iomgr/lockfree_event.cc
index 5b6b79fa91..085fea40a4 100644
--- a/src/core/lib/iomgr/lockfree_event.cc
+++ b/src/core/lib/iomgr/lockfree_event.cc
@@ -89,7 +89,11 @@ void LockfreeEvent::DestroyEvent() {
void LockfreeEvent::NotifyOn(grpc_closure* closure) {
while (true) {
- gpr_atm curr = gpr_atm_no_barrier_load(&state_);
+ /* This load needs to be an acquire load because this can be a shutdown
+ * error that we might need to reference. Adding acquire semantics makes
+ * sure that the shutdown error has been initialized properly before us
+ * referencing it. */
+ gpr_atm curr = gpr_atm_acq_load(&state_);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_ERROR, "LockfreeEvent::NotifyOn: %p curr=%p closure=%p", this,
(void*)curr, closure);
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index a06e9f827b..b68305ce0e 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -99,9 +99,9 @@
#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#ifdef GRPC_CFSTREAM
-#define GRPC_POSIX_SOCKET_IOMGR 1
-#define GRPC_CFSTREAM_ENDPOINT 1
+#define GRPC_CFSTREAM_IOMGR 1
#define GRPC_CFSTREAM_CLIENT 1
+#define GRPC_CFSTREAM_ENDPOINT 1
#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1
#define GRPC_POSIX_SOCKET_EV 1
#define GRPC_POSIX_SOCKET_EV_EPOLL1 1
@@ -112,6 +112,7 @@
#define GRPC_POSIX_SOCKET_SOCKADDR 1
#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1
#define GRPC_POSIX_SOCKET_TCP 1
+#define GRPC_POSIX_SOCKET_TCP_CLIENT 1
#define GRPC_POSIX_SOCKET_TCP_SERVER 1
#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1
#define GRPC_POSIX_SOCKET_UTILS_COMMON 1
diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc
index 7a825643e1..c285d7eca6 100644
--- a/src/core/lib/iomgr/resolve_address_posix.cc
+++ b/src/core/lib/iomgr/resolve_address_posix.cc
@@ -166,8 +166,9 @@ static void posix_resolve_address(const char* name, const char* default_port,
grpc_closure* on_done,
grpc_resolved_addresses** addrs) {
request* r = static_cast<request*>(gpr_malloc(sizeof(request)));
- GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
- grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
+ GRPC_CLOSURE_INIT(
+ &r->request_closure, do_request_thread, r,
+ grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT));
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc
index 71c92615ad..3e977dca2d 100644
--- a/src/core/lib/iomgr/resolve_address_windows.cc
+++ b/src/core/lib/iomgr/resolve_address_windows.cc
@@ -151,8 +151,9 @@ static void windows_resolve_address(const char* name, const char* default_port,
grpc_closure* on_done,
grpc_resolved_addresses** addresses) {
request* r = (request*)gpr_malloc(sizeof(request));
- GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
- grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
+ GRPC_CLOSURE_INIT(
+ &r->request_closure, do_request_thread, r,
+ grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT));
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
diff --git a/src/core/lib/iomgr/socket_windows.cc b/src/core/lib/iomgr/socket_windows.cc
index 2e23409582..999c6646ad 100644
--- a/src/core/lib/iomgr/socket_windows.cc
+++ b/src/core/lib/iomgr/socket_windows.cc
@@ -36,6 +36,7 @@
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_windows.h"
+#include "src/core/lib/iomgr/sockaddr_windows.h"
#include "src/core/lib/iomgr/socket_windows.h"
grpc_winsocket* grpc_winsocket_create(SOCKET socket, const char* name) {
@@ -51,6 +52,10 @@ grpc_winsocket* grpc_winsocket_create(SOCKET socket, const char* name) {
return r;
}
+SOCKET grpc_winsocket_wrapped_socket(grpc_winsocket* socket) {
+ return socket->socket;
+}
+
/* Schedule a shutdown of the socket operations. Will call the pending
operations to abort them. We need to do that this way because of the
various callsites of that function, which happens to be in various
@@ -148,4 +153,32 @@ void grpc_socket_become_ready(grpc_winsocket* socket,
if (should_destroy) destroy(socket);
}
+static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT;
+static bool g_ipv6_loopback_available = false;
+
+static void probe_ipv6_once(void) {
+ SOCKET s = socket(AF_INET6, SOCK_STREAM, 0);
+ g_ipv6_loopback_available = 0;
+ if (s == INVALID_SOCKET) {
+ gpr_log(GPR_INFO, "Disabling AF_INET6 sockets because socket() failed.");
+ } else {
+ grpc_sockaddr_in6 addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin6_family = AF_INET6;
+ addr.sin6_addr.s6_addr[15] = 1; /* [::1]:0 */
+ if (bind(s, reinterpret_cast<grpc_sockaddr*>(&addr), sizeof(addr)) == 0) {
+ g_ipv6_loopback_available = 1;
+ } else {
+ gpr_log(GPR_INFO,
+ "Disabling AF_INET6 sockets because ::1 is not available.");
+ }
+ closesocket(s);
+ }
+}
+
+int grpc_ipv6_loopback_available(void) {
+ gpr_once_init(&g_probe_ipv6_once, probe_ipv6_once);
+ return g_ipv6_loopback_available;
+}
+
#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h
index 7bd01eded5..46d7d58356 100644
--- a/src/core/lib/iomgr/socket_windows.h
+++ b/src/core/lib/iomgr/socket_windows.h
@@ -92,6 +92,8 @@ typedef struct grpc_winsocket {
it will be responsible for closing it. */
grpc_winsocket* grpc_winsocket_create(SOCKET socket, const char* name);
+SOCKET grpc_winsocket_wrapped_socket(grpc_winsocket* socket);
+
/* Initiate an asynchronous shutdown of the socket. Will call off any pending
operation to cancel them. */
void grpc_winsocket_shutdown(grpc_winsocket* socket);
@@ -108,6 +110,10 @@ void grpc_socket_notify_on_read(grpc_winsocket* winsocket,
void grpc_socket_become_ready(grpc_winsocket* winsocket,
grpc_winsocket_callback_info* ci);
+/* Returns true if this system can create AF_INET6 sockets bound to ::1.
+ The value is probed once, and cached for the life of the process. */
+int grpc_ipv6_loopback_available(void);
+
#endif
#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */
diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc
index 5acea91792..4b21322d74 100644
--- a/src/core/lib/iomgr/tcp_client_cfstream.cc
+++ b/src/core/lib/iomgr/tcp_client_cfstream.cc
@@ -211,6 +211,6 @@ static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
gpr_mu_unlock(&connect->mu);
}
-grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {CFStreamClientConnect};
+grpc_tcp_client_vtable grpc_cfstream_client_vtable = {CFStreamClientConnect};
#endif /* GRPC_CFSTREAM_CLIENT */
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index 4300a9f882..6a16b8d628 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -26,6 +26,7 @@
#include "src/core/lib/iomgr/tcp_posix.h"
#include <errno.h>
+#include <limits.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdio.h>
@@ -365,7 +366,7 @@ static void tcp_destroy(grpc_endpoint* ep) {
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
if (grpc_event_engine_can_track_errors()) {
gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
- grpc_fd_notify_on_error(tcp->em_fd, nullptr);
+ grpc_fd_set_error(tcp->em_fd);
}
TCP_UNREF(tcp, "destroy");
}
@@ -721,8 +722,8 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
/* This was not a timestamps error. This was an actual error. Set the
* read and write closures to be ready.
*/
- grpc_fd_notify_on_read(tcp->em_fd, nullptr);
- grpc_fd_notify_on_write(tcp->em_fd, nullptr);
+ grpc_fd_set_readable(tcp->em_fd);
+ grpc_fd_set_writable(tcp->em_fd);
}
GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
grpc_schedule_on_exec_ctx);
@@ -747,7 +748,11 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
#endif /* GRPC_LINUX_ERRQUEUE */
/* returns true if done, false if pending; if returning true, *error is set */
+#if defined(IOV_MAX) && IOV_MAX < 1000
+#define MAX_WRITE_IOVEC IOV_MAX
+#else
#define MAX_WRITE_IOVEC 1000
+#endif
static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
@@ -1074,7 +1079,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
if (grpc_event_engine_can_track_errors()) {
/* Stop errors notification. */
gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
- grpc_fd_notify_on_error(tcp->em_fd, nullptr);
+ grpc_fd_set_error(tcp->em_fd);
}
TCP_UNREF(tcp, "destroy");
}
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index fd146c94b4..64c4a56ae9 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -53,7 +53,7 @@
extern grpc_core::TraceFlag grpc_tcp_trace;
-static grpc_error* set_non_block(SOCKET sock) {
+grpc_error* grpc_tcp_set_non_block(SOCKET sock) {
int status;
uint32_t param = 1;
DWORD ret;
@@ -90,7 +90,7 @@ static grpc_error* enable_loopback_fast_path(SOCKET sock) {
grpc_error* grpc_tcp_prepare_socket(SOCKET sock) {
grpc_error* err;
- err = set_non_block(sock);
+ err = grpc_tcp_set_non_block(sock);
if (err != GRPC_ERROR_NONE) return err;
err = set_dualstack(sock);
if (err != GRPC_ERROR_NONE) return err;
diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h
index 161a545a2a..04ef8102b6 100644
--- a/src/core/lib/iomgr/tcp_windows.h
+++ b/src/core/lib/iomgr/tcp_windows.h
@@ -46,6 +46,8 @@ grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
grpc_error* grpc_tcp_prepare_socket(SOCKET sock);
+grpc_error* grpc_tcp_set_non_block(SOCKET sock);
+
#endif
#endif /* GRPC_CORE_LIB_IOMGR_TCP_WINDOWS_H */