aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/call_combiner.h5
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc22
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc32
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc38
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc67
-rw-r--r--src/core/lib/iomgr/ev_posix.cc67
-rw-r--r--src/core/lib/iomgr/ev_posix.h30
-rw-r--r--src/core/lib/iomgr/iocp_windows.cc13
-rw-r--r--src/core/lib/iomgr/iomgr_posix_cfstream.cc75
-rw-r--r--src/core/lib/iomgr/port.h17
-rw-r--r--src/core/lib/iomgr/resource_quota.cc78
-rw-r--r--src/core/lib/iomgr/resource_quota.h16
-rw-r--r--src/core/lib/iomgr/socket_windows.cc4
-rw-r--r--src/core/lib/iomgr/socket_windows.h2
-rw-r--r--src/core/lib/iomgr/tcp_client_cfstream.cc2
-rw-r--r--src/core/lib/iomgr/tcp_windows.cc4
-rw-r--r--src/core/lib/iomgr/tcp_windows.h2
17 files changed, 342 insertions, 132 deletions
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index 641fa18082..6f7ddd4043 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -102,7 +102,10 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner,
/// If \a closure is NULL, then no closure will be invoked on
/// cancellation; this effectively unregisters the previously set closure.
/// However, most filters will not need to explicitly unregister their
-/// callbacks, as this is done automatically when the call is destroyed.
+/// callbacks, as this is done automatically when the call is destroyed. Filters
+/// that schedule the cancellation closure on ExecCtx do not need to take a ref
+/// on the call stack to guarantee closure liveness. This is done by explicitly
+/// flushing ExecCtx after the unregistration during call destruction.
void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
grpc_closure* closure);
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 86a0243d2e..66e0f1fd6d 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -140,10 +140,6 @@ struct grpc_fd {
struct grpc_fd* freelist_next;
- /* The pollset that last noticed that the fd is readable. The actual type
- * stored in this is (grpc_pollset *) */
- gpr_atm read_notifier_pollset;
-
grpc_iomgr_object iomgr_object;
};
@@ -293,7 +289,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
new_fd->error_closure->InitEvent();
- gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -376,11 +371,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
gpr_mu_unlock(&fd_freelist_mu);
}
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
- gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
- return (grpc_pollset*)notifier;
-}
-
static bool fd_is_shutdown(grpc_fd* fd) {
return fd->read_closure->IsShutdown();
}
@@ -397,11 +387,7 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
fd->error_closure->NotifyOn(closure);
}
-static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
- fd->read_closure->SetReady();
- /* Use release store to match with acquire load in fd_get_read_notifier */
- gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
-}
+static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
@@ -642,7 +628,7 @@ static grpc_error* process_epoll_events(grpc_pollset* pollset) {
}
if (read_ev || cancel || err_fallback) {
- fd_become_readable(fd, pollset);
+ fd_become_readable(fd);
}
if (write_ev || cancel || err_fallback) {
@@ -1217,8 +1203,10 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_become_readable,
+ fd_become_writable,
+ fd_has_errors,
fd_is_shutdown,
- fd_get_read_notifier_pollset,
pollset_init,
pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index e1f3e43af7..b082634af1 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -46,6 +46,7 @@
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/is_epollexclusive_available.h"
@@ -220,10 +221,6 @@ struct grpc_fd {
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
- // The pollset that last noticed that the fd is readable. The actual type
- // stored in this is (grpc_pollset *)
- gpr_atm read_notifier_pollset;
-
grpc_iomgr_object iomgr_object;
// Do we need to track EPOLLERR events separately?
@@ -353,7 +350,6 @@ static void invalidate_fd(grpc_fd* fd) {
memset(&fd->pollable_mu, -1, sizeof(fd->pollable_mu));
fd->pollable_obj = nullptr;
fd->on_done_closure = nullptr;
- gpr_atm_no_barrier_store(&fd->read_notifier_pollset, 0);
memset(&fd->iomgr_object, -1, sizeof(fd->iomgr_object));
fd->track_err = false;
}
@@ -445,7 +441,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
new_fd->error_closure->InitEvent();
new_fd->freelist_next = nullptr;
new_fd->on_done_closure = nullptr;
- gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
char* fd_name;
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
@@ -514,11 +509,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
UNREF_BY(fd, 2, reason); /* Drop the reference */
}
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
- gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
- return (grpc_pollset*)notifier;
-}
-
static bool fd_is_shutdown(grpc_fd* fd) {
return fd->read_closure->IsShutdown();
}
@@ -746,7 +736,7 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
GPR_TIMER_SCOPE("kick_one_worker", 0);
pollable* p = specific_worker->pollable_obj;
- grpc_core::mu_guard lock(&p->mu);
+ grpc_core::MutexLock lock(&p->mu);
GPR_ASSERT(specific_worker != nullptr);
if (specific_worker->kicked) {
if (grpc_polling_trace.enabled()) {
@@ -875,17 +865,7 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) {
return static_cast<int>(delta);
}
-static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
- fd->read_closure->SetReady();
-
- /* Note, it is possible that fd_become_readable might be called twice with
- different 'notifier's when an fd becomes readable and it is in two epoll
- sets (This can happen briefly during polling island merges). In such cases
- it does not really matter which notifer is set as the read_notifier_pollset
- (They would both point to the same polling island anyway) */
- /* Use release store to match with acquire load in fd_get_read_notifier */
- gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
-}
+static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
@@ -983,7 +963,7 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset,
fd_has_errors(fd);
}
if (read_ev || cancel || err_fallback) {
- fd_become_readable(fd, pollset);
+ fd_become_readable(fd);
}
if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
@@ -1636,8 +1616,10 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_become_readable,
+ fd_become_writable,
+ fd_has_errors,
fd_is_shutdown,
- fd_get_read_notifier_pollset,
pollset_init,
pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 2189801c18..5695ac795d 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -137,10 +137,6 @@ struct grpc_fd {
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
- /* The pollset that last noticed that the fd is readable. The actual type
- * stored in this is (grpc_pollset *) */
- gpr_atm read_notifier_pollset;
-
grpc_iomgr_object iomgr_object;
/* Do we need to track EPOLLERR events separately? */
@@ -845,7 +841,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
new_fd->write_closure->InitEvent();
new_fd->error_closure->InitEvent();
new_fd->track_err = track_err;
- gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
new_fd->on_done_closure = nullptr;
@@ -927,11 +922,6 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
GRPC_ERROR_UNREF(error);
}
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
- gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
- return (grpc_pollset*)notifier;
-}
-
static bool fd_is_shutdown(grpc_fd* fd) {
return fd->read_closure->IsShutdown();
}
@@ -958,6 +948,12 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
fd->error_closure->NotifyOn(closure);
}
+static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
+
+static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -1115,22 +1111,6 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) {
return static_cast<int>(delta);
}
-static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
- fd->read_closure->SetReady();
-
- /* Note, it is possible that fd_become_readable might be called twice with
- different 'notifier's when an fd becomes readable and it is in two epoll
- sets (This can happen briefly during polling island merges). In such cases
- it does not really matter which notifer is set as the read_notifier_pollset
- (They would both point to the same polling island anyway) */
- /* Use release store to match with acquire load in fd_get_read_notifier */
- gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
-}
-
-static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
-
-static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
-
static void pollset_release_polling_island(grpc_pollset* ps,
const char* reason) {
if (ps->po.pi != nullptr) {
@@ -1283,7 +1263,7 @@ static void pollset_work_and_unlock(grpc_pollset* pollset,
fd_has_errors(fd);
}
if (read_ev || cancel || err_fallback) {
- fd_become_readable(fd, pollset);
+ fd_become_readable(fd);
}
if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
@@ -1667,8 +1647,10 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_become_readable,
+ fd_become_writable,
+ fd_has_errors,
fd_is_shutdown,
- fd_get_read_notifier_pollset,
pollset_init,
pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index c9c09881a2..fb4c71ef71 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -108,9 +108,6 @@ struct grpc_fd {
grpc_closure* on_done_closure;
grpc_iomgr_object iomgr_object;
-
- /* The pollset that last noticed and notified that the fd is readable */
- grpc_pollset* read_notifier_pollset;
};
/* Begin polling on an fd.
@@ -131,8 +128,7 @@ static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
MUST NOT be called with a pollset lock taken
if got_read or got_write are 1, also does the become_{readable,writable} as
appropriate. */
-static void fd_end_poll(grpc_fd_watcher* rec, int got_read, int got_write,
- grpc_pollset* read_notifier_pollset);
+static void fd_end_poll(grpc_fd_watcher* rec, int got_read, int got_write);
/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd* fd);
@@ -346,7 +342,6 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
r->closed = 0;
r->released = 0;
gpr_atm_no_barrier_store(&r->pollhup, 0);
- r->read_notifier_pollset = nullptr;
char* name2;
gpr_asprintf(&name2, "%s fd=%d", name, fd);
@@ -359,17 +354,6 @@ static bool fd_is_orphaned(grpc_fd* fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
-/* Return the read-notifier pollset */
-static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) {
- grpc_pollset* notifier = nullptr;
-
- gpr_mu_lock(&fd->mu);
- notifier = fd->read_notifier_pollset;
- gpr_mu_unlock(&fd->mu);
-
- return notifier;
-}
-
static grpc_error* pollset_kick_locked(grpc_fd_watcher* watcher) {
gpr_mu_lock(&watcher->pollset->mu);
GPR_ASSERT(watcher->worker);
@@ -512,11 +496,6 @@ static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
}
}
-static void set_read_notifier_pollset_locked(
- grpc_fd* fd, grpc_pollset* read_notifier_pollset) {
- fd->read_notifier_pollset = read_notifier_pollset;
-}
-
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
gpr_mu_lock(&fd->mu);
/* only shutdown once */
@@ -553,8 +532,28 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
}
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
- gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
- abort();
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
+ }
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
+}
+
+static void fd_set_readable(grpc_fd* fd) {
+ gpr_mu_lock(&fd->mu);
+ set_ready_locked(fd, &fd->read_closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_set_writable(grpc_fd* fd) {
+ gpr_mu_lock(&fd->mu);
+ set_ready_locked(fd, &fd->write_closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_set_error(grpc_fd* fd) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
+ }
}
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
@@ -608,8 +607,7 @@ static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
return mask;
}
-static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write,
- grpc_pollset* read_notifier_pollset) {
+static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
int was_polling = 0;
int kick = 0;
grpc_fd* fd = watcher->fd;
@@ -645,9 +643,6 @@ static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write,
if (set_ready_locked(fd, &fd->read_closure)) {
kick = 1;
}
- if (read_notifier_pollset != nullptr) {
- set_read_notifier_pollset_locked(fd, read_notifier_pollset);
- }
}
if (got_write) {
if (set_ready_locked(fd, &fd->write_closure)) {
@@ -997,16 +992,16 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
for (i = 1; i < pfd_count; i++) {
if (watchers[i].fd == nullptr) {
- fd_end_poll(&watchers[i], 0, 0, nullptr);
+ fd_end_poll(&watchers[i], 0, 0);
} else {
// Wake up all the file descriptors, if we have an invalid one
// we can identify it on the next pollset_work()
- fd_end_poll(&watchers[i], 1, 1, pollset);
+ fd_end_poll(&watchers[i], 1, 1);
}
}
} else if (r == 0) {
for (i = 1; i < pfd_count; i++) {
- fd_end_poll(&watchers[i], 0, 0, nullptr);
+ fd_end_poll(&watchers[i], 0, 0);
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
@@ -1018,7 +1013,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
}
for (i = 1; i < pfd_count; i++) {
if (watchers[i].fd == nullptr) {
- fd_end_poll(&watchers[i], 0, 0, nullptr);
+ fd_end_poll(&watchers[i], 0, 0);
} else {
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_INFO, "%p got_event: %d r:%d w:%d [%d]", pollset,
@@ -1032,7 +1027,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
}
fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
- pfds[i].revents & POLLOUT_CHECK, pollset);
+ pfds[i].revents & POLLOUT_CHECK);
}
}
}
@@ -1723,8 +1718,10 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_set_readable,
+ fd_set_writable,
+ fd_set_error,
fd_is_shutdown,
- fd_get_read_notifier_pollset,
pollset_init,
pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 1139b3273a..0205363d5c 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -59,7 +59,14 @@ grpc_core::DebugOnlyTraceFlag grpc_polling_api_trace(false, "polling_api");
/** Default poll() function - a pointer so that it can be overridden by some
* tests */
+#ifndef GPR_AIX
grpc_poll_function_type grpc_poll_function = poll;
+#else
+int aix_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
+ return poll(fds, nfds, timeout);
+}
+grpc_poll_function_type grpc_poll_function = aix_poll;
+#endif
grpc_wakeup_fd grpc_global_wakeup_fd;
@@ -101,10 +108,28 @@ const grpc_event_engine_vtable* init_non_polling(bool explicit_request) {
}
} // namespace
-static const event_engine_factory g_factories[] = {
+#define ENGINE_HEAD_CUSTOM "head_custom"
+#define ENGINE_TAIL_CUSTOM "tail_custom"
+
+// The global array of event-engine factories. Each entry is a pair with a name
+// and an event-engine generator function (nullptr if there is no generator
+// registered for this name). The middle entries are the engines predefined by
+// open-source gRPC. The head entries represent an opportunity for specific
+// high-priority custom pollers to be added by the initializer plugins of
+// custom-built gRPC libraries. The tail entries represent the same, but for
+// low-priority custom pollers. The actual poller selected is either the first
+// available one in the list if no specific poller is requested, or the first
+// specific poller that is requested by name in the GRPC_POLL_STRATEGY
+// environment variable if that variable is set (which should be a
+// comma-separated list of one or more event engine names)
+static event_engine_factory g_factories[] = {
+ {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
+ {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
{"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux},
{"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix},
{"poll-cv", grpc_init_poll_cv_posix}, {"none", init_non_polling},
+ {ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
+ {ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
};
static void add(const char* beg, const char* end, char*** ss, size_t* ns) {
@@ -138,7 +163,7 @@ static bool is(const char* want, const char* have) {
static void try_engine(const char* engine) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
- if (is(engine, g_factories[i].name)) {
+ if (g_factories[i].factory != nullptr && is(engine, g_factories[i].name)) {
if ((g_event_engine = g_factories[i].factory(
0 == strcmp(engine, g_factories[i].name)))) {
g_poll_strategy_name = g_factories[i].name;
@@ -149,14 +174,32 @@ static void try_engine(const char* engine) {
}
}
-/* This should be used for testing purposes ONLY */
-void grpc_set_event_engine_test_only(
- const grpc_event_engine_vtable* ev_engine) {
- g_event_engine = ev_engine;
-}
+/* Call this before calling grpc_event_engine_init() */
+void grpc_register_event_engine_factory(const char* name,
+ event_engine_factory_fn factory,
+ bool add_at_head) {
+ const char* custom_match =
+ add_at_head ? ENGINE_HEAD_CUSTOM : ENGINE_TAIL_CUSTOM;
+
+ // Overwrite an existing registration if already registered
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
+ if (0 == strcmp(name, g_factories[i].name)) {
+ g_factories[i].factory = factory;
+ return;
+ }
+ }
-const grpc_event_engine_vtable* grpc_get_event_engine_test_only() {
- return g_event_engine;
+ // Otherwise fill in an available custom slot
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
+ if (0 == strcmp(g_factories[i].name, custom_match)) {
+ g_factories[i].name = name;
+ g_factories[i].factory = factory;
+ return;
+ }
+ }
+
+ // Otherwise fail
+ GPR_ASSERT(false);
}
/* Call this only after calling grpc_event_engine_init() */
@@ -239,6 +282,12 @@ void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
g_event_engine->fd_notify_on_error(fd, closure);
}
+void grpc_fd_set_readable(grpc_fd* fd) { g_event_engine->fd_set_readable(fd); }
+
+void grpc_fd_set_writable(grpc_fd* fd) { g_event_engine->fd_set_writable(fd); }
+
+void grpc_fd_set_error(grpc_fd* fd) { g_event_engine->fd_set_error(fd); }
+
static size_t pollset_size(void) { return g_event_engine->pollset_size; }
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index b4c17fc80d..b8fb8f534b 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -51,8 +51,10 @@ typedef struct grpc_event_engine_vtable {
void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_error)(grpc_fd* fd, grpc_closure* closure);
+ void (*fd_set_readable)(grpc_fd* fd);
+ void (*fd_set_writable)(grpc_fd* fd);
+ void (*fd_set_error)(grpc_fd* fd);
bool (*fd_is_shutdown)(grpc_fd* fd);
- grpc_pollset* (*fd_get_read_notifier_pollset)(grpc_fd* fd);
void (*pollset_init)(grpc_pollset* pollset, gpr_mu** mu);
void (*pollset_shutdown)(grpc_pollset* pollset, grpc_closure* closure);
@@ -80,6 +82,11 @@ typedef struct grpc_event_engine_vtable {
void (*shutdown_engine)(void);
} grpc_event_engine_vtable;
+/* register a new event engine factory */
+void grpc_register_event_engine_factory(
+ const char* name, const grpc_event_engine_vtable* (*factory)(bool),
+ bool add_at_head);
+
void grpc_event_engine_init(void);
void grpc_event_engine_shutdown(void);
@@ -142,8 +149,20 @@ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure);
* needs to have been set on grpc_fd_create */
void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure);
-/* Return the read notifier pollset from the fd */
-grpc_pollset* grpc_fd_get_read_notifier_pollset(grpc_fd* fd);
+/* Forcibly set the fd to be readable, resulting in the closure registered with
+ * grpc_fd_notify_on_read being invoked.
+ */
+void grpc_fd_set_readable(grpc_fd* fd);
+
+/* Forcibly set the fd to be writable, resulting in the closure registered with
+ * grpc_fd_notify_on_write being invoked.
+ */
+void grpc_fd_set_writable(grpc_fd* fd);
+
+/* Forcibly set the fd to have errored, resulting in the closure registered with
+ * grpc_fd_notify_on_error being invoked.
+ */
+void grpc_fd_set_error(grpc_fd* fd);
/* pollset_posix functions */
@@ -159,9 +178,4 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
typedef int (*grpc_poll_function_type)(struct pollfd*, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
-/* WARNING: The following two functions should be used for testing purposes
- * ONLY */
-void grpc_set_event_engine_test_only(const grpc_event_engine_vtable*);
-const grpc_event_engine_vtable* grpc_get_event_engine_test_only();
-
#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */
diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc
index ce77231036..ad325fe215 100644
--- a/src/core/lib/iomgr/iocp_windows.cc
+++ b/src/core/lib/iomgr/iocp_windows.cc
@@ -89,10 +89,15 @@ grpc_iocp_work_status grpc_iocp_work(grpc_millis deadline) {
} else {
abort();
}
- success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
- FALSE, &flags);
- info->bytes_transfered = bytes;
- info->wsa_error = success ? 0 : WSAGetLastError();
+ if (socket->shutdown_called) {
+ info->bytes_transfered = 0;
+ info->wsa_error = WSA_OPERATION_ABORTED;
+ } else {
+ success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
+ FALSE, &flags);
+ info->bytes_transfered = bytes;
+ info->wsa_error = success ? 0 : WSAGetLastError();
+ }
GPR_ASSERT(overlapped == &info->overlapped);
grpc_socket_become_ready(socket, info);
return GRPC_IOCP_WORK_WORK;
diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc
new file mode 100644
index 0000000000..235a9e0712
--- /dev/null
+++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc
@@ -0,0 +1,75 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM_IOMGR
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/iomgr_posix.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/tcp_posix.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+#include "src/core/lib/iomgr/timer.h"
+
+static const char* grpc_cfstream_env_var = "grpc_cfstream";
+
+extern grpc_tcp_server_vtable grpc_posix_tcp_server_vtable;
+extern grpc_tcp_client_vtable grpc_posix_tcp_client_vtable;
+extern grpc_tcp_client_vtable grpc_cfstream_client_vtable;
+extern grpc_timer_vtable grpc_generic_timer_vtable;
+extern grpc_pollset_vtable grpc_posix_pollset_vtable;
+extern grpc_pollset_set_vtable grpc_posix_pollset_set_vtable;
+extern grpc_address_resolver_vtable grpc_posix_resolver_vtable;
+
+static void iomgr_platform_init(void) {
+ grpc_wakeup_fd_global_init();
+ grpc_event_engine_init();
+}
+
+static void iomgr_platform_flush(void) {}
+
+static void iomgr_platform_shutdown(void) {
+ grpc_event_engine_shutdown();
+ grpc_wakeup_fd_global_destroy();
+}
+
+static grpc_iomgr_platform_vtable vtable = {
+ iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown};
+
+void grpc_set_default_iomgr_platform() {
+ char* enable_cfstream = getenv(grpc_cfstream_env_var);
+ grpc_tcp_client_vtable* client_vtable = &grpc_posix_tcp_client_vtable;
+ if (enable_cfstream != nullptr && enable_cfstream[0] == '1') {
+ client_vtable = &grpc_cfstream_client_vtable;
+ }
+ grpc_set_tcp_client_impl(client_vtable);
+ grpc_set_tcp_server_impl(&grpc_posix_tcp_server_vtable);
+ grpc_set_timer_impl(&grpc_generic_timer_vtable);
+ grpc_set_pollset_vtable(&grpc_posix_pollset_vtable);
+ grpc_set_pollset_set_vtable(&grpc_posix_pollset_set_vtable);
+ grpc_set_resolver_impl(&grpc_posix_resolver_vtable);
+ grpc_set_iomgr_platform_vtable(&vtable);
+}
+
+#endif /* GRPC_CFSTREAM_IOMGR */
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index 80d8e63cdd..066417b93c 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -98,9 +98,9 @@
#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#ifdef GRPC_CFSTREAM
-#define GRPC_POSIX_SOCKET_IOMGR 1
-#define GRPC_CFSTREAM_ENDPOINT 1
+#define GRPC_CFSTREAM_IOMGR 1
#define GRPC_CFSTREAM_CLIENT 1
+#define GRPC_CFSTREAM_ENDPOINT 1
#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1
#define GRPC_POSIX_SOCKET_EV 1
#define GRPC_POSIX_SOCKET_EV_EPOLL1 1
@@ -111,6 +111,7 @@
#define GRPC_POSIX_SOCKET_SOCKADDR 1
#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1
#define GRPC_POSIX_SOCKET_TCP 1
+#define GRPC_POSIX_SOCKET_TCP_CLIENT 1
#define GRPC_POSIX_SOCKET_TCP_SERVER 1
#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1
#define GRPC_POSIX_SOCKET_UTILS_COMMON 1
@@ -139,6 +140,18 @@
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
+#elif defined(GPR_SOLARIS)
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
+#elif defined(GPR_AIX)
+#define GRPC_HAVE_UNIX_SOCKET 1
+#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#define GRPC_POSIX_SOCKET 1
+#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_WAKEUP_FD 1
#elif defined(GPR_NACL)
#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc
index 539bc120ce..b6fc7579f7 100644
--- a/src/core/lib/iomgr/resource_quota.cc
+++ b/src/core/lib/iomgr/resource_quota.cc
@@ -96,6 +96,9 @@ struct grpc_resource_user {
list, false otherwise */
bool added_to_free_pool;
+ /* The number of threads currently allocated to this resource user */
+ gpr_atm num_threads_allocated;
+
/* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
*/
grpc_closure* reclaimers[2];
@@ -135,12 +138,33 @@ struct grpc_resource_quota {
gpr_atm last_size;
+ /* Mutex to protect max_threads and num_threads_allocated */
+ /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
+ * and avoid having this mutex; but in that case, each invocation of the
+ * function grpc_resource_user_allocate_threads() would have had to do at
+ * least two atomic loads (for max_threads and num_threads_allocated) followed
+ * by a CAS (on num_threads_allocated).
+ * Moreover, we expect grpc_resource_user_allocate_threads() to be often
+ * called concurrently thereby increasing the chances of failing the CAS
+ * operation. This additional complexity is not worth the tiny perf gain we
+ * may (or may not) have by using atomics */
+ gpr_mu thread_count_mu;
+
+ /* Max number of threads allowed */
+ int max_threads;
+
+ /* Number of threads currently allocated via this resource_quota object */
+ int num_threads_allocated;
+
/* Has rq_step been scheduled to occur? */
bool step_scheduled;
+
/* Are we currently reclaiming memory */
bool reclaiming;
+
/* Closure around rq_step */
grpc_closure rq_step_closure;
+
/* Closure around rq_reclamation_done */
grpc_closure rq_reclamation_done_closure;
@@ -524,6 +548,11 @@ static void ru_shutdown(void* ru, grpc_error* error) {
static void ru_destroy(void* ru, grpc_error* error) {
grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
+ // Free all the remaining thread quota
+ grpc_resource_user_free_threads(resource_user,
+ static_cast<int>(gpr_atm_no_barrier_load(
+ &resource_user->num_threads_allocated)));
+
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, static_cast<grpc_rulist>(i));
}
@@ -594,6 +623,9 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) {
resource_quota->free_pool = INT64_MAX;
resource_quota->size = INT64_MAX;
gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
+ gpr_mu_init(&resource_quota->thread_count_mu);
+ resource_quota->max_threads = INT_MAX;
+ resource_quota->num_threads_allocated = 0;
resource_quota->step_scheduled = false;
resource_quota->reclaiming = false;
gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
@@ -616,6 +648,8 @@ grpc_resource_quota* grpc_resource_quota_create(const char* name) {
void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
if (gpr_unref(&resource_quota->refs)) {
+ // No outstanding thread quota
+ GPR_ASSERT(resource_quota->num_threads_allocated == 0);
GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
gpr_free(resource_quota->name);
gpr_free(resource_quota);
@@ -647,6 +681,15 @@ double grpc_resource_quota_get_memory_pressure(
}
/* Public API */
+void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
+ int new_max_threads) {
+ GPR_ASSERT(new_max_threads >= 0);
+ gpr_mu_lock(&resource_quota->thread_count_mu);
+ resource_quota->max_threads = new_max_threads;
+ gpr_mu_unlock(&resource_quota->thread_count_mu);
+}
+
+/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
size_t size) {
grpc_core::ExecCtx exec_ctx;
@@ -731,6 +774,7 @@ grpc_resource_user* grpc_resource_user_create(
grpc_closure_list_init(&resource_user->on_allocated);
resource_user->allocating = false;
resource_user->added_to_free_pool = false;
+ gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
resource_user->reclaimers[0] = nullptr;
resource_user->reclaimers[1] = nullptr;
resource_user->new_reclaimers[0] = nullptr;
@@ -785,6 +829,40 @@ void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
}
}
+bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
+ int thread_count) {
+ GPR_ASSERT(thread_count >= 0);
+ bool is_success = false;
+ gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
+ grpc_resource_quota* rq = resource_user->resource_quota;
+ if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
+ rq->num_threads_allocated += thread_count;
+ gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
+ thread_count);
+ is_success = true;
+ }
+ gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
+ return is_success;
+}
+
+void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
+ int thread_count) {
+ GPR_ASSERT(thread_count >= 0);
+ gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
+ grpc_resource_quota* rq = resource_user->resource_quota;
+ rq->num_threads_allocated -= thread_count;
+ int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
+ &resource_user->num_threads_allocated, -thread_count));
+ if (old_count < thread_count || rq->num_threads_allocated < 0) {
+ gpr_log(GPR_ERROR,
+ "Releasing more threads (%d) than currently allocated (rq threads: "
+ "%d, ru threads: %d)",
+ thread_count, rq->num_threads_allocated + thread_count, old_count);
+ abort();
+ }
+ gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
+}
+
void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
grpc_closure* optional_on_done) {
gpr_mu_lock(&resource_user->mu);
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index 937daf8728..7b0ed7417a 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -93,6 +93,22 @@ void grpc_resource_user_ref(grpc_resource_user* resource_user);
void grpc_resource_user_unref(grpc_resource_user* resource_user);
void grpc_resource_user_shutdown(grpc_resource_user* resource_user);
+/* Attempts to get quota from the resource_user to create 'thread_count' number
+ * of threads. Returns true if successful (i.e the caller is now free to create
+ * 'thread_count' number of threads) or false if quota is not available */
+bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
+ int thread_count);
+/* Releases 'thread_count' worth of quota back to the resource user. The quota
+ * should have been previously obtained successfully by calling
+ * grpc_resource_user_allocate_threads().
+ *
+ * Note: There need not be an exact one-to-one correspondence between
+ * grpc_resource_user_allocate_threads() and grpc_resource_user_free_threads()
+ * calls. The only requirement is that the number of threads allocated should
+ * all be eventually released */
+void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
+ int thread_count);
+
/* Allocate from the resource user (and its quota).
If optional_on_done is NULL, then allocate immediately. This may push the
quota over-limit, at which point reclamation will kick in.
diff --git a/src/core/lib/iomgr/socket_windows.cc b/src/core/lib/iomgr/socket_windows.cc
index 4ad31cb35d..999c6646ad 100644
--- a/src/core/lib/iomgr/socket_windows.cc
+++ b/src/core/lib/iomgr/socket_windows.cc
@@ -52,6 +52,10 @@ grpc_winsocket* grpc_winsocket_create(SOCKET socket, const char* name) {
return r;
}
+SOCKET grpc_winsocket_wrapped_socket(grpc_winsocket* socket) {
+ return socket->socket;
+}
+
/* Schedule a shutdown of the socket operations. Will call the pending
operations to abort them. We need to do that this way because of the
various callsites of that function, which happens to be in various
diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h
index b09b9da562..46d7d58356 100644
--- a/src/core/lib/iomgr/socket_windows.h
+++ b/src/core/lib/iomgr/socket_windows.h
@@ -92,6 +92,8 @@ typedef struct grpc_winsocket {
it will be responsible for closing it. */
grpc_winsocket* grpc_winsocket_create(SOCKET socket, const char* name);
+SOCKET grpc_winsocket_wrapped_socket(grpc_winsocket* socket);
+
/* Initiate an asynchronous shutdown of the socket. Will call off any pending
operation to cancel them. */
void grpc_winsocket_shutdown(grpc_winsocket* socket);
diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc
index 5acea91792..4b21322d74 100644
--- a/src/core/lib/iomgr/tcp_client_cfstream.cc
+++ b/src/core/lib/iomgr/tcp_client_cfstream.cc
@@ -211,6 +211,6 @@ static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
gpr_mu_unlock(&connect->mu);
}
-grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {CFStreamClientConnect};
+grpc_tcp_client_vtable grpc_cfstream_client_vtable = {CFStreamClientConnect};
#endif /* GRPC_CFSTREAM_CLIENT */
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index 5d316d477b..b3cb442f18 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -53,7 +53,7 @@
extern grpc_core::TraceFlag grpc_tcp_trace;
-static grpc_error* set_non_block(SOCKET sock) {
+grpc_error* grpc_tcp_set_non_block(SOCKET sock) {
int status;
uint32_t param = 1;
DWORD ret;
@@ -90,7 +90,7 @@ static grpc_error* enable_loopback_fast_path(SOCKET sock) {
grpc_error* grpc_tcp_prepare_socket(SOCKET sock) {
grpc_error* err;
- err = set_non_block(sock);
+ err = grpc_tcp_set_non_block(sock);
if (err != GRPC_ERROR_NONE) return err;
err = set_dualstack(sock);
if (err != GRPC_ERROR_NONE) return err;
diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h
index 161a545a2a..04ef8102b6 100644
--- a/src/core/lib/iomgr/tcp_windows.h
+++ b/src/core/lib/iomgr/tcp_windows.h
@@ -46,6 +46,8 @@ grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
grpc_error* grpc_tcp_prepare_socket(SOCKET sock);
+grpc_error* grpc_tcp_set_non_block(SOCKET sock);
+
#endif
#endif /* GRPC_CORE_LIB_IOMGR_TCP_WINDOWS_H */