aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-09-14 12:33:34 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-09-14 12:33:34 -0700
commit79eba775cae030efa030e4c887d2a1ba7544417f (patch)
tree2586b9aabc49007fdb78e2d239cc325dcc4a1c37 /src/core/lib/iomgr
parentf66ed288538a7e28cdbdc694bef80bb86b696b90 (diff)
parent213d2744b543cf8593838a5eb995bf9c7924c7d5 (diff)
Merge branch 'exec' into execp
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/combiner.c3
-rw-r--r--src/core/lib/iomgr/error.c34
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c149
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c45
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c6
-rw-r--r--src/core/lib/iomgr/executor.c62
-rw-r--r--src/core/lib/iomgr/polling_entity.c18
-rw-r--r--src/core/lib/iomgr/polling_entity.h8
-rw-r--r--src/core/lib/iomgr/resource_quota.c7
-rw-r--r--src/core/lib/iomgr/socket_factory_posix.c4
-rw-r--r--src/core/lib/iomgr/socket_mutator.c4
-rw-r--r--src/core/lib/iomgr/timer_manager.c2
-rw-r--r--src/core/lib/iomgr/udp_server.c2
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.c14
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.h4
15 files changed, 185 insertions, 177 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 360967f3ba..f899b25f10 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -356,7 +356,8 @@ static void combiner_finally_exec(grpc_exec_ctx *exec_ctx,
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
grpc_error *error) {
- combiner_finally_exec(exec_ctx, closure, GRPC_ERROR_REF(error));
+ combiner_finally_exec(exec_ctx, (grpc_closure *)closure,
+ GRPC_ERROR_REF(error));
}
grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner) {
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index dcd175a2e1..d5e00a8f64 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -278,13 +278,13 @@ static void internal_set_time(grpc_error **err, grpc_error_times which,
memcpy((*err)->arena + slot, &value, sizeof(value));
}
-static void internal_add_error(grpc_error **err, grpc_error *new) {
- grpc_linked_error new_last = {new, UINT8_MAX};
+static void internal_add_error(grpc_error **err, grpc_error *new_err) {
+ grpc_linked_error new_last = {new_err, UINT8_MAX};
uint8_t slot = get_placement(err, sizeof(grpc_linked_error));
if (slot == UINT8_MAX) {
- gpr_log(GPR_ERROR, "Error %p is full, dropping error %p = %s", *err, new,
- grpc_error_string(new));
- GRPC_ERROR_UNREF(new);
+ gpr_log(GPR_ERROR, "Error %p is full, dropping error %p = %s", *err,
+ new_err, grpc_error_string(new_err));
+ GRPC_ERROR_UNREF(new_err);
return;
}
if ((*err)->first_err == UINT8_MAX) {
@@ -321,8 +321,8 @@ grpc_error *grpc_error_create(const char *file, int line, grpc_slice desc,
uint8_t initial_arena_capacity = (uint8_t)(
DEFAULT_ERROR_CAPACITY +
(uint8_t)(num_referencing * SLOTS_PER_LINKED_ERROR) + SURPLUS_CAPACITY);
- grpc_error *err =
- gpr_malloc(sizeof(*err) + initial_arena_capacity * sizeof(intptr_t));
+ grpc_error *err = (grpc_error *)gpr_malloc(
+ sizeof(*err) + initial_arena_capacity * sizeof(intptr_t));
if (err == NULL) { // TODO(ctiller): make gpr_malloc return NULL
return GRPC_ERROR_OOM;
}
@@ -432,10 +432,10 @@ static grpc_error *copy_error_and_unref(grpc_error *in) {
grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
intptr_t value) {
GPR_TIMER_BEGIN("grpc_error_set_int", 0);
- grpc_error *new = copy_error_and_unref(src);
- internal_set_int(&new, which, value);
+ grpc_error *new_err = copy_error_and_unref(src);
+ internal_set_int(&new_err, which, value);
GPR_TIMER_END("grpc_error_set_int", 0);
- return new;
+ return new_err;
}
typedef struct {
@@ -477,10 +477,10 @@ bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) {
grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
grpc_slice str) {
GPR_TIMER_BEGIN("grpc_error_set_str", 0);
- grpc_error *new = copy_error_and_unref(src);
- internal_set_str(&new, which, str);
+ grpc_error *new_err = copy_error_and_unref(src);
+ internal_set_str(&new_err, which, str);
GPR_TIMER_END("grpc_error_set_str", 0);
- return new;
+ return new_err;
}
bool grpc_error_get_str(grpc_error *err, grpc_error_strs which,
@@ -507,10 +507,10 @@ bool grpc_error_get_str(grpc_error *err, grpc_error_strs which,
grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) {
GPR_TIMER_BEGIN("grpc_error_add_child", 0);
- grpc_error *new = copy_error_and_unref(src);
- internal_add_error(&new, child);
+ grpc_error *new_err = copy_error_and_unref(src);
+ internal_add_error(&new_err, child);
GPR_TIMER_END("grpc_error_add_child", 0);
- return new;
+ return new_err;
}
static const char *no_error_string = "\"No Error\"";
@@ -733,7 +733,7 @@ const char *grpc_error_string(grpc_error *err) {
void *p = (void *)gpr_atm_acq_load(&err->atomics.error_string);
if (p != NULL) {
GPR_TIMER_END("grpc_error_string", 0);
- return p;
+ return (const char *)p;
}
kv_pairs kvs;
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 5bc7e878de..6946a2cbf5 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -130,9 +130,9 @@ static void fd_global_shutdown(void);
* Pollset Declarations
*/
-typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
+typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state_t;
-static const char *kick_state_string(kick_state st) {
+static const char *kick_state_string(kick_state_t st) {
switch (st) {
case UNKICKED:
return "UNKICKED";
@@ -145,7 +145,7 @@ static const char *kick_state_string(kick_state st) {
}
struct grpc_pollset_worker {
- kick_state kick_state;
+ kick_state_t kick_state;
int kick_state_mutator; // which line of code last changed kick state
bool initialized_cv;
grpc_pollset_worker *next;
@@ -160,18 +160,18 @@ struct grpc_pollset_worker {
(worker)->kick_state_mutator = __LINE__; \
} while (false)
-#define MAX_NEIGHBOURHOODS 1024
+#define MAX_NEIGHBORHOODS 1024
-typedef struct pollset_neighbourhood {
+typedef struct pollset_neighborhood {
gpr_mu mu;
grpc_pollset *active_root;
char pad[GPR_CACHELINE_SIZE];
-} pollset_neighbourhood;
+} pollset_neighborhood;
struct grpc_pollset {
gpr_mu mu;
- pollset_neighbourhood *neighbourhood;
- bool reassigning_neighbourhood;
+ pollset_neighborhood *neighborhood;
+ bool reassigning_neighborhood;
grpc_pollset_worker *root_worker;
bool kicked_without_poller;
@@ -384,8 +384,8 @@ GPR_TLS_DECL(g_current_thread_worker);
/* The designated poller */
static gpr_atm g_active_poller;
-static pollset_neighbourhood *g_neighbourhoods;
-static size_t g_num_neighbourhoods;
+static pollset_neighborhood *g_neighborhoods;
+static size_t g_num_neighborhoods;
/* Return true if first in list */
static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
@@ -424,8 +424,8 @@ static worker_remove_result worker_remove(grpc_pollset *pollset,
}
}
-static size_t choose_neighbourhood(void) {
- return (size_t)gpr_cpu_current_cpu() % g_num_neighbourhoods;
+static size_t choose_neighborhood(void) {
+ return (size_t)gpr_cpu_current_cpu() % g_num_neighborhoods;
}
static grpc_error *pollset_global_init(void) {
@@ -441,11 +441,11 @@ static grpc_error *pollset_global_init(void) {
&ev) != 0) {
return GRPC_OS_ERROR(errno, "epoll_ctl");
}
- g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
- g_neighbourhoods = (pollset_neighbourhood *)gpr_zalloc(
- sizeof(*g_neighbourhoods) * g_num_neighbourhoods);
- for (size_t i = 0; i < g_num_neighbourhoods; i++) {
- gpr_mu_init(&g_neighbourhoods[i].mu);
+ g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS);
+ g_neighborhoods = (pollset_neighborhood *)gpr_zalloc(
+ sizeof(*g_neighborhoods) * g_num_neighborhoods);
+ for (size_t i = 0; i < g_num_neighborhoods; i++) {
+ gpr_mu_init(&g_neighborhoods[i].mu);
}
return GRPC_ERROR_NONE;
}
@@ -454,17 +454,17 @@ static void pollset_global_shutdown(void) {
gpr_tls_destroy(&g_current_thread_pollset);
gpr_tls_destroy(&g_current_thread_worker);
if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
- for (size_t i = 0; i < g_num_neighbourhoods; i++) {
- gpr_mu_destroy(&g_neighbourhoods[i].mu);
+ for (size_t i = 0; i < g_num_neighborhoods; i++) {
+ gpr_mu_destroy(&g_neighborhoods[i].mu);
}
- gpr_free(g_neighbourhoods);
+ gpr_free(g_neighborhoods);
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
gpr_mu_init(&pollset->mu);
*mu = &pollset->mu;
- pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
- pollset->reassigning_neighbourhood = false;
+ pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
+ pollset->reassigning_neighborhood = false;
pollset->root_worker = NULL;
pollset->kicked_without_poller = false;
pollset->seen_inactive = true;
@@ -477,26 +477,26 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
gpr_mu_lock(&pollset->mu);
if (!pollset->seen_inactive) {
- pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
+ pollset_neighborhood *neighborhood = pollset->neighborhood;
gpr_mu_unlock(&pollset->mu);
- retry_lock_neighbourhood:
- gpr_mu_lock(&neighbourhood->mu);
+ retry_lock_neighborhood:
+ gpr_mu_lock(&neighborhood->mu);
gpr_mu_lock(&pollset->mu);
if (!pollset->seen_inactive) {
- if (pollset->neighbourhood != neighbourhood) {
- gpr_mu_unlock(&neighbourhood->mu);
- neighbourhood = pollset->neighbourhood;
+ if (pollset->neighborhood != neighborhood) {
+ gpr_mu_unlock(&neighborhood->mu);
+ neighborhood = pollset->neighborhood;
gpr_mu_unlock(&pollset->mu);
- goto retry_lock_neighbourhood;
+ goto retry_lock_neighborhood;
}
pollset->prev->next = pollset->next;
pollset->next->prev = pollset->prev;
- if (pollset == pollset->neighbourhood->active_root) {
- pollset->neighbourhood->active_root =
+ if (pollset == pollset->neighborhood->active_root) {
+ pollset->neighborhood->active_root =
pollset->next == pollset ? NULL : pollset->next;
}
}
- gpr_mu_unlock(&pollset->neighbourhood->mu);
+ gpr_mu_unlock(&pollset->neighborhood->mu);
}
gpr_mu_unlock(&pollset->mu);
gpr_mu_destroy(&pollset->mu);
@@ -675,16 +675,16 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
// pollset has been observed to be inactive, we need to move back to the
// active list
bool is_reassigning = false;
- if (!pollset->reassigning_neighbourhood) {
+ if (!pollset->reassigning_neighborhood) {
is_reassigning = true;
- pollset->reassigning_neighbourhood = true;
- pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
+ pollset->reassigning_neighborhood = true;
+ pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
}
- pollset_neighbourhood *neighbourhood = pollset->neighbourhood;
+ pollset_neighborhood *neighborhood = pollset->neighborhood;
gpr_mu_unlock(&pollset->mu);
// pollset unlocked: state may change (even worker->kick_state)
- retry_lock_neighbourhood:
- gpr_mu_lock(&neighbourhood->mu);
+ retry_lock_neighborhood:
+ gpr_mu_lock(&neighborhood->mu);
gpr_mu_lock(&pollset->mu);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
@@ -692,17 +692,17 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
is_reassigning);
}
if (pollset->seen_inactive) {
- if (neighbourhood != pollset->neighbourhood) {
- gpr_mu_unlock(&neighbourhood->mu);
- neighbourhood = pollset->neighbourhood;
+ if (neighborhood != pollset->neighborhood) {
+ gpr_mu_unlock(&neighborhood->mu);
+ neighborhood = pollset->neighborhood;
gpr_mu_unlock(&pollset->mu);
- goto retry_lock_neighbourhood;
+ goto retry_lock_neighborhood;
}
/* In the brief time we released the pollset locks above, the worker MAY
have been kicked. In this case, the worker should get out of this
pollset ASAP and hence this should neither add the pollset to
- neighbourhood nor mark the pollset as active.
+ neighborhood nor mark the pollset as active.
On a side note, the only way a worker's kick state could have changed
at this point is if it were "kicked specifically". Since the worker has
@@ -710,25 +710,25 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
not visible in the "kick any" path yet */
if (worker->kick_state == UNKICKED) {
pollset->seen_inactive = false;
- if (neighbourhood->active_root == NULL) {
- neighbourhood->active_root = pollset->next = pollset->prev = pollset;
+ if (neighborhood->active_root == NULL) {
+ neighborhood->active_root = pollset->next = pollset->prev = pollset;
/* Make this the designated poller if there isn't one already */
if (worker->kick_state == UNKICKED &&
gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
SET_KICK_STATE(worker, DESIGNATED_POLLER);
}
} else {
- pollset->next = neighbourhood->active_root;
+ pollset->next = neighborhood->active_root;
pollset->prev = pollset->next->prev;
pollset->next->prev = pollset->prev->next = pollset;
}
}
}
if (is_reassigning) {
- GPR_ASSERT(pollset->reassigning_neighbourhood);
- pollset->reassigning_neighbourhood = false;
+ GPR_ASSERT(pollset->reassigning_neighborhood);
+ pollset->reassigning_neighborhood = false;
}
- gpr_mu_unlock(&neighbourhood->mu);
+ gpr_mu_unlock(&neighborhood->mu);
}
worker_insert(pollset, worker);
@@ -763,7 +763,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
}
/* We release pollset lock in this function at a couple of places:
- * 1. Briefly when assigning pollset to a neighbourhood
+ * 1. Briefly when assigning pollset to a neighborhood
* 2. When doing gpr_cv_wait()
* It is possible that 'kicked_without_poller' was set to true during (1) and
* 'shutting_down' is set to true during (1) or (2). If either of them is
@@ -781,12 +781,12 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
}
-static bool check_neighbourhood_for_available_poller(
- pollset_neighbourhood *neighbourhood) {
- GPR_TIMER_BEGIN("check_neighbourhood_for_available_poller", 0);
+static bool check_neighborhood_for_available_poller(
+ pollset_neighborhood *neighborhood) {
+ GPR_TIMER_BEGIN("check_neighborhood_for_available_poller", 0);
bool found_worker = false;
do {
- grpc_pollset *inspect = neighbourhood->active_root;
+ grpc_pollset *inspect = neighborhood->active_root;
if (inspect == NULL) {
break;
}
@@ -831,8 +831,8 @@ static bool check_neighbourhood_for_available_poller(
gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
}
inspect->seen_inactive = true;
- if (inspect == neighbourhood->active_root) {
- neighbourhood->active_root =
+ if (inspect == neighborhood->active_root) {
+ neighborhood->active_root =
inspect->next == inspect ? NULL : inspect->next;
}
inspect->next->prev = inspect->prev;
@@ -841,7 +841,7 @@ static bool check_neighbourhood_for_available_poller(
}
gpr_mu_unlock(&inspect->mu);
} while (!found_worker);
- GPR_TIMER_END("check_neighbourhood_for_available_poller", 0);
+ GPR_TIMER_END("check_neighborhood_for_available_poller", 0);
return found_worker;
}
@@ -873,32 +873,31 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else {
gpr_atm_no_barrier_store(&g_active_poller, 0);
- size_t poller_neighbourhood_idx =
- (size_t)(pollset->neighbourhood - g_neighbourhoods);
+ size_t poller_neighborhood_idx =
+ (size_t)(pollset->neighborhood - g_neighborhoods);
gpr_mu_unlock(&pollset->mu);
bool found_worker = false;
- bool scan_state[MAX_NEIGHBOURHOODS];
- for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
- pollset_neighbourhood *neighbourhood =
- &g_neighbourhoods[(poller_neighbourhood_idx + i) %
- g_num_neighbourhoods];
- if (gpr_mu_trylock(&neighbourhood->mu)) {
- found_worker =
- check_neighbourhood_for_available_poller(neighbourhood);
- gpr_mu_unlock(&neighbourhood->mu);
+ bool scan_state[MAX_NEIGHBORHOODS];
+ for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
+ pollset_neighborhood *neighborhood =
+ &g_neighborhoods[(poller_neighborhood_idx + i) %
+ g_num_neighborhoods];
+ if (gpr_mu_trylock(&neighborhood->mu)) {
+ found_worker = check_neighborhood_for_available_poller(neighborhood);
+ gpr_mu_unlock(&neighborhood->mu);
scan_state[i] = true;
} else {
scan_state[i] = false;
}
}
- for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
+ for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
if (scan_state[i]) continue;
- pollset_neighbourhood *neighbourhood =
- &g_neighbourhoods[(poller_neighbourhood_idx + i) %
- g_num_neighbourhoods];
- gpr_mu_lock(&neighbourhood->mu);
- found_worker = check_neighbourhood_for_available_poller(neighbourhood);
- gpr_mu_unlock(&neighbourhood->mu);
+ pollset_neighborhood *neighborhood =
+ &g_neighborhoods[(poller_neighborhood_idx + i) %
+ g_num_neighborhoods];
+ gpr_mu_lock(&neighborhood->mu);
+ found_worker = check_neighborhood_for_available_poller(neighborhood);
+ gpr_mu_unlock(&neighborhood->mu);
}
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 277347ac70..df69025f1a 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -97,12 +97,12 @@ static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg,
* pollable Declarations
*/
-typedef struct pollable {
+typedef struct pollable_t {
polling_obj po;
int epfd;
grpc_wakeup_fd wakeup;
grpc_pollset_worker *root_worker;
-} pollable;
+} pollable_t;
static const char *polling_obj_type_string(polling_obj_type t) {
switch (t) {
@@ -122,7 +122,7 @@ static const char *polling_obj_type_string(polling_obj_type t) {
return "<invalid>";
}
-static char *pollable_desc(pollable *p) {
+static char *pollable_desc(pollable_t *p) {
char *out;
gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d",
polling_obj_type_string(p->po.type), p->po.group, p->epfd,
@@ -130,19 +130,19 @@ static char *pollable_desc(pollable *p) {
return out;
}
-static pollable g_empty_pollable;
+static pollable_t g_empty_pollable;
-static void pollable_init(pollable *p, polling_obj_type type);
-static void pollable_destroy(pollable *p);
+static void pollable_init(pollable_t *p, polling_obj_type type);
+static void pollable_destroy(pollable_t *p);
/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */
-static grpc_error *pollable_materialize(pollable *p);
+static grpc_error *pollable_materialize(pollable_t *p);
/*******************************************************************************
* Fd Declarations
*/
struct grpc_fd {
- pollable pollable;
+ pollable_t pollable;
int fd;
/* refst format:
bit 0 : 1=Active / 0=Orphaned
@@ -193,15 +193,15 @@ struct grpc_pollset_worker {
pollset_worker_link links[POLLSET_WORKER_LINK_COUNT];
gpr_cv cv;
grpc_pollset *pollset;
- pollable *pollable;
+ pollable_t *pollable;
};
#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5
struct grpc_pollset {
- pollable pollable;
- pollable *current_pollable;
+ pollable_t pollable;
+ pollable_t *current_pollable;
int kick_alls_pending;
bool kicked_without_poller;
grpc_closure *shutdown_closure;
@@ -451,13 +451,13 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
* Pollable Definitions
*/
-static void pollable_init(pollable *p, polling_obj_type type) {
+static void pollable_init(pollable_t *p, polling_obj_type type) {
po_init(&p->po, type);
p->root_worker = NULL;
p->epfd = -1;
}
-static void pollable_destroy(pollable *p) {
+static void pollable_destroy(pollable_t *p) {
po_destroy(&p->po);
if (p->epfd != -1) {
close(p->epfd);
@@ -466,7 +466,7 @@ static void pollable_destroy(pollable *p) {
}
/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */
-static grpc_error *pollable_materialize(pollable *p) {
+static grpc_error *pollable_materialize(pollable_t *p) {
if (p->epfd == -1) {
int new_epfd = epoll_create1(EPOLL_CLOEXEC);
if (new_epfd < 0) {
@@ -492,7 +492,7 @@ static grpc_error *pollable_materialize(pollable *p) {
}
/* pollable must be materialized */
-static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
+static grpc_error *pollable_add_fd(pollable_t *p, grpc_fd *fd) {
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollable_add_fd";
const int epfd = p->epfd;
@@ -599,7 +599,7 @@ static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
GRPC_ERROR_NONE);
}
-static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
+static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable_t *p,
grpc_pollset_worker *specific_worker) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
@@ -664,7 +664,7 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
/* p->po.mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
- pollable *p = pollset->current_pollable;
+ pollable_t *p = pollset->current_pollable;
if (p != &pollset->pollable) {
gpr_mu_lock(&p->po.mu);
}
@@ -744,7 +744,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
-static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) {
+static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable_t *p) {
return p != &g_empty_pollable && p != &pollset->pollable;
}
@@ -762,8 +762,9 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx,
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p got pollset_wakeup %p", pollset, data_ptr);
}
- append_error(&error, grpc_wakeup_fd_consume_wakeup(
- (void *)((~(intptr_t)1) & (intptr_t)data_ptr)),
+ append_error(&error,
+ grpc_wakeup_fd_consume_wakeup(
+ (grpc_wakeup_fd *)((~(intptr_t)1) & (intptr_t)data_ptr)),
err_desc);
} else {
grpc_fd *fd = (grpc_fd *)data_ptr;
@@ -800,7 +801,7 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
}
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- pollable *p, gpr_timespec now,
+ pollable_t *p, gpr_timespec now,
gpr_timespec deadline) {
int timeout = poll_deadline_to_millis_timeout(deadline, now);
@@ -1028,7 +1029,7 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
"PS:%p add fd %p; transition pollable from empty to fd", pollset,
fd);
}
- /* empty pollable --> single fd pollable */
+ /* empty pollable --> single fd pollable_t */
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &fd->pollable;
if (!fd_locked) gpr_mu_lock(&fd->pollable.po.mu);
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index bcf1d9001b..7f44eda138 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -1539,7 +1539,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
for (i = 0; i < nfds; i++) {
fds[i].revents = 0;
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
- idx = FD_TO_IDX(fds[i].fd);
+ idx = GRPC_FD_TO_IDX(fds[i].fd);
fd_cvs[i].cv = &pollcv_cv;
fd_cvs[i].prev = NULL;
fd_cvs[i].next = g_cvfds.cvfds[idx].cvs;
@@ -1602,8 +1602,8 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
idx = 0;
for (i = 0; i < nfds; i++) {
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
- remove_cvn(&g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i]));
- if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) {
+ remove_cvn(&g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i]));
+ if (g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].is_set) {
fds[i].revents = POLLIN;
if (res >= 0) res++;
}
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 41c7ff959e..cf2c051c92 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -32,16 +32,14 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/spinlock.h"
-#define MAX_DEPTH 2
-
typedef struct {
gpr_mu mu;
gpr_cv cv;
grpc_closure_list elems;
- size_t depth;
bool shutdown;
bool queued_long_job;
gpr_thd_id id;
+ grpc_closure_list local_elems;
} thread_state;
static thread_state *g_thread_state;
@@ -56,34 +54,35 @@ static grpc_tracer_flag executor_trace =
static void executor_thread(void *arg);
-static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
- size_t n = 0;
+static void run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) {
+ int n = 0; // number of closures executed
- grpc_closure *c = list.head;
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error_data.error;
- if (GRPC_TRACER_ON(executor_trace)) {
+ while (!grpc_closure_list_empty(*list)) {
+ grpc_closure *c = list->head;
+ grpc_closure_list_init(list);
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ if (GRPC_TRACER_ON(executor_trace)) {
#ifndef NDEBUG
- gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
- c->file_created, c->line_created);
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
+ c->file_created, c->line_created);
#else
- gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
#endif
- }
+ }
#ifndef NDEBUG
- c->scheduled = false;
+ c->scheduled = false;
#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
- n++;
- grpc_exec_ctx_flush(exec_ctx);
+ n++;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ grpc_exec_ctx_flush(exec_ctx);
+ }
}
GRPC_STATS_INC_EXECUTOR_CLOSURES_PER_WAKEUP(exec_ctx, n);
-
- return n;
}
bool grpc_executor_is_threaded() {
@@ -128,7 +127,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
for (size_t i = 0; i < g_max_threads; i++) {
gpr_mu_destroy(&g_thread_state[i].mu);
gpr_cv_destroy(&g_thread_state[i].cv);
- run_closures(exec_ctx, g_thread_state[i].elems);
+ run_closures(exec_ctx, &g_thread_state[i].elems);
}
gpr_free(g_thread_state);
gpr_tls_destroy(&g_this_thread_state);
@@ -154,15 +153,12 @@ static void executor_thread(void *arg) {
GRPC_STATS_INC_EXECUTOR_THREADS_CREATED(&exec_ctx);
- size_t subtract_depth = 0;
bool used = false;
for (;;) {
if (GRPC_TRACER_ON(executor_trace)) {
- gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
- (int)(ts - g_thread_state), subtract_depth);
+ gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step", (int)(ts - g_thread_state));
}
gpr_mu_lock(&ts->mu);
- ts->depth -= subtract_depth;
while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
ts->queued_long_job = false;
gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
@@ -180,14 +176,15 @@ static void executor_thread(void *arg) {
used = true;
}
GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx);
- grpc_closure_list exec = ts->elems;
+ GPR_ASSERT(grpc_closure_list_empty(ts->local_elems));
+ ts->local_elems = ts->elems;
ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu);
if (GRPC_TRACER_ON(executor_trace)) {
gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state));
}
- subtract_depth = run_closures(&exec_ctx, exec);
+ run_closures(&exec_ctx, &ts->local_elems);
}
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -220,6 +217,10 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
} else {
GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx);
+ if (!is_short) {
+ grpc_closure_list_append(&ts->local_elems, closure, error);
+ return;
+ }
}
thread_state *orig_ts = ts;
@@ -259,8 +260,7 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
gpr_cv_signal(&ts->cv);
}
grpc_closure_list_append(&ts->elems, closure, error);
- ts->depth++;
- try_new_thread = ts->depth > MAX_DEPTH &&
+ try_new_thread = ts->elems.head != closure &&
cur_thread_count < g_max_threads && !ts->shutdown;
if (!is_short) ts->queued_long_job = true;
gpr_mu_unlock(&ts->mu);
diff --git a/src/core/lib/iomgr/polling_entity.c b/src/core/lib/iomgr/polling_entity.c
index 74d8794af5..8591a5518e 100644
--- a/src/core/lib/iomgr/polling_entity.c
+++ b/src/core/lib/iomgr/polling_entity.c
@@ -25,7 +25,7 @@ grpc_polling_entity grpc_polling_entity_create_from_pollset_set(
grpc_pollset_set *pollset_set) {
grpc_polling_entity pollent;
pollent.pollent.pollset_set = pollset_set;
- pollent.tag = POPS_POLLSET_SET;
+ pollent.tag = GRPC_POLLS_POLLSET_SET;
return pollent;
}
@@ -33,12 +33,12 @@ grpc_polling_entity grpc_polling_entity_create_from_pollset(
grpc_pollset *pollset) {
grpc_polling_entity pollent;
pollent.pollent.pollset = pollset;
- pollent.tag = POPS_POLLSET;
+ pollent.tag = GRPC_POLLS_POLLSET;
return pollent;
}
grpc_pollset *grpc_polling_entity_pollset(grpc_polling_entity *pollent) {
- if (pollent->tag == POPS_POLLSET) {
+ if (pollent->tag == GRPC_POLLS_POLLSET) {
return pollent->pollent.pollset;
}
return NULL;
@@ -46,23 +46,23 @@ grpc_pollset *grpc_polling_entity_pollset(grpc_polling_entity *pollent) {
grpc_pollset_set *grpc_polling_entity_pollset_set(
grpc_polling_entity *pollent) {
- if (pollent->tag == POPS_POLLSET_SET) {
+ if (pollent->tag == GRPC_POLLS_POLLSET_SET) {
return pollent->pollent.pollset_set;
}
return NULL;
}
bool grpc_polling_entity_is_empty(const grpc_polling_entity *pollent) {
- return pollent->tag == POPS_NONE;
+ return pollent->tag == GRPC_POLLS_NONE;
}
void grpc_polling_entity_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_polling_entity *pollent,
grpc_pollset_set *pss_dst) {
- if (pollent->tag == POPS_POLLSET) {
+ if (pollent->tag == GRPC_POLLS_POLLSET) {
GPR_ASSERT(pollent->pollent.pollset != NULL);
grpc_pollset_set_add_pollset(exec_ctx, pss_dst, pollent->pollent.pollset);
- } else if (pollent->tag == POPS_POLLSET_SET) {
+ } else if (pollent->tag == GRPC_POLLS_POLLSET_SET) {
GPR_ASSERT(pollent->pollent.pollset_set != NULL);
grpc_pollset_set_add_pollset_set(exec_ctx, pss_dst,
pollent->pollent.pollset_set);
@@ -75,10 +75,10 @@ void grpc_polling_entity_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_polling_entity *pollent,
grpc_pollset_set *pss_dst) {
- if (pollent->tag == POPS_POLLSET) {
+ if (pollent->tag == GRPC_POLLS_POLLSET) {
GPR_ASSERT(pollent->pollent.pollset != NULL);
grpc_pollset_set_del_pollset(exec_ctx, pss_dst, pollent->pollent.pollset);
- } else if (pollent->tag == POPS_POLLSET_SET) {
+ } else if (pollent->tag == GRPC_POLLS_POLLSET_SET) {
GPR_ASSERT(pollent->pollent.pollset_set != NULL);
grpc_pollset_set_del_pollset_set(exec_ctx, pss_dst,
pollent->pollent.pollset_set);
diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h
index 971fd88b42..a161e1fea6 100644
--- a/src/core/lib/iomgr/polling_entity.h
+++ b/src/core/lib/iomgr/polling_entity.h
@@ -22,6 +22,12 @@
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
+typedef enum grpc_pollset_tag {
+ GRPC_POLLS_NONE,
+ GRPC_POLLS_POLLSET,
+ GRPC_POLLS_POLLSET_SET
+} grpc_pollset_tag;
+
/* A grpc_polling_entity is a pollset-or-pollset_set container. It allows
* functions that accept a pollset XOR a pollset_set to do so through an
* abstract interface. No ownership is taken. */
@@ -31,7 +37,7 @@ typedef struct grpc_polling_entity {
grpc_pollset *pollset;
grpc_pollset_set *pollset_set;
} pollent;
- enum pops_tag { POPS_NONE, POPS_POLLSET, POPS_POLLSET_SET } tag;
+ grpc_pollset_tag tag;
} grpc_polling_entity;
grpc_polling_entity grpc_polling_entity_create_from_pollset_set(
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 6c58986b53..4d69986fbc 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -22,6 +22,7 @@
#include <stdint.h>
#include <string.h>
+#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
@@ -656,7 +657,7 @@ grpc_resource_quota *grpc_resource_quota_from_channel_args(
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
if (channel_args->args[i].type == GRPC_ARG_POINTER) {
return grpc_resource_quota_ref_internal(
- channel_args->args[i].value.pointer.p);
+ (grpc_resource_quota *)channel_args->args[i].value.pointer.p);
} else {
gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
}
@@ -666,12 +667,12 @@ grpc_resource_quota *grpc_resource_quota_from_channel_args(
}
static void *rq_copy(void *rq) {
- grpc_resource_quota_ref(rq);
+ grpc_resource_quota_ref((grpc_resource_quota *)rq);
return rq;
}
static void rq_destroy(grpc_exec_ctx *exec_ctx, void *rq) {
- grpc_resource_quota_unref_internal(exec_ctx, rq);
+ grpc_resource_quota_unref_internal(exec_ctx, (grpc_resource_quota *)rq);
}
static int rq_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
diff --git a/src/core/lib/iomgr/socket_factory_posix.c b/src/core/lib/iomgr/socket_factory_posix.c
index 0f82dea570..c81566575e 100644
--- a/src/core/lib/iomgr/socket_factory_posix.c
+++ b/src/core/lib/iomgr/socket_factory_posix.c
@@ -69,11 +69,11 @@ void grpc_socket_factory_unref(grpc_socket_factory *factory) {
}
static void *socket_factory_arg_copy(void *p) {
- return grpc_socket_factory_ref(p);
+ return grpc_socket_factory_ref((grpc_socket_factory *)p);
}
static void socket_factory_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) {
- grpc_socket_factory_unref(p);
+ grpc_socket_factory_unref((grpc_socket_factory *)p);
}
static int socket_factory_cmp(void *a, void *b) {
diff --git a/src/core/lib/iomgr/socket_mutator.c b/src/core/lib/iomgr/socket_mutator.c
index 5d6c2c400e..300ac75b38 100644
--- a/src/core/lib/iomgr/socket_mutator.c
+++ b/src/core/lib/iomgr/socket_mutator.c
@@ -60,11 +60,11 @@ void grpc_socket_mutator_unref(grpc_socket_mutator *mutator) {
}
static void *socket_mutator_arg_copy(void *p) {
- return grpc_socket_mutator_ref(p);
+ return grpc_socket_mutator_ref((grpc_socket_mutator *)p);
}
static void socket_mutator_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) {
- grpc_socket_mutator_unref(p);
+ grpc_socket_mutator_unref((grpc_socket_mutator *)p);
}
static int socket_mutator_cmp(void *a, void *b) {
diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c
index ae2c0bf0ae..04ca44563d 100644
--- a/src/core/lib/iomgr/timer_manager.c
+++ b/src/core/lib/iomgr/timer_manager.c
@@ -276,7 +276,7 @@ static void timer_thread(void *completed_thread_ptr) {
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
timer_main_loop(&exec_ctx);
grpc_exec_ctx_finish(&exec_ctx);
- timer_thread_cleanup(completed_thread_ptr);
+ timer_thread_cleanup((completed_thread *)completed_thread_ptr);
}
static void start_threads(void) {
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 9a02c1d1bb..00b2e68bb5 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -118,7 +118,7 @@ static grpc_socket_factory *get_socket_factory(const grpc_channel_args *args) {
const grpc_arg *arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY);
if (arg) {
GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
- return arg->value.pointer.p;
+ return (grpc_socket_factory *)arg->value.pointer.p;
}
}
return NULL;
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.c
index 5e0b1d1704..268e0175dd 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.c
+++ b/src/core/lib/iomgr/wakeup_fd_cv.c
@@ -57,7 +57,7 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
g_cvfds.free_fds = g_cvfds.free_fds->next_free;
g_cvfds.cvfds[idx].cvs = NULL;
g_cvfds.cvfds[idx].is_set = 0;
- fd_info->read_fd = IDX_TO_FD(idx);
+ fd_info->read_fd = GRPC_IDX_TO_FD(idx);
fd_info->write_fd = -1;
gpr_mu_unlock(&g_cvfds.mu);
return GRPC_ERROR_NONE;
@@ -66,8 +66,8 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) {
cv_node* cvn;
gpr_mu_lock(&g_cvfds.mu);
- g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 1;
- cvn = g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs;
+ g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 1;
+ cvn = g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs;
while (cvn) {
gpr_cv_signal(cvn->cv);
cvn = cvn->next;
@@ -78,7 +78,7 @@ static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) {
static grpc_error* cv_fd_consume(grpc_wakeup_fd* fd_info) {
gpr_mu_lock(&g_cvfds.mu);
- g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 0;
+ g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 0;
gpr_mu_unlock(&g_cvfds.mu);
return GRPC_ERROR_NONE;
}
@@ -89,9 +89,9 @@ static void cv_fd_destroy(grpc_wakeup_fd* fd_info) {
}
gpr_mu_lock(&g_cvfds.mu);
// Assert that there are no active pollers
- GPR_ASSERT(!g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs);
- g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds;
- g_cvfds.free_fds = &g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)];
+ GPR_ASSERT(!g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs);
+ g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds;
+ g_cvfds.free_fds = &g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)];
gpr_mu_unlock(&g_cvfds.mu);
}
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h
index 46e84f5843..dc170ad5b4 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.h
+++ b/src/core/lib/iomgr/wakeup_fd_cv.h
@@ -37,8 +37,8 @@
#include "src/core/lib/iomgr/ev_posix.h"
-#define FD_TO_IDX(fd) (-(fd)-1)
-#define IDX_TO_FD(idx) (-(idx)-1)
+#define GRPC_FD_TO_IDX(fd) (-(fd)-1)
+#define GRPC_IDX_TO_FD(idx) (-(idx)-1)
typedef struct cv_node {
gpr_cv* cv;