aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2016-02-25 18:10:24 -0800
committerGravatar Vijay Pai <vpai@google.com>2016-02-25 18:10:24 -0800
commite9ef53645150f7a0400a5f9be770eb2b4ba335b5 (patch)
tree34a8b7094b851589eec500b2ca419b64c9051d89 /src/core
parente3cd25684051de21e4b1ba93173c06e72fa5ffad (diff)
Revert "Add an implementation firewall against pollset_set"
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/client_channel.c45
-rw-r--r--src/core/client_config/lb_policies/pick_first.c31
-rw-r--r--src/core/client_config/lb_policies/round_robin.c25
-rw-r--r--src/core/client_config/lb_policy.c4
-rw-r--r--src/core/client_config/lb_policy.h3
-rw-r--r--src/core/client_config/subchannel.c30
-rw-r--r--src/core/httpcli/httpcli.c24
-rw-r--r--src/core/httpcli/httpcli.h3
-rw-r--r--src/core/iomgr/fd_posix.c6
-rw-r--r--src/core/iomgr/pollset.h15
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c1
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c8
-rw-r--r--src/core/iomgr/pollset_posix.c16
-rw-r--r--src/core/iomgr/pollset_posix.h12
-rw-r--r--src/core/iomgr/pollset_set.h10
-rw-r--r--src/core/iomgr/pollset_set_posix.c26
-rw-r--r--src/core/iomgr/pollset_set_posix.h20
-rw-r--r--src/core/iomgr/pollset_set_windows.c4
-rw-r--r--src/core/iomgr/pollset_set_windows.h2
-rw-r--r--src/core/iomgr/pollset_windows.c7
-rw-r--r--src/core/iomgr/pollset_windows.h6
-rw-r--r--src/core/iomgr/tcp_client_posix.c12
-rw-r--r--src/core/iomgr/tcp_posix.c10
-rw-r--r--src/core/iomgr/udp_server.h1
-rw-r--r--src/core/iomgr/workqueue_posix.c1
-rw-r--r--src/core/iomgr/workqueue_posix.h4
-rw-r--r--src/core/security/google_default_credentials.c39
-rw-r--r--src/core/surface/completion_queue.c85
28 files changed, 202 insertions, 248 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index a96b49ac12..7176c01b05 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -78,8 +78,8 @@ typedef struct client_channel_channel_data {
int exit_idle_when_lb_policy_arrives;
/** owning stack */
grpc_channel_stack *owning_stack;
- /** interested parties (owned) */
- grpc_pollset_set *interested_parties;
+ /** interested parties */
+ grpc_pollset_set interested_parties;
} channel_data;
/** We create one watcher for each new lb_policy that is returned from a
@@ -183,8 +183,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
chand->incoming_configuration = NULL;
if (lb_policy != NULL) {
- grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
- chand->interested_parties);
+ grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties,
+ &chand->interested_parties);
}
gpr_mu_lock(&chand->mu_config);
@@ -231,8 +231,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (old_lb_policy != NULL) {
- grpc_pollset_set_del_pollset_set(
- exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ &old_lb_policy->interested_parties,
+ &chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
}
@@ -253,7 +254,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(op->set_accept_stream == NULL);
if (op->bind_pollset != NULL) {
- grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
+ grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties,
op->bind_pollset);
}
@@ -283,8 +284,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
chand->resolver = NULL;
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
- chand->lb_policy->interested_parties,
- chand->interested_parties);
+ &chand->lb_policy->interested_parties,
+ &chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
chand->lb_policy = NULL;
}
@@ -410,7 +411,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
- chand->interested_parties = grpc_pollset_set_create();
+ grpc_pollset_set_init(&chand->interested_parties);
}
/* Destructor for channel_data */
@@ -424,12 +425,12 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
- chand->lb_policy->interested_parties,
- chand->interested_parties);
+ &chand->lb_policy->interested_parties,
+ &chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
- grpc_pollset_set_destroy(chand->interested_parties);
+ grpc_pollset_set_destroy(&chand->interested_parties);
gpr_mu_destroy(&chand->mu_config);
}
@@ -440,17 +441,9 @@ static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
const grpc_channel_filter grpc_client_channel_filter = {
- cc_start_transport_stream_op,
- cc_start_transport_op,
- sizeof(call_data),
- init_call_elem,
- cc_set_pollset,
- destroy_call_elem,
- sizeof(channel_data),
- init_channel_elem,
- destroy_channel_elem,
- cc_get_peer,
- "client-channel",
+ cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
+ init_call_elem, cc_set_pollset, destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, cc_get_peer, "client-channel",
};
void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
@@ -508,7 +501,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
bool iomgr_success) {
external_connectivity_watcher *w = arg;
grpc_closure *follow_up = w->on_complete;
- grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
+ grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties,
w->pollset);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
@@ -524,7 +517,7 @@ void grpc_client_channel_watch_connectivity_state(
w->chand = chand;
w->pollset = pollset;
w->on_complete = on_complete;
- grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset);
grpc_closure_init(&w->my_closure, on_external_watch_complete, w);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 9f38f398d8..459bbebb68 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -31,8 +31,8 @@
*
*/
-#include "src/core/client_config/lb_policies/pick_first.h"
#include "src/core/client_config/lb_policy_factory.h"
+#include "src/core/client_config/lb_policies/pick_first.h"
#include <string.h>
@@ -119,7 +119,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
@@ -137,7 +137,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
@@ -158,7 +158,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
+ &p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
}
@@ -195,7 +195,8 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties,
+ pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@@ -252,7 +253,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_connectivity, "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, selected, p->base.interested_parties,
+ exec_ctx, selected, &p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
} else {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
@@ -277,13 +278,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = selected;
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, selected, p->base.interested_parties,
+ exec_ctx, selected, &p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
@@ -297,7 +298,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
+ &p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
} else {
goto loop;
@@ -310,7 +311,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
- p->base.interested_parties, &p->checking_connectivity,
+ &p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
break;
case GRPC_CHANNEL_FATAL_FAILURE:
@@ -378,14 +379,8 @@ void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_destroy,
- pf_shutdown,
- pf_pick,
- pf_cancel_pick,
- pf_ping_one,
- pf_exit_idle,
- pf_check_connectivity,
- pf_notify_on_state_change};
+ pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_ping_one, pf_exit_idle,
+ pf_check_connectivity, pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index 114ece6e4d..b1171c45b0 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -260,7 +260,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
@@ -285,7 +285,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
subchannel_data *sd = p->subchannels[i];
sd->connectivity_state = GRPC_CHANNEL_IDLE;
grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
+ exec_ctx, sd->subchannel, &p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity");
}
@@ -322,7 +322,8 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties,
+ pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@@ -373,13 +374,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
+ exec_ctx, sd->subchannel, &p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
break;
case GRPC_CHANNEL_CONNECTING:
@@ -388,13 +389,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
sd->connectivity_state,
"connecting_changed");
grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
+ exec_ctx, sd->subchannel, &p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
/* renew state notification */
grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
+ exec_ctx, sd->subchannel, &p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
/* remove from ready list if still present */
@@ -483,14 +484,8 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
- rr_destroy,
- rr_shutdown,
- rr_pick,
- rr_cancel_pick,
- rr_ping_one,
- rr_exit_idle,
- rr_check_connectivity,
- rr_notify_on_state_change};
+ rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_ping_one, rr_exit_idle,
+ rr_check_connectivity, rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 5ff623e006..d4672f6b25 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -39,7 +39,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable) {
policy->vtable = vtable;
gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS);
- policy->interested_parties = grpc_pollset_set_create();
+ grpc_pollset_set_init(&policy->interested_parties);
}
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@@ -93,7 +93,7 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
gpr_atm old_val =
ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
if (old_val == 1) {
- grpc_pollset_set_destroy(policy->interested_parties);
+ grpc_pollset_set_destroy(&policy->interested_parties);
policy->vtable->destroy(exec_ctx, policy);
}
}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 4fbb12da39..db5238c8ca 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -48,8 +48,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel,
struct grpc_lb_policy {
const grpc_lb_policy_vtable *vtable;
gpr_atm ref_pair;
- /* owned pointer to interested parties in load balancing decisions */
- grpc_pollset_set *interested_parties;
+ grpc_pollset_set interested_parties;
};
struct grpc_lb_policy_vtable {
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 291ad3472c..6599c75dba 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -108,7 +108,7 @@ struct grpc_subchannel {
/** pollset_set tracking who's interested in a connection
being setup */
- grpc_pollset_set *pollset_set;
+ grpc_pollset_set pollset_set;
/** active connection, or null; of type grpc_connected_subchannel */
gpr_atm connected_subchannel;
@@ -184,8 +184,8 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(c);
}
-void grpc_connected_subchannel_ref(
- grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+void grpc_connected_subchannel_ref(grpc_connected_subchannel *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
}
@@ -209,7 +209,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_slice_unref(c->initial_connect_string);
grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
grpc_connector_unref(exec_ctx, c->connector);
- grpc_pollset_set_destroy(c->pollset_set);
+ grpc_pollset_set_destroy(&c->pollset_set);
grpc_subchannel_key_destroy(exec_ctx, c->key);
gpr_free(c);
}
@@ -226,8 +226,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
return old_val;
}
-grpc_subchannel *grpc_subchannel_ref(
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
0 REF_MUTATE_PURPOSE("STRONG_REF"));
@@ -235,8 +235,8 @@ grpc_subchannel *grpc_subchannel_ref(
return c;
}
-grpc_subchannel *grpc_subchannel_weak_ref(
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
GPR_ASSERT(old_refs != 0);
@@ -326,7 +326,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
}
c->addr = gpr_malloc(args->addr_len);
memcpy(c->addr, args->addr, args->addr_len);
- c->pollset_set = grpc_pollset_set_create();
+ grpc_pollset_set_init(&c->pollset_set);
c->addr_len = args->addr_len;
grpc_set_initial_connect_string(&c->addr, &c->addr_len,
&c->initial_connect_string);
@@ -345,7 +345,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_connect_in_args args;
- args.interested_parties = c->pollset_set;
+ args.interested_parties = &c->pollset_set;
args.addr = c->addr;
args.addr_len = c->addr_len;
args.deadline = compute_connect_deadline(c);
@@ -379,7 +379,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
external_state_watcher *w = arg;
grpc_closure *follow_up = w->notify;
if (w->pollset_set != NULL) {
- grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set,
+ grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set,
w->pollset_set);
}
gpr_mu_lock(&w->subchannel->mu);
@@ -415,7 +415,7 @@ void grpc_subchannel_notify_on_state_change(
w->notify = notify;
grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
if (interested_parties != NULL) {
- grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set,
+ grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set,
interested_parties);
}
GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
@@ -573,7 +573,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state,
+ exec_ctx, con, &c->pollset_set, &sw_subchannel->connectivity_state,
&sw_subchannel->closure);
/* signal completion */
@@ -690,8 +690,8 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
-void grpc_subchannel_call_ref(
- grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+void grpc_subchannel_call_ref(grpc_subchannel_call *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
}
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 1219c444c7..71237bb614 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -31,22 +31,20 @@
*
*/
-#include "src/core/httpcli/httpcli.h"
#include "src/core/iomgr/sockaddr.h"
+#include "src/core/httpcli/httpcli.h"
#include <string.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-
-#include "src/core/httpcli/format_request.h"
-#include "src/core/httpcli/parser.h"
#include "src/core/iomgr/endpoint.h"
-#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_client.h"
+#include "src/core/httpcli/format_request.h"
+#include "src/core/httpcli/parser.h"
#include "src/core/support/string.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
typedef struct {
gpr_slice request_text;
@@ -86,18 +84,18 @@ const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http",
plaintext_handshake};
void grpc_httpcli_context_init(grpc_httpcli_context *context) {
- context->pollset_set = grpc_pollset_set_create();
+ grpc_pollset_set_init(&context->pollset_set);
}
void grpc_httpcli_context_destroy(grpc_httpcli_context *context) {
- grpc_pollset_set_destroy(context->pollset_set);
+ grpc_pollset_set_destroy(&context->pollset_set);
}
static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req);
static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
int success) {
- grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set,
+ grpc_pollset_set_del_pollset(exec_ctx, &req->context->pollset_set,
req->pollset);
req->on_response(exec_ctx, req->user_data, success ? &req->parser.r : NULL);
grpc_httpcli_parser_destroy(&req->parser);
@@ -199,7 +197,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req) {
addr = &req->addresses->addrs[req->next_address++];
grpc_closure_init(&req->connected, on_connected, req);
grpc_tcp_client_connect(
- exec_ctx, &req->connected, &req->ep, req->context->pollset_set,
+ exec_ctx, &req->connected, &req->ep, &req->context->pollset_set,
(struct sockaddr *)&addr->addr, addr->len, req->deadline);
}
@@ -239,7 +237,7 @@ static void internal_request_begin(
req->host = gpr_strdup(request->host);
req->ssl_host_override = gpr_strdup(request->ssl_host_override);
- grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set,
+ grpc_pollset_set_add_pollset(exec_ctx, &req->context->pollset_set,
req->pollset);
grpc_resolve_address(request->host, req->handshaker->default_port,
on_resolved, req);
diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h
index 86e17c1d69..30875d71f1 100644
--- a/src/core/httpcli/httpcli.h
+++ b/src/core/httpcli/httpcli.h
@@ -39,7 +39,6 @@
#include <grpc/support/time.h>
#include "src/core/iomgr/endpoint.h"
-#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset_set.h"
/* User agent this library reports */
@@ -57,7 +56,7 @@ typedef struct grpc_httpcli_header {
TODO(ctiller): allow caching and capturing multiple requests for the
same content and combining them */
typedef struct grpc_httpcli_context {
- grpc_pollset_set *pollset_set;
+ grpc_pollset_set pollset_set;
} grpc_httpcli_context;
typedef struct {
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 4ba7c5df94..85eadd754b 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -46,8 +46,6 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
-#include "src/core/iomgr/pollset_posix.h"
-
#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
@@ -177,11 +175,11 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
}
static void pollset_kick_locked(grpc_fd_watcher *watcher) {
- gpr_mu_lock(&watcher->pollset->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset));
GPR_ASSERT(watcher->worker);
grpc_pollset_kick_ext(watcher->pollset, watcher->worker,
GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
- gpr_mu_unlock(&watcher->pollset->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset));
}
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index 92a0374ddd..6585326f81 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -35,11 +35,8 @@
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_H
#include <grpc/support/port_platform.h>
-#include <grpc/support/sync.h>
#include <grpc/support/time.h>
-#include "src/core/iomgr/exec_ctx.h"
-
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
/* A grpc_pollset is a set of file descriptors that a higher level item is
@@ -49,11 +46,15 @@
- a completion queue might keep a pollset with an entry for each transport
that is servicing a call that it's tracking */
-typedef struct grpc_pollset grpc_pollset;
-typedef struct grpc_pollset_worker grpc_pollset_worker;
+#ifdef GPR_POSIX_SOCKET
+#include "src/core/iomgr/pollset_posix.h"
+#endif
+
+#ifdef GPR_WIN32
+#include "src/core/iomgr/pollset_windows.h"
+#endif
-size_t grpc_pollset_size(void);
-void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu);
+void grpc_pollset_init(grpc_pollset *pollset);
/* Begin shutting down the pollset, and call closure when done.
* GRPC_POLLSET_MU(pollset) must be held */
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 2e0f27fab8..4acae2bb71 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -45,7 +45,6 @@
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include "src/core/iomgr/fd_posix.h"
-#include "src/core/iomgr/pollset_posix.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/block_annotate.h"
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 4dddfff230..809f8f39da 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -42,14 +42,12 @@
#include <stdlib.h>
#include <string.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/useful.h>
-
#include "src/core/iomgr/fd_posix.h"
#include "src/core/iomgr/iomgr_internal.h"
-#include "src/core/iomgr/pollset_posix.h"
#include "src/core/support/block_annotate.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
typedef struct {
/* all polled fds */
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index e895a77884..ee7e9f48f4 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -42,16 +42,16 @@
#include <string.h>
#include <unistd.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/thd.h>
-#include <grpc/support/tls.h>
-#include <grpc/support/useful.h>
#include "src/core/iomgr/fd_posix.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/block_annotate.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/tls.h>
+#include <grpc/support/useful.h>
GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);
@@ -97,8 +97,6 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next->prev = worker;
}
-size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); }
-
void grpc_pollset_kick_ext(grpc_pollset *p,
grpc_pollset_worker *specific_worker,
uint32_t flags) {
@@ -188,9 +186,8 @@ void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
-void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
+void grpc_pollset_init(grpc_pollset *pollset) {
gpr_mu_init(&pollset->mu);
- *mu = &pollset->mu;
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
@@ -207,6 +204,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(!grpc_pollset_has_workers(pollset));
GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
pollset->vtable->destroy(pollset);
+ gpr_mu_destroy(&pollset->mu);
while (pollset->local_wakeup_cache) {
grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index bbedb66b00..5868b3fa21 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -37,10 +37,8 @@
#include <poll.h>
#include <grpc/support/sync.h>
-
#include "src/core/iomgr/exec_ctx.h"
#include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/wakeup_fd_posix.h"
typedef struct grpc_pollset_vtable grpc_pollset_vtable;
@@ -55,15 +53,15 @@ typedef struct grpc_cached_wakeup_fd {
struct grpc_cached_wakeup_fd *next;
} grpc_cached_wakeup_fd;
-struct grpc_pollset_worker {
+typedef struct grpc_pollset_worker {
grpc_cached_wakeup_fd *wakeup_fd;
int reevaluate_polling_on_wakeup;
int kicked_specifically;
struct grpc_pollset_worker *next;
struct grpc_pollset_worker *prev;
-};
+} grpc_pollset_worker;
-struct grpc_pollset {
+typedef struct grpc_pollset {
/* pollsets under posix can mutate representation as fds are added and
removed.
For example, we may choose a poll() based implementation on linux for
@@ -83,7 +81,7 @@ struct grpc_pollset {
} data;
/* Local cache of eventfds for workers */
grpc_cached_wakeup_fd *local_wakeup_cache;
-};
+} grpc_pollset;
struct grpc_pollset_vtable {
void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -95,6 +93,8 @@ struct grpc_pollset_vtable {
void (*destroy)(grpc_pollset *pollset);
};
+#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
+
/* Add an fd to a pollset */
void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
struct grpc_fd *fd);
diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h
index 9591bf0d32..09c04438f7 100644
--- a/src/core/iomgr/pollset_set.h
+++ b/src/core/iomgr/pollset_set.h
@@ -41,9 +41,15 @@
fd's (etc) that have been registered with the set_set to that pollset.
Registering fd's automatically adds them to all current pollsets. */
-typedef struct grpc_pollset_set grpc_pollset_set;
+#ifdef GPR_POSIX_SOCKET
+#include "src/core/iomgr/pollset_set_posix.h"
+#endif
-grpc_pollset_set *grpc_pollset_set_create(void);
+#ifdef GPR_WIN32
+#include "src/core/iomgr/pollset_set_windows.h"
+#endif
+
+void grpc_pollset_set_init(grpc_pollset_set *pollset_set);
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set);
void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set,
diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c
index 9dc9aff4a8..4ec92202e3 100644
--- a/src/core/iomgr/pollset_set_posix.c
+++ b/src/core/iomgr/pollset_set_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -41,30 +41,11 @@
#include <grpc/support/alloc.h>
#include <grpc/support/useful.h>
-#include "src/core/iomgr/pollset_posix.h"
-#include "src/core/iomgr/pollset_set_posix.h"
+#include "src/core/iomgr/pollset_set.h"
-struct grpc_pollset_set {
- gpr_mu mu;
-
- size_t pollset_count;
- size_t pollset_capacity;
- grpc_pollset **pollsets;
-
- size_t pollset_set_count;
- size_t pollset_set_capacity;
- struct grpc_pollset_set **pollset_sets;
-
- size_t fd_count;
- size_t fd_capacity;
- grpc_fd **fds;
-};
-
-grpc_pollset_set *grpc_pollset_set_create(void) {
- grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
+void grpc_pollset_set_init(grpc_pollset_set *pollset_set) {
memset(pollset_set, 0, sizeof(*pollset_set));
gpr_mu_init(&pollset_set->mu);
- return pollset_set;
}
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
@@ -76,7 +57,6 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
gpr_free(pollset_set->pollsets);
gpr_free(pollset_set->pollset_sets);
gpr_free(pollset_set->fds);
- gpr_free(pollset_set);
}
void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h
index 7d1aaf4181..4820a61e4b 100644
--- a/src/core/iomgr/pollset_set_posix.h
+++ b/src/core/iomgr/pollset_set_posix.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -35,7 +35,23 @@
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H
#include "src/core/iomgr/fd_posix.h"
-#include "src/core/iomgr/pollset_set.h"
+#include "src/core/iomgr/pollset_posix.h"
+
+typedef struct grpc_pollset_set {
+ gpr_mu mu;
+
+ size_t pollset_count;
+ size_t pollset_capacity;
+ grpc_pollset **pollsets;
+
+ size_t pollset_set_count;
+ size_t pollset_set_capacity;
+ struct grpc_pollset_set **pollset_sets;
+
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
+} grpc_pollset_set;
void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set, grpc_fd *fd);
diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c
index 9cf8fd4472..157b46ec32 100644
--- a/src/core/iomgr/pollset_set_windows.c
+++ b/src/core/iomgr/pollset_set_windows.c
@@ -35,9 +35,9 @@
#ifdef GPR_WINSOCK_SOCKET
-#include "src/core/iomgr/pollset_set_windows.h"
+#include "src/core/iomgr/pollset_set.h"
-grpc_pollset_set* grpc_pollset_set_create(pollset_set) { return NULL; }
+void grpc_pollset_set_init(grpc_pollset_set* pollset_set) {}
void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {}
diff --git a/src/core/iomgr/pollset_set_windows.h b/src/core/iomgr/pollset_set_windows.h
index aa5abe9133..cada0d2b61 100644
--- a/src/core/iomgr/pollset_set_windows.h
+++ b/src/core/iomgr/pollset_set_windows.h
@@ -34,6 +34,6 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H
-#include "src/core/iomgr/pollset_set.h"
+typedef struct grpc_pollset_set { void *unused; } grpc_pollset_set;
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 651f8e7334..bbce23b46a 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -89,17 +89,12 @@ static void push_front_worker(grpc_pollset_worker *root,
worker->links[type].next->links[type].prev = worker;
}
-size_t grpc_pollset_size(void) {
- return sizeof(grpc_pollset);
-}
-
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. We're still going to provide a minimal
set of features for the sake of the rest of grpc. But grpc_pollset_work
won't actually do any polling, and return as quickly as possible. */
-void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
- *mu = &grpc_polling_mu;
+void grpc_pollset_init(grpc_pollset *pollset) {
memset(pollset, 0, sizeof(*pollset));
pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h
index dc0b7a4104..65ba80619b 100644
--- a/src/core/iomgr/pollset_windows.h
+++ b/src/core/iomgr/pollset_windows.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -72,4 +72,8 @@ struct grpc_pollset {
grpc_closure *on_shutdown;
};
+extern gpr_mu grpc_polling_mu;
+
+#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu)
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 15727856ab..c76c2e3b0f 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -42,19 +42,17 @@
#include <string.h>
#include <unistd.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/time.h>
-
+#include "src/core/iomgr/timer.h"
#include "src/core/iomgr/iomgr_posix.h"
#include "src/core/iomgr/pollset_posix.h"
-#include "src/core/iomgr/pollset_set_posix.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
-#include "src/core/iomgr/timer.h"
#include "src/core/support/string.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
extern int grpc_tcp_trace;
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index e8f73811ce..048e907441 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -40,8 +40,8 @@
#include <errno.h>
#include <stdlib.h>
#include <string.h>
-#include <sys/socket.h>
#include <sys/types.h>
+#include <sys/socket.h>
#include <unistd.h>
#include <grpc/support/alloc.h>
@@ -51,11 +51,9 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include "src/core/support/string.h"
#include "src/core/debug/trace.h"
-#include "src/core/iomgr/pollset_posix.h"
-#include "src/core/iomgr/pollset_set_posix.h"
#include "src/core/profiling/timers.h"
-#include "src/core/support/string.h"
#ifdef GPR_HAVE_MSG_NOSIGNAL
#define SENDMSG_FLAGS MSG_NOSIGNAL
@@ -297,7 +295,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
unwind_slice_idx = tcp->outgoing_slice_idx;
unwind_byte_idx = tcp->outgoing_byte_idx;
for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
- iov_size != MAX_WRITE_IOVEC;
+ iov_size != MAX_WRITE_IOVEC;
iov_size++) {
iov[iov_size].iov_base =
GPR_SLICE_START_PTR(
@@ -446,7 +444,7 @@ static char *tcp_get_peer(grpc_endpoint *ep) {
}
static const grpc_endpoint_vtable vtable = {
- tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
+ tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
tcp_shutdown, tcp_destroy, tcp_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h
index a9d0489edf..73a21c80ab 100644
--- a/src/core/iomgr/udp_server.h
+++ b/src/core/iomgr/udp_server.h
@@ -35,7 +35,6 @@
#define GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H
#include "src/core/iomgr/endpoint.h"
-#include "src/core/iomgr/fd_posix.h"
/* Forward decl of grpc_server */
typedef struct grpc_server grpc_server;
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index c096dbfb30..da11df67ef 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -44,7 +44,6 @@
#include <grpc/support/useful.h>
#include "src/core/iomgr/fd_posix.h"
-#include "src/core/iomgr/pollset_posix.h"
static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success);
diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h
index 68f195ee0d..589034fe1b 100644
--- a/src/core/iomgr/workqueue_posix.h
+++ b/src/core/iomgr/workqueue_posix.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,8 +34,6 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H
-#include "src/core/iomgr/wakeup_fd_posix.h"
-
struct grpc_fd;
struct grpc_workqueue {
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index 1f4f3e4aa5..458d0d3ac3 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -52,14 +52,13 @@
static grpc_channel_credentials *default_credentials = NULL;
static int compute_engine_detection_done = 0;
-static gpr_mu g_state_mu;
-static gpr_mu *g_polling_mu;
+static gpr_mu g_mu;
static gpr_once g_once = GPR_ONCE_INIT;
-static void init_default_credentials(void) { gpr_mu_init(&g_state_mu); }
+static void init_default_credentials(void) { gpr_mu_init(&g_mu); }
typedef struct {
- grpc_pollset *pollset;
+ grpc_pollset pollset;
int is_done;
int success;
} compute_engine_detector;
@@ -81,10 +80,10 @@ static void on_compute_engine_detection_http_response(
}
}
}
- gpr_mu_lock(g_polling_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&detector->pollset));
detector->is_done = 1;
- grpc_pollset_kick(detector->pollset, NULL);
- gpr_mu_unlock(g_polling_mu);
+ grpc_pollset_kick(&detector->pollset, NULL);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset));
}
static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool s) {
@@ -102,8 +101,7 @@ static int is_stack_running_on_compute_engine(void) {
on compute engine. */
gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN);
- detector.pollset = gpr_malloc(grpc_pollset_size());
- grpc_pollset_init(detector.pollset, &g_polling_mu);
+ grpc_pollset_init(&detector.pollset);
detector.is_done = 0;
detector.success = 0;
@@ -114,7 +112,7 @@ static int is_stack_running_on_compute_engine(void) {
grpc_httpcli_context_init(&context);
grpc_httpcli_get(
- &exec_ctx, &context, detector.pollset, &request,
+ &exec_ctx, &context, &detector.pollset, &request,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
on_compute_engine_detection_http_response, &detector);
@@ -122,22 +120,19 @@ static int is_stack_running_on_compute_engine(void) {
/* Block until we get the response. This is not ideal but this should only be
called once for the lifetime of the process by the default credentials. */
- gpr_mu_lock(g_polling_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
while (!detector.is_done) {
grpc_pollset_worker *worker = NULL;
- grpc_pollset_work(&exec_ctx, detector.pollset, &worker,
+ grpc_pollset_work(&exec_ctx, &detector.pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
- gpr_mu_unlock(g_polling_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
grpc_httpcli_context_destroy(&context);
- grpc_closure_init(&destroy_closure, destroy_pollset, detector.pollset);
- grpc_pollset_shutdown(&exec_ctx, detector.pollset, &destroy_closure);
+ grpc_closure_init(&destroy_closure, destroy_pollset, &detector.pollset);
+ grpc_pollset_shutdown(&exec_ctx, &detector.pollset, &destroy_closure);
grpc_exec_ctx_finish(&exec_ctx);
- g_polling_mu = NULL;
-
- gpr_free(detector.pollset);
return detector.success;
}
@@ -189,7 +184,7 @@ grpc_channel_credentials *grpc_google_default_credentials_create(void) {
gpr_once_init(&g_once, init_default_credentials);
- gpr_mu_lock(&g_state_mu);
+ gpr_mu_lock(&g_mu);
if (default_credentials != NULL) {
result = grpc_channel_credentials_ref(default_credentials);
@@ -235,19 +230,19 @@ end:
gpr_log(GPR_ERROR, "Could not create google default credentials.");
}
}
- gpr_mu_unlock(&g_state_mu);
+ gpr_mu_unlock(&g_mu);
return result;
}
void grpc_flush_cached_google_default_credentials(void) {
gpr_once_init(&g_once, init_default_credentials);
- gpr_mu_lock(&g_state_mu);
+ gpr_mu_lock(&g_mu);
if (default_credentials != NULL) {
grpc_channel_credentials_unref(default_credentials);
default_credentials = NULL;
}
compute_engine_detection_done = 0;
- gpr_mu_unlock(&g_state_mu);
+ gpr_mu_unlock(&g_mu);
}
/* -- Well known credentials path. -- */
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 8a9bbace08..f9cb852722 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -36,19 +36,18 @@
#include <stdio.h>
#include <string.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/atm.h>
-#include <grpc/support/log.h>
-#include <grpc/support/time.h>
-
-#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/timer.h"
-#include "src/core/profiling/timers.h"
+#include "src/core/iomgr/pollset.h"
#include "src/core/support/string.h"
#include "src/core/surface/api_trace.h"
#include "src/core/surface/call.h"
#include "src/core/surface/event_string.h"
#include "src/core/surface/surface_trace.h"
+#include "src/core/profiling/timers.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/atm.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
typedef struct {
grpc_pollset_worker **worker;
@@ -57,8 +56,6 @@ typedef struct {
/* Completion queue structure */
struct grpc_completion_queue {
- /** owned by pollset */
- gpr_mu *mu;
/** completed events */
grpc_cq_completion completed_head;
grpc_cq_completion *completed_tail;
@@ -66,6 +63,8 @@ struct grpc_completion_queue {
gpr_refcount pending_events;
/** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
+ /** the set of low level i/o things that concern this cq */
+ grpc_pollset pollset;
/** 0 initially, 1 once we've begun shutting down */
int shutdown;
int shutdown_called;
@@ -83,8 +82,6 @@ struct grpc_completion_queue {
grpc_completion_queue *next_free;
};
-#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
-
static gpr_mu g_freelist_mu;
grpc_completion_queue *g_freelist;
@@ -97,7 +94,7 @@ void grpc_cq_global_shutdown(void) {
gpr_mu_destroy(&g_freelist_mu);
while (g_freelist) {
grpc_completion_queue *next = g_freelist->next_free;
- grpc_pollset_destroy(POLLSET_FROM_CQ(g_freelist));
+ grpc_pollset_destroy(&g_freelist->pollset);
#ifndef NDEBUG
gpr_free(g_freelist->outstanding_tags);
#endif
@@ -127,8 +124,8 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
if (g_freelist == NULL) {
gpr_mu_unlock(&g_freelist_mu);
- cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
- grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
+ cc = gpr_malloc(sizeof(grpc_completion_queue));
+ grpc_pollset_init(&cc->pollset);
#ifndef NDEBUG
cc->outstanding_tags = NULL;
cc->outstanding_tag_capacity = 0;
@@ -187,7 +184,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
if (gpr_unref(&cc->owning_refs)) {
GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
- grpc_pollset_reset(POLLSET_FROM_CQ(cc));
+ grpc_pollset_reset(&cc->pollset);
gpr_mu_lock(&g_freelist_mu);
cc->next_free = g_freelist;
g_freelist = cc;
@@ -197,7 +194,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
#ifndef NDEBUG
- gpr_mu_lock(cc->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown_called);
if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) {
cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity);
@@ -206,7 +203,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
cc->outstanding_tag_capacity);
}
cc->outstanding_tags[cc->outstanding_tag_count++] = tag;
- gpr_mu_unlock(cc->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
#endif
gpr_ref(&cc->pending_events);
}
@@ -234,7 +231,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
storage->next =
((uintptr_t)&cc->completed_head) | ((uintptr_t)(success != 0));
- gpr_mu_lock(cc->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
#ifndef NDEBUG
for (i = 0; i < (int)cc->outstanding_tag_count; i++) {
if (cc->outstanding_tags[i] == tag) {
@@ -259,8 +256,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
break;
}
}
- grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker);
- gpr_mu_unlock(cc->mu);
+ grpc_pollset_kick(&cc->pollset, pluck_worker);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
} else {
cc->completed_tail->next =
((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
@@ -268,9 +265,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
- grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
- &cc->pollset_shutdown_done);
- gpr_mu_unlock(cc->mu);
+ grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_shutdown_done);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
GPR_TIMER_END("grpc_cq_end_op", 0);
@@ -298,7 +294,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "next");
- gpr_mu_lock(cc->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
if (cc->completed_tail != &cc->completed_head) {
grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
@@ -306,7 +302,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
if (c == cc->completed_tail) {
cc->completed_tail = &cc->completed_head;
}
- gpr_mu_unlock(cc->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
@@ -314,14 +310,14 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
break;
}
if (cc->shutdown) {
- gpr_mu_unlock(cc->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
- gpr_mu_unlock(cc->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
@@ -334,12 +330,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec iteration_deadline = deadline;
if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
GPR_TIMER_MARK("alarm_triggered", 0);
- gpr_mu_unlock(cc->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
grpc_exec_ctx_flush(&exec_ctx);
- gpr_mu_lock(cc->mu);
- continue;
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
} else {
- grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now,
+ grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now,
iteration_deadline);
}
}
@@ -400,7 +395,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "pluck");
- gpr_mu_lock(cc->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
prev = &cc->completed_head;
while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
@@ -410,7 +405,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
if (c == cc->completed_tail) {
cc->completed_tail = prev;
}
- gpr_mu_unlock(cc->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
@@ -420,7 +415,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
prev = c;
}
if (cc->shutdown) {
- gpr_mu_unlock(cc->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
@@ -430,7 +425,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
"Too many outstanding grpc_completion_queue_pluck calls: maximum "
"is %d",
GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
- gpr_mu_unlock(cc->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
/* TODO(ctiller): should we use a different result here */
ret.type = GRPC_QUEUE_TIMEOUT;
@@ -439,7 +434,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
del_plucker(cc, tag, &worker);
- gpr_mu_unlock(cc->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
@@ -452,12 +447,11 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec iteration_deadline = deadline;
if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
GPR_TIMER_MARK("alarm_triggered", 0);
- gpr_mu_unlock(cc->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
grpc_exec_ctx_flush(&exec_ctx);
- gpr_mu_lock(cc->mu);
- continue;
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
} else {
- grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now,
+ grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now,
iteration_deadline);
}
del_plucker(cc, tag, &worker);
@@ -478,9 +472,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
- gpr_mu_lock(cc->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
if (cc->shutdown_called) {
- gpr_mu_unlock(cc->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
return;
}
@@ -488,10 +482,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if (gpr_unref(&cc->pending_events)) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc),
- &cc->pollset_shutdown_done);
+ grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_shutdown_done);
}
- gpr_mu_unlock(cc->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
}
@@ -505,7 +498,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
}
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
- return POLLSET_FROM_CQ(cc);
+ return &cc->pollset;
}
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }