diff options
author | 2016-02-25 18:10:24 -0800 | |
---|---|---|
committer | 2016-02-25 18:10:24 -0800 | |
commit | e9ef53645150f7a0400a5f9be770eb2b4ba335b5 (patch) | |
tree | 34a8b7094b851589eec500b2ca419b64c9051d89 /src/core | |
parent | e3cd25684051de21e4b1ba93173c06e72fa5ffad (diff) |
Revert "Add an implementation firewall against pollset_set"
Diffstat (limited to 'src/core')
28 files changed, 202 insertions, 248 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index a96b49ac12..7176c01b05 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -78,8 +78,8 @@ typedef struct client_channel_channel_data { int exit_idle_when_lb_policy_arrives; /** owning stack */ grpc_channel_stack *owning_stack; - /** interested parties (owned) */ - grpc_pollset_set *interested_parties; + /** interested parties */ + grpc_pollset_set interested_parties; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -183,8 +183,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->incoming_configuration = NULL; if (lb_policy != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties, - chand->interested_parties); + grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, + &chand->interested_parties); } gpr_mu_lock(&chand->mu_config); @@ -231,8 +231,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (old_lb_policy != NULL) { - grpc_pollset_set_del_pollset_set( - exec_ctx, old_lb_policy->interested_parties, chand->interested_parties); + grpc_pollset_set_del_pollset_set(exec_ctx, + &old_lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -253,7 +254,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, GPR_ASSERT(op->set_accept_stream == NULL); if (op->bind_pollset != NULL) { - grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, + grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, op->bind_pollset); } @@ -283,8 +284,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, chand->resolver = NULL; if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, - chand->lb_policy->interested_parties, - chand->interested_parties); + &chand->lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } @@ -410,7 +411,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); - chand->interested_parties = grpc_pollset_set_create(); + grpc_pollset_set_init(&chand->interested_parties); } /* Destructor for channel_data */ @@ -424,12 +425,12 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, - chand->lb_policy->interested_parties, - chand->interested_parties); + &chand->lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(chand->interested_parties); + grpc_pollset_set_destroy(&chand->interested_parties); gpr_mu_destroy(&chand->mu_config); } @@ -440,17 +441,9 @@ static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } const grpc_channel_filter grpc_client_channel_filter = { - cc_start_transport_stream_op, - cc_start_transport_op, - sizeof(call_data), - init_call_elem, - cc_set_pollset, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - cc_get_peer, - "client-channel", + cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data), + init_call_elem, cc_set_pollset, destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, cc_get_peer, "client-channel", }; void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, @@ -508,7 +501,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; - grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, w->pollset); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); @@ -524,7 +517,7 @@ void grpc_client_channel_watch_connectivity_state( w->chand = chand; w->pollset = pollset; w->on_complete = on_complete; - grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset); grpc_closure_init(&w->my_closure, on_external_watch_complete, w); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 9f38f398d8..459bbebb68 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -31,8 +31,8 @@ * */ -#include "src/core/client_config/lb_policies/pick_first.h" #include "src/core/client_config/lb_policy_factory.h" +#include "src/core/client_config/lb_policies/pick_first.h" #include <string.h> @@ -119,7 +119,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); @@ -137,7 +137,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); @@ -158,7 +158,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } @@ -195,7 +195,8 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, + pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -252,7 +253,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, p->base.interested_parties, + exec_ctx, selected, &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); @@ -277,13 +278,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = selected; - grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, p->base.interested_parties, + exec_ctx, selected, &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: @@ -297,7 +298,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { goto loop; @@ -310,7 +311,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_changed"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: @@ -378,14 +379,8 @@ void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, - pf_shutdown, - pf_pick, - pf_cancel_pick, - pf_ping_one, - pf_exit_idle, - pf_check_connectivity, - pf_notify_on_state_change}; + pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_ping_one, pf_exit_idle, + pf_check_connectivity, pf_notify_on_state_change}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index 114ece6e4d..b1171c45b0 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -260,7 +260,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); @@ -285,7 +285,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { subchannel_data *sd = p->subchannels[i]; sd->connectivity_state = GRPC_CHANNEL_IDLE; grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, + exec_ctx, sd->subchannel, &p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); } @@ -322,7 +322,8 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, + pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -373,13 +374,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, + exec_ctx, sd->subchannel, &p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_CONNECTING: @@ -388,13 +389,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, sd->connectivity_state, "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, + exec_ctx, sd->subchannel, &p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: /* renew state notification */ grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, + exec_ctx, sd->subchannel, &p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); /* remove from ready list if still present */ @@ -483,14 +484,8 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, - rr_shutdown, - rr_pick, - rr_cancel_pick, - rr_ping_one, - rr_exit_idle, - rr_check_connectivity, - rr_notify_on_state_change}; + rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_ping_one, rr_exit_idle, + rr_check_connectivity, rr_notify_on_state_change}; static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 5ff623e006..d4672f6b25 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -39,7 +39,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { policy->vtable = vtable; gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); - policy->interested_parties = grpc_pollset_set_create(); + grpc_pollset_set_init(&policy->interested_parties); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG @@ -93,7 +93,7 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); if (old_val == 1) { - grpc_pollset_set_destroy(policy->interested_parties); + grpc_pollset_set_destroy(&policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); } } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 4fbb12da39..db5238c8ca 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -48,8 +48,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; gpr_atm ref_pair; - /* owned pointer to interested parties in load balancing decisions */ - grpc_pollset_set *interested_parties; + grpc_pollset_set interested_parties; }; struct grpc_lb_policy_vtable { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 291ad3472c..6599c75dba 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -108,7 +108,7 @@ struct grpc_subchannel { /** pollset_set tracking who's interested in a connection being setup */ - grpc_pollset_set *pollset_set; + grpc_pollset_set pollset_set; /** active connection, or null; of type grpc_connected_subchannel */ gpr_atm connected_subchannel; @@ -184,8 +184,8 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -void grpc_connected_subchannel_ref( - grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_connected_subchannel_ref(grpc_connected_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } @@ -209,7 +209,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_slice_unref(c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); - grpc_pollset_set_destroy(c->pollset_set); + grpc_pollset_set_destroy(&c->pollset_set); grpc_subchannel_key_destroy(exec_ctx, c->key); gpr_free(c); } @@ -226,8 +226,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, return old_val; } -grpc_subchannel *grpc_subchannel_ref( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF")); @@ -235,8 +235,8 @@ grpc_subchannel *grpc_subchannel_ref( return c; } -grpc_subchannel *grpc_subchannel_weak_ref( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); GPR_ASSERT(old_refs != 0); @@ -326,7 +326,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } c->addr = gpr_malloc(args->addr_len); memcpy(c->addr, args->addr, args->addr_len); - c->pollset_set = grpc_pollset_set_create(); + grpc_pollset_set_init(&c->pollset_set); c->addr_len = args->addr_len; grpc_set_initial_connect_string(&c->addr, &c->addr_len, &c->initial_connect_string); @@ -345,7 +345,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; - args.interested_parties = c->pollset_set; + args.interested_parties = &c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; args.deadline = compute_connect_deadline(c); @@ -379,7 +379,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; if (w->pollset_set != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set, + grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); } gpr_mu_lock(&w->subchannel->mu); @@ -415,7 +415,7 @@ void grpc_subchannel_notify_on_state_change( w->notify = notify; grpc_closure_init(&w->closure, on_external_state_watcher_done, w); if (interested_parties != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set, + grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); } GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); @@ -573,7 +573,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); grpc_connected_subchannel_notify_on_state_change( - exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state, + exec_ctx, con, &c->pollset_set, &sw_subchannel->connectivity_state, &sw_subchannel->closure); /* signal completion */ @@ -690,8 +690,8 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } -void grpc_subchannel_call_ref( - grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_call_ref(grpc_subchannel_call *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 1219c444c7..71237bb614 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -31,22 +31,20 @@ * */ -#include "src/core/httpcli/httpcli.h" #include "src/core/iomgr/sockaddr.h" +#include "src/core/httpcli/httpcli.h" #include <string.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> - -#include "src/core/httpcli/format_request.h" -#include "src/core/httpcli/parser.h" #include "src/core/iomgr/endpoint.h" -#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_client.h" +#include "src/core/httpcli/format_request.h" +#include "src/core/httpcli/parser.h" #include "src/core/support/string.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> typedef struct { gpr_slice request_text; @@ -86,18 +84,18 @@ const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http", plaintext_handshake}; void grpc_httpcli_context_init(grpc_httpcli_context *context) { - context->pollset_set = grpc_pollset_set_create(); + grpc_pollset_set_init(&context->pollset_set); } void grpc_httpcli_context_destroy(grpc_httpcli_context *context) { - grpc_pollset_set_destroy(context->pollset_set); + grpc_pollset_set_destroy(&context->pollset_set); } static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req); static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, int success) { - grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set, + grpc_pollset_set_del_pollset(exec_ctx, &req->context->pollset_set, req->pollset); req->on_response(exec_ctx, req->user_data, success ? &req->parser.r : NULL); grpc_httpcli_parser_destroy(&req->parser); @@ -199,7 +197,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req) { addr = &req->addresses->addrs[req->next_address++]; grpc_closure_init(&req->connected, on_connected, req); grpc_tcp_client_connect( - exec_ctx, &req->connected, &req->ep, req->context->pollset_set, + exec_ctx, &req->connected, &req->ep, &req->context->pollset_set, (struct sockaddr *)&addr->addr, addr->len, req->deadline); } @@ -239,7 +237,7 @@ static void internal_request_begin( req->host = gpr_strdup(request->host); req->ssl_host_override = gpr_strdup(request->ssl_host_override); - grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set, + grpc_pollset_set_add_pollset(exec_ctx, &req->context->pollset_set, req->pollset); grpc_resolve_address(request->host, req->handshaker->default_port, on_resolved, req); diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h index 86e17c1d69..30875d71f1 100644 --- a/src/core/httpcli/httpcli.h +++ b/src/core/httpcli/httpcli.h @@ -39,7 +39,6 @@ #include <grpc/support/time.h> #include "src/core/iomgr/endpoint.h" -#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/pollset_set.h" /* User agent this library reports */ @@ -57,7 +56,7 @@ typedef struct grpc_httpcli_header { TODO(ctiller): allow caching and capturing multiple requests for the same content and combining them */ typedef struct grpc_httpcli_context { - grpc_pollset_set *pollset_set; + grpc_pollset_set pollset_set; } grpc_httpcli_context; typedef struct { diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 4ba7c5df94..85eadd754b 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -46,8 +46,6 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> -#include "src/core/iomgr/pollset_posix.h" - #define CLOSURE_NOT_READY ((grpc_closure *)0) #define CLOSURE_READY ((grpc_closure *)1) @@ -177,11 +175,11 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { } static void pollset_kick_locked(grpc_fd_watcher *watcher) { - gpr_mu_lock(&watcher->pollset->mu); + gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset)); GPR_ASSERT(watcher->worker); grpc_pollset_kick_ext(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); - gpr_mu_unlock(&watcher->pollset->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset)); } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 92a0374ddd..6585326f81 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -35,11 +35,8 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_H #include <grpc/support/port_platform.h> -#include <grpc/support/sync.h> #include <grpc/support/time.h> -#include "src/core/iomgr/exec_ctx.h" - #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) /* A grpc_pollset is a set of file descriptors that a higher level item is @@ -49,11 +46,15 @@ - a completion queue might keep a pollset with an entry for each transport that is servicing a call that it's tracking */ -typedef struct grpc_pollset grpc_pollset; -typedef struct grpc_pollset_worker grpc_pollset_worker; +#ifdef GPR_POSIX_SOCKET +#include "src/core/iomgr/pollset_posix.h" +#endif + +#ifdef GPR_WIN32 +#include "src/core/iomgr/pollset_windows.h" +#endif -size_t grpc_pollset_size(void); -void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu); +void grpc_pollset_init(grpc_pollset *pollset); /* Begin shutting down the pollset, and call closure when done. * GRPC_POLLSET_MU(pollset) must be held */ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 2e0f27fab8..4acae2bb71 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -45,7 +45,6 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/pollset_posix.h" #include "src/core/profiling/timers.h" #include "src/core/support/block_annotate.h" diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 4dddfff230..809f8f39da 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -42,14 +42,12 @@ #include <stdlib.h> #include <string.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/useful.h> - #include "src/core/iomgr/fd_posix.h" #include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/pollset_posix.h" #include "src/core/support/block_annotate.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> typedef struct { /* all polled fds */ diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index e895a77884..ee7e9f48f4 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -42,16 +42,16 @@ #include <string.h> #include <unistd.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/thd.h> -#include <grpc/support/tls.h> -#include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/profiling/timers.h" #include "src/core/support/block_annotate.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/thd.h> +#include <grpc/support/tls.h> +#include <grpc/support/useful.h> GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_worker); @@ -97,8 +97,6 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } -size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } - void grpc_pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, uint32_t flags) { @@ -188,9 +186,8 @@ void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); -void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { +void grpc_pollset_init(grpc_pollset *pollset) { gpr_mu_init(&pollset->mu); - *mu = &pollset->mu; pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; pollset->in_flight_cbs = 0; pollset->shutting_down = 0; @@ -207,6 +204,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(!grpc_pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); pollset->vtable->destroy(pollset); + gpr_mu_destroy(&pollset->mu); while (pollset->local_wakeup_cache) { grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index bbedb66b00..5868b3fa21 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -37,10 +37,8 @@ #include <poll.h> #include <grpc/support/sync.h> - #include "src/core/iomgr/exec_ctx.h" #include "src/core/iomgr/iomgr.h" -#include "src/core/iomgr/pollset.h" #include "src/core/iomgr/wakeup_fd_posix.h" typedef struct grpc_pollset_vtable grpc_pollset_vtable; @@ -55,15 +53,15 @@ typedef struct grpc_cached_wakeup_fd { struct grpc_cached_wakeup_fd *next; } grpc_cached_wakeup_fd; -struct grpc_pollset_worker { +typedef struct grpc_pollset_worker { grpc_cached_wakeup_fd *wakeup_fd; int reevaluate_polling_on_wakeup; int kicked_specifically; struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; -}; +} grpc_pollset_worker; -struct grpc_pollset { +typedef struct grpc_pollset { /* pollsets under posix can mutate representation as fds are added and removed. For example, we may choose a poll() based implementation on linux for @@ -83,7 +81,7 @@ struct grpc_pollset { } data; /* Local cache of eventfds for workers */ grpc_cached_wakeup_fd *local_wakeup_cache; -}; +} grpc_pollset; struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -95,6 +93,8 @@ struct grpc_pollset_vtable { void (*destroy)(grpc_pollset *pollset); }; +#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) + /* Add an fd to a pollset */ void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd); diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 9591bf0d32..09c04438f7 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -41,9 +41,15 @@ fd's (etc) that have been registered with the set_set to that pollset. Registering fd's automatically adds them to all current pollsets. */ -typedef struct grpc_pollset_set grpc_pollset_set; +#ifdef GPR_POSIX_SOCKET +#include "src/core/iomgr/pollset_set_posix.h" +#endif -grpc_pollset_set *grpc_pollset_set_create(void); +#ifdef GPR_WIN32 +#include "src/core/iomgr/pollset_set_windows.h" +#endif + +void grpc_pollset_set_init(grpc_pollset_set *pollset_set); void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 9dc9aff4a8..4ec92202e3 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015-2016, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,30 +41,11 @@ #include <grpc/support/alloc.h> #include <grpc/support/useful.h> -#include "src/core/iomgr/pollset_posix.h" -#include "src/core/iomgr/pollset_set_posix.h" +#include "src/core/iomgr/pollset_set.h" -struct grpc_pollset_set { - gpr_mu mu; - - size_t pollset_count; - size_t pollset_capacity; - grpc_pollset **pollsets; - - size_t pollset_set_count; - size_t pollset_set_capacity; - struct grpc_pollset_set **pollset_sets; - - size_t fd_count; - size_t fd_capacity; - grpc_fd **fds; -}; - -grpc_pollset_set *grpc_pollset_set_create(void) { - grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); +void grpc_pollset_set_init(grpc_pollset_set *pollset_set) { memset(pollset_set, 0, sizeof(*pollset_set)); gpr_mu_init(&pollset_set->mu); - return pollset_set; } void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { @@ -76,7 +57,6 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { gpr_free(pollset_set->pollsets); gpr_free(pollset_set->pollset_sets); gpr_free(pollset_set->fds); - gpr_free(pollset_set); } void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index 7d1aaf4181..4820a61e4b 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015-2016, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,7 +35,23 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H #include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/pollset_set.h" +#include "src/core/iomgr/pollset_posix.h" + +typedef struct grpc_pollset_set { + gpr_mu mu; + + size_t pollset_count; + size_t pollset_capacity; + grpc_pollset **pollsets; + + size_t pollset_set_count; + size_t pollset_set_capacity; + struct grpc_pollset_set **pollset_sets; + + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; +} grpc_pollset_set; void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd); diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 9cf8fd4472..157b46ec32 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -35,9 +35,9 @@ #ifdef GPR_WINSOCK_SOCKET -#include "src/core/iomgr/pollset_set_windows.h" +#include "src/core/iomgr/pollset_set.h" -grpc_pollset_set* grpc_pollset_set_create(pollset_set) { return NULL; } +void grpc_pollset_set_init(grpc_pollset_set* pollset_set) {} void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {} diff --git a/src/core/iomgr/pollset_set_windows.h b/src/core/iomgr/pollset_set_windows.h index aa5abe9133..cada0d2b61 100644 --- a/src/core/iomgr/pollset_set_windows.h +++ b/src/core/iomgr/pollset_set_windows.h @@ -34,6 +34,6 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H -#include "src/core/iomgr/pollset_set.h" +typedef struct grpc_pollset_set { void *unused; } grpc_pollset_set; #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 651f8e7334..bbce23b46a 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -89,17 +89,12 @@ static void push_front_worker(grpc_pollset_worker *root, worker->links[type].next->links[type].prev = worker; } -size_t grpc_pollset_size(void) { - return sizeof(grpc_pollset); -} - /* There isn't really any such thing as a pollset under Windows, due to the nature of the IO completion ports. We're still going to provide a minimal set of features for the sake of the rest of grpc. But grpc_pollset_work won't actually do any polling, and return as quickly as possible. */ -void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - *mu = &grpc_polling_mu; +void grpc_pollset_init(grpc_pollset *pollset) { memset(pollset, 0, sizeof(*pollset)); pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index dc0b7a4104..65ba80619b 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015-2016, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -72,4 +72,8 @@ struct grpc_pollset { grpc_closure *on_shutdown; }; +extern gpr_mu grpc_polling_mu; + +#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu) + #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 15727856ab..c76c2e3b0f 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -42,19 +42,17 @@ #include <string.h> #include <unistd.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/time.h> - +#include "src/core/iomgr/timer.h" #include "src/core/iomgr/iomgr_posix.h" #include "src/core/iomgr/pollset_posix.h" -#include "src/core/iomgr/pollset_set_posix.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" -#include "src/core/iomgr/timer.h" #include "src/core/support/string.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> extern int grpc_tcp_trace; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index e8f73811ce..048e907441 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -40,8 +40,8 @@ #include <errno.h> #include <stdlib.h> #include <string.h> -#include <sys/socket.h> #include <sys/types.h> +#include <sys/socket.h> #include <unistd.h> #include <grpc/support/alloc.h> @@ -51,11 +51,9 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/support/string.h" #include "src/core/debug/trace.h" -#include "src/core/iomgr/pollset_posix.h" -#include "src/core/iomgr/pollset_set_posix.h" #include "src/core/profiling/timers.h" -#include "src/core/support/string.h" #ifdef GPR_HAVE_MSG_NOSIGNAL #define SENDMSG_FLAGS MSG_NOSIGNAL @@ -297,7 +295,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) { unwind_slice_idx = tcp->outgoing_slice_idx; unwind_byte_idx = tcp->outgoing_byte_idx; for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count && - iov_size != MAX_WRITE_IOVEC; + iov_size != MAX_WRITE_IOVEC; iov_size++) { iov[iov_size].iov_base = GPR_SLICE_START_PTR( @@ -446,7 +444,7 @@ static char *tcp_get_peer(grpc_endpoint *ep) { } static const grpc_endpoint_vtable vtable = { - tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, + tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, tcp_shutdown, tcp_destroy, tcp_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index a9d0489edf..73a21c80ab 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -35,7 +35,6 @@ #define GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H #include "src/core/iomgr/endpoint.h" -#include "src/core/iomgr/fd_posix.h" /* Forward decl of grpc_server */ typedef struct grpc_server grpc_server; diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index c096dbfb30..da11df67ef 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -44,7 +44,6 @@ #include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/pollset_posix.h" static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success); diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h index 68f195ee0d..589034fe1b 100644 --- a/src/core/iomgr/workqueue_posix.h +++ b/src/core/iomgr/workqueue_posix.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015-2016, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,8 +34,6 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H -#include "src/core/iomgr/wakeup_fd_posix.h" - struct grpc_fd; struct grpc_workqueue { diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index 1f4f3e4aa5..458d0d3ac3 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -52,14 +52,13 @@ static grpc_channel_credentials *default_credentials = NULL; static int compute_engine_detection_done = 0; -static gpr_mu g_state_mu; -static gpr_mu *g_polling_mu; +static gpr_mu g_mu; static gpr_once g_once = GPR_ONCE_INIT; -static void init_default_credentials(void) { gpr_mu_init(&g_state_mu); } +static void init_default_credentials(void) { gpr_mu_init(&g_mu); } typedef struct { - grpc_pollset *pollset; + grpc_pollset pollset; int is_done; int success; } compute_engine_detector; @@ -81,10 +80,10 @@ static void on_compute_engine_detection_http_response( } } } - gpr_mu_lock(g_polling_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&detector->pollset)); detector->is_done = 1; - grpc_pollset_kick(detector->pollset, NULL); - gpr_mu_unlock(g_polling_mu); + grpc_pollset_kick(&detector->pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset)); } static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool s) { @@ -102,8 +101,7 @@ static int is_stack_running_on_compute_engine(void) { on compute engine. */ gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN); - detector.pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(detector.pollset, &g_polling_mu); + grpc_pollset_init(&detector.pollset); detector.is_done = 0; detector.success = 0; @@ -114,7 +112,7 @@ static int is_stack_running_on_compute_engine(void) { grpc_httpcli_context_init(&context); grpc_httpcli_get( - &exec_ctx, &context, detector.pollset, &request, + &exec_ctx, &context, &detector.pollset, &request, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), on_compute_engine_detection_http_response, &detector); @@ -122,22 +120,19 @@ static int is_stack_running_on_compute_engine(void) { /* Block until we get the response. This is not ideal but this should only be called once for the lifetime of the process by the default credentials. */ - gpr_mu_lock(g_polling_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset)); while (!detector.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, detector.pollset, &worker, + grpc_pollset_work(&exec_ctx, &detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); } - gpr_mu_unlock(g_polling_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset)); grpc_httpcli_context_destroy(&context); - grpc_closure_init(&destroy_closure, destroy_pollset, detector.pollset); - grpc_pollset_shutdown(&exec_ctx, detector.pollset, &destroy_closure); + grpc_closure_init(&destroy_closure, destroy_pollset, &detector.pollset); + grpc_pollset_shutdown(&exec_ctx, &detector.pollset, &destroy_closure); grpc_exec_ctx_finish(&exec_ctx); - g_polling_mu = NULL; - - gpr_free(detector.pollset); return detector.success; } @@ -189,7 +184,7 @@ grpc_channel_credentials *grpc_google_default_credentials_create(void) { gpr_once_init(&g_once, init_default_credentials); - gpr_mu_lock(&g_state_mu); + gpr_mu_lock(&g_mu); if (default_credentials != NULL) { result = grpc_channel_credentials_ref(default_credentials); @@ -235,19 +230,19 @@ end: gpr_log(GPR_ERROR, "Could not create google default credentials."); } } - gpr_mu_unlock(&g_state_mu); + gpr_mu_unlock(&g_mu); return result; } void grpc_flush_cached_google_default_credentials(void) { gpr_once_init(&g_once, init_default_credentials); - gpr_mu_lock(&g_state_mu); + gpr_mu_lock(&g_mu); if (default_credentials != NULL) { grpc_channel_credentials_unref(default_credentials); default_credentials = NULL; } compute_engine_detection_done = 0; - gpr_mu_unlock(&g_state_mu); + gpr_mu_unlock(&g_mu); } /* -- Well known credentials path. -- */ diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 8a9bbace08..f9cb852722 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -36,19 +36,18 @@ #include <stdio.h> #include <string.h> -#include <grpc/support/alloc.h> -#include <grpc/support/atm.h> -#include <grpc/support/log.h> -#include <grpc/support/time.h> - -#include "src/core/iomgr/pollset.h" #include "src/core/iomgr/timer.h" -#include "src/core/profiling/timers.h" +#include "src/core/iomgr/pollset.h" #include "src/core/support/string.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/call.h" #include "src/core/surface/event_string.h" #include "src/core/surface/surface_trace.h" +#include "src/core/profiling/timers.h" +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> typedef struct { grpc_pollset_worker **worker; @@ -57,8 +56,6 @@ typedef struct { /* Completion queue structure */ struct grpc_completion_queue { - /** owned by pollset */ - gpr_mu *mu; /** completed events */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; @@ -66,6 +63,8 @@ struct grpc_completion_queue { gpr_refcount pending_events; /** Once owning_refs drops to zero, we will destroy the cq */ gpr_refcount owning_refs; + /** the set of low level i/o things that concern this cq */ + grpc_pollset pollset; /** 0 initially, 1 once we've begun shutting down */ int shutdown; int shutdown_called; @@ -83,8 +82,6 @@ struct grpc_completion_queue { grpc_completion_queue *next_free; }; -#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) - static gpr_mu g_freelist_mu; grpc_completion_queue *g_freelist; @@ -97,7 +94,7 @@ void grpc_cq_global_shutdown(void) { gpr_mu_destroy(&g_freelist_mu); while (g_freelist) { grpc_completion_queue *next = g_freelist->next_free; - grpc_pollset_destroy(POLLSET_FROM_CQ(g_freelist)); + grpc_pollset_destroy(&g_freelist->pollset); #ifndef NDEBUG gpr_free(g_freelist->outstanding_tags); #endif @@ -127,8 +124,8 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { if (g_freelist == NULL) { gpr_mu_unlock(&g_freelist_mu); - cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); - grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu); + cc = gpr_malloc(sizeof(grpc_completion_queue)); + grpc_pollset_init(&cc->pollset); #ifndef NDEBUG cc->outstanding_tags = NULL; cc->outstanding_tag_capacity = 0; @@ -187,7 +184,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); - grpc_pollset_reset(POLLSET_FROM_CQ(cc)); + grpc_pollset_reset(&cc->pollset); gpr_mu_lock(&g_freelist_mu); cc->next_free = g_freelist; g_freelist = cc; @@ -197,7 +194,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { #ifndef NDEBUG - gpr_mu_lock(cc->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(!cc->shutdown_called); if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) { cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity); @@ -206,7 +203,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { cc->outstanding_tag_capacity); } cc->outstanding_tags[cc->outstanding_tag_count++] = tag; - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); #endif gpr_ref(&cc->pending_events); } @@ -234,7 +231,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, storage->next = ((uintptr_t)&cc->completed_head) | ((uintptr_t)(success != 0)); - gpr_mu_lock(cc->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); #ifndef NDEBUG for (i = 0; i < (int)cc->outstanding_tag_count; i++) { if (cc->outstanding_tags[i] == tag) { @@ -259,8 +256,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, break; } } - grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); - gpr_mu_unlock(cc->mu); + grpc_pollset_kick(&cc->pollset, pluck_worker); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } else { cc->completed_tail->next = ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); @@ -268,9 +265,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; - grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); - gpr_mu_unlock(cc->mu); + grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } GPR_TIMER_END("grpc_cq_end_op", 0); @@ -298,7 +294,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "next"); - gpr_mu_lock(cc->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { if (cc->completed_tail != &cc->completed_head) { grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; @@ -306,7 +302,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, if (c == cc->completed_tail) { cc->completed_tail = &cc->completed_head; } - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -314,14 +310,14 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, break; } if (cc->shutdown) { - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; } now = gpr_now(GPR_CLOCK_MONOTONIC); if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; @@ -334,12 +330,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec iteration_deadline = deadline; if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(cc->mu); - continue; + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); } else { - grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, + grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, iteration_deadline); } } @@ -400,7 +395,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "pluck"); - gpr_mu_lock(cc->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { prev = &cc->completed_head; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != @@ -410,7 +405,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, if (c == cc->completed_tail) { cc->completed_tail = prev; } - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -420,7 +415,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, prev = c; } if (cc->shutdown) { - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; @@ -430,7 +425,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, "Too many outstanding grpc_completion_queue_pluck calls: maximum " "is %d", GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; @@ -439,7 +434,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, now = gpr_now(GPR_CLOCK_MONOTONIC); if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { del_plucker(cc, tag, &worker); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; @@ -452,12 +447,11 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec iteration_deadline = deadline; if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(cc->mu); - continue; + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); } else { - grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, + grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, iteration_deadline); } del_plucker(cc, tag, &worker); @@ -478,9 +472,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); - gpr_mu_lock(cc->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); if (cc->shutdown_called) { - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } @@ -488,10 +482,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->pending_events)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); + grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); } - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } @@ -505,7 +498,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { } grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return POLLSET_FROM_CQ(cc); + return &cc->pollset; } void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } |