diff options
author | Craig Tiller <ctiller@google.com> | 2016-02-25 19:04:07 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-02-25 19:04:07 -0800 |
commit | 69b093b3601bb01bec66391e28cc9f76b7baf303 (patch) | |
tree | 5f62d43fa22138ee42d348248a5f723c1b7d3959 | |
parent | 2596d8f7e786b22be2fba95c8ced4ec0ad017739 (diff) |
Revert "Revert "Add an implementation firewall against pollset_set""
52 files changed, 680 insertions, 563 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 7176c01b05..a96b49ac12 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -78,8 +78,8 @@ typedef struct client_channel_channel_data { int exit_idle_when_lb_policy_arrives; /** owning stack */ grpc_channel_stack *owning_stack; - /** interested parties */ - grpc_pollset_set interested_parties; + /** interested parties (owned) */ + grpc_pollset_set *interested_parties; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -183,8 +183,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->incoming_configuration = NULL; if (lb_policy != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, - &chand->interested_parties); + grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties, + chand->interested_parties); } gpr_mu_lock(&chand->mu_config); @@ -231,9 +231,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (old_lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, - &old_lb_policy->interested_parties, - &chand->interested_parties); + grpc_pollset_set_del_pollset_set( + exec_ctx, old_lb_policy->interested_parties, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -254,7 +253,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, GPR_ASSERT(op->set_accept_stream == NULL); if (op->bind_pollset != NULL) { - grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, op->bind_pollset); } @@ -284,8 +283,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, chand->resolver = NULL; if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, - &chand->lb_policy->interested_parties, - &chand->interested_parties); + chand->lb_policy->interested_parties, + chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } @@ -411,7 +410,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); - grpc_pollset_set_init(&chand->interested_parties); + chand->interested_parties = grpc_pollset_set_create(); } /* Destructor for channel_data */ @@ -425,12 +424,12 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, - &chand->lb_policy->interested_parties, - &chand->interested_parties); + chand->lb_policy->interested_parties, + chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(&chand->interested_parties); + grpc_pollset_set_destroy(chand->interested_parties); gpr_mu_destroy(&chand->mu_config); } @@ -441,9 +440,17 @@ static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } const grpc_channel_filter grpc_client_channel_filter = { - cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data), - init_call_elem, cc_set_pollset, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, cc_get_peer, "client-channel", + cc_start_transport_stream_op, + cc_start_transport_op, + sizeof(call_data), + init_call_elem, + cc_set_pollset, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + cc_get_peer, + "client-channel", }; void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, @@ -501,7 +508,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; - grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, w->pollset); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); @@ -517,7 +524,7 @@ void grpc_client_channel_watch_connectivity_state( w->chand = chand; w->pollset = pollset; w->on_complete = on_complete; - grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset); + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); grpc_closure_init(&w->my_closure, on_external_watch_complete, w); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 459bbebb68..9f38f398d8 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -31,8 +31,8 @@ * */ -#include "src/core/client_config/lb_policy_factory.h" #include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policy_factory.h" #include <string.h> @@ -119,7 +119,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); @@ -137,7 +137,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); @@ -158,7 +158,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, &p->checking_connectivity, + p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } @@ -195,8 +195,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, - pollset); + grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -253,7 +252,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, &p->base.interested_parties, + exec_ctx, selected, p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); @@ -278,13 +277,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = selected; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, &p->base.interested_parties, + exec_ctx, selected, p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: @@ -298,7 +297,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, &p->checking_connectivity, + p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { goto loop; @@ -311,7 +310,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_changed"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, &p->checking_connectivity, + p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: @@ -379,8 +378,14 @@ void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_ping_one, pf_exit_idle, - pf_check_connectivity, pf_notify_on_state_change}; + pf_destroy, + pf_shutdown, + pf_pick, + pf_cancel_pick, + pf_ping_one, + pf_exit_idle, + pf_check_connectivity, + pf_notify_on_state_change}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index b1171c45b0..114ece6e4d 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -260,7 +260,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); @@ -285,7 +285,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { subchannel_data *sd = p->subchannels[i]; sd->connectivity_state = GRPC_CHANNEL_IDLE; grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); } @@ -322,8 +322,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, - pollset); + grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -374,13 +373,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_CONNECTING: @@ -389,13 +388,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, sd->connectivity_state, "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: /* renew state notification */ grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); /* remove from ready list if still present */ @@ -484,8 +483,14 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_ping_one, rr_exit_idle, - rr_check_connectivity, rr_notify_on_state_change}; + rr_destroy, + rr_shutdown, + rr_pick, + rr_cancel_pick, + rr_ping_one, + rr_exit_idle, + rr_check_connectivity, + rr_notify_on_state_change}; static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index d4672f6b25..5ff623e006 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -39,7 +39,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { policy->vtable = vtable; gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); - grpc_pollset_set_init(&policy->interested_parties); + policy->interested_parties = grpc_pollset_set_create(); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG @@ -93,7 +93,7 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); if (old_val == 1) { - grpc_pollset_set_destroy(&policy->interested_parties); + grpc_pollset_set_destroy(policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); } } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index db5238c8ca..4fbb12da39 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -48,7 +48,8 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; gpr_atm ref_pair; - grpc_pollset_set interested_parties; + /* owned pointer to interested parties in load balancing decisions */ + grpc_pollset_set *interested_parties; }; struct grpc_lb_policy_vtable { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 6599c75dba..291ad3472c 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -108,7 +108,7 @@ struct grpc_subchannel { /** pollset_set tracking who's interested in a connection being setup */ - grpc_pollset_set pollset_set; + grpc_pollset_set *pollset_set; /** active connection, or null; of type grpc_connected_subchannel */ gpr_atm connected_subchannel; @@ -184,8 +184,8 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -void grpc_connected_subchannel_ref(grpc_connected_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_connected_subchannel_ref( + grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } @@ -209,7 +209,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_slice_unref(c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); - grpc_pollset_set_destroy(&c->pollset_set); + grpc_pollset_set_destroy(c->pollset_set); grpc_subchannel_key_destroy(exec_ctx, c->key); gpr_free(c); } @@ -226,8 +226,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, return old_val; } -grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_ref( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF")); @@ -235,8 +235,8 @@ grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c return c; } -grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_weak_ref( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); GPR_ASSERT(old_refs != 0); @@ -326,7 +326,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } c->addr = gpr_malloc(args->addr_len); memcpy(c->addr, args->addr, args->addr_len); - grpc_pollset_set_init(&c->pollset_set); + c->pollset_set = grpc_pollset_set_create(); c->addr_len = args->addr_len; grpc_set_initial_connect_string(&c->addr, &c->addr_len, &c->initial_connect_string); @@ -345,7 +345,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; - args.interested_parties = &c->pollset_set; + args.interested_parties = c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; args.deadline = compute_connect_deadline(c); @@ -379,7 +379,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; if (w->pollset_set != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, + grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set, w->pollset_set); } gpr_mu_lock(&w->subchannel->mu); @@ -415,7 +415,7 @@ void grpc_subchannel_notify_on_state_change( w->notify = notify; grpc_closure_init(&w->closure, on_external_state_watcher_done, w); if (interested_parties != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, + grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set, interested_parties); } GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); @@ -573,7 +573,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); grpc_connected_subchannel_notify_on_state_change( - exec_ctx, con, &c->pollset_set, &sw_subchannel->connectivity_state, + exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state, &sw_subchannel->closure); /* signal completion */ @@ -690,8 +690,8 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } -void grpc_subchannel_call_ref(grpc_subchannel_call *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_call_ref( + grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 71237bb614..1219c444c7 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -31,20 +31,22 @@ * */ -#include "src/core/iomgr/sockaddr.h" #include "src/core/httpcli/httpcli.h" +#include "src/core/iomgr/sockaddr.h" #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +#include "src/core/httpcli/format_request.h" +#include "src/core/httpcli/parser.h" #include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_client.h" -#include "src/core/httpcli/format_request.h" -#include "src/core/httpcli/parser.h" #include "src/core/support/string.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> typedef struct { gpr_slice request_text; @@ -84,18 +86,18 @@ const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http", plaintext_handshake}; void grpc_httpcli_context_init(grpc_httpcli_context *context) { - grpc_pollset_set_init(&context->pollset_set); + context->pollset_set = grpc_pollset_set_create(); } void grpc_httpcli_context_destroy(grpc_httpcli_context *context) { - grpc_pollset_set_destroy(&context->pollset_set); + grpc_pollset_set_destroy(context->pollset_set); } static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req); static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, int success) { - grpc_pollset_set_del_pollset(exec_ctx, &req->context->pollset_set, + grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set, req->pollset); req->on_response(exec_ctx, req->user_data, success ? &req->parser.r : NULL); grpc_httpcli_parser_destroy(&req->parser); @@ -197,7 +199,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req) { addr = &req->addresses->addrs[req->next_address++]; grpc_closure_init(&req->connected, on_connected, req); grpc_tcp_client_connect( - exec_ctx, &req->connected, &req->ep, &req->context->pollset_set, + exec_ctx, &req->connected, &req->ep, req->context->pollset_set, (struct sockaddr *)&addr->addr, addr->len, req->deadline); } @@ -237,7 +239,7 @@ static void internal_request_begin( req->host = gpr_strdup(request->host); req->ssl_host_override = gpr_strdup(request->ssl_host_override); - grpc_pollset_set_add_pollset(exec_ctx, &req->context->pollset_set, + grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set, req->pollset); grpc_resolve_address(request->host, req->handshaker->default_port, on_resolved, req); diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h index 30875d71f1..86e17c1d69 100644 --- a/src/core/httpcli/httpcli.h +++ b/src/core/httpcli/httpcli.h @@ -39,6 +39,7 @@ #include <grpc/support/time.h> #include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/pollset_set.h" /* User agent this library reports */ @@ -56,7 +57,7 @@ typedef struct grpc_httpcli_header { TODO(ctiller): allow caching and capturing multiple requests for the same content and combining them */ typedef struct grpc_httpcli_context { - grpc_pollset_set pollset_set; + grpc_pollset_set *pollset_set; } grpc_httpcli_context; typedef struct { diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 85eadd754b..4ba7c5df94 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -46,6 +46,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/pollset_posix.h" + #define CLOSURE_NOT_READY ((grpc_closure *)0) #define CLOSURE_READY ((grpc_closure *)1) @@ -175,11 +177,11 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { } static void pollset_kick_locked(grpc_fd_watcher *watcher) { - gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset)); + gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); grpc_pollset_kick_ext(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); - gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset)); + gpr_mu_unlock(&watcher->pollset->mu); } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 6585326f81..92a0374ddd 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -35,8 +35,11 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_H #include <grpc/support/port_platform.h> +#include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/iomgr/exec_ctx.h" + #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) /* A grpc_pollset is a set of file descriptors that a higher level item is @@ -46,15 +49,11 @@ - a completion queue might keep a pollset with an entry for each transport that is servicing a call that it's tracking */ -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/pollset_posix.h" -#endif - -#ifdef GPR_WIN32 -#include "src/core/iomgr/pollset_windows.h" -#endif +typedef struct grpc_pollset grpc_pollset; +typedef struct grpc_pollset_worker grpc_pollset_worker; -void grpc_pollset_init(grpc_pollset *pollset); +size_t grpc_pollset_size(void); +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu); /* Begin shutting down the pollset, and call closure when done. * GRPC_POLLSET_MU(pollset) must be held */ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 4acae2bb71..2e0f27fab8 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -45,6 +45,7 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/profiling/timers.h" #include "src/core/support/block_annotate.h" diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 809f8f39da..4dddfff230 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -42,13 +42,15 @@ #include <stdlib.h> #include <string.h> -#include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/support/block_annotate.h" + typedef struct { /* all polled fds */ size_t fd_count; diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index ee7e9f48f4..e895a77884 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -42,16 +42,16 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/socket_utils_posix.h" -#include "src/core/profiling/timers.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/tls.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/profiling/timers.h" +#include "src/core/support/block_annotate.h" GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_worker); @@ -97,6 +97,8 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } +size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } + void grpc_pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, uint32_t flags) { @@ -186,8 +188,9 @@ void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); -void grpc_pollset_init(grpc_pollset *pollset) { +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->mu); + *mu = &pollset->mu; pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; pollset->in_flight_cbs = 0; pollset->shutting_down = 0; @@ -204,7 +207,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(!grpc_pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); pollset->vtable->destroy(pollset); - gpr_mu_destroy(&pollset->mu); while (pollset->local_wakeup_cache) { grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 5868b3fa21..bbedb66b00 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -37,8 +37,10 @@ #include <poll.h> #include <grpc/support/sync.h> + #include "src/core/iomgr/exec_ctx.h" #include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/pollset.h" #include "src/core/iomgr/wakeup_fd_posix.h" typedef struct grpc_pollset_vtable grpc_pollset_vtable; @@ -53,15 +55,15 @@ typedef struct grpc_cached_wakeup_fd { struct grpc_cached_wakeup_fd *next; } grpc_cached_wakeup_fd; -typedef struct grpc_pollset_worker { +struct grpc_pollset_worker { grpc_cached_wakeup_fd *wakeup_fd; int reevaluate_polling_on_wakeup; int kicked_specifically; struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; -} grpc_pollset_worker; +}; -typedef struct grpc_pollset { +struct grpc_pollset { /* pollsets under posix can mutate representation as fds are added and removed. For example, we may choose a poll() based implementation on linux for @@ -81,7 +83,7 @@ typedef struct grpc_pollset { } data; /* Local cache of eventfds for workers */ grpc_cached_wakeup_fd *local_wakeup_cache; -} grpc_pollset; +}; struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -93,8 +95,6 @@ struct grpc_pollset_vtable { void (*destroy)(grpc_pollset *pollset); }; -#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) - /* Add an fd to a pollset */ void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd); diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 09c04438f7..9591bf0d32 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -41,15 +41,9 @@ fd's (etc) that have been registered with the set_set to that pollset. Registering fd's automatically adds them to all current pollsets. */ -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/pollset_set_posix.h" -#endif +typedef struct grpc_pollset_set grpc_pollset_set; -#ifdef GPR_WIN32 -#include "src/core/iomgr/pollset_set_windows.h" -#endif - -void grpc_pollset_set_init(grpc_pollset_set *pollset_set); +grpc_pollset_set *grpc_pollset_set_create(void); void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 4ec92202e3..9dc9aff4a8 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,11 +41,30 @@ #include <grpc/support/alloc.h> #include <grpc/support/useful.h> -#include "src/core/iomgr/pollset_set.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" -void grpc_pollset_set_init(grpc_pollset_set *pollset_set) { +struct grpc_pollset_set { + gpr_mu mu; + + size_t pollset_count; + size_t pollset_capacity; + grpc_pollset **pollsets; + + size_t pollset_set_count; + size_t pollset_set_capacity; + struct grpc_pollset_set **pollset_sets; + + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; +}; + +grpc_pollset_set *grpc_pollset_set_create(void) { + grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); memset(pollset_set, 0, sizeof(*pollset_set)); gpr_mu_init(&pollset_set->mu); + return pollset_set; } void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { @@ -57,6 +76,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { gpr_free(pollset_set->pollsets); gpr_free(pollset_set->pollset_sets); gpr_free(pollset_set->fds); + gpr_free(pollset_set); } void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index 4820a61e4b..7d1aaf4181 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,23 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H #include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/pollset_posix.h" - -typedef struct grpc_pollset_set { - gpr_mu mu; - - size_t pollset_count; - size_t pollset_capacity; - grpc_pollset **pollsets; - - size_t pollset_set_count; - size_t pollset_set_capacity; - struct grpc_pollset_set **pollset_sets; - - size_t fd_count; - size_t fd_capacity; - grpc_fd **fds; -} grpc_pollset_set; +#include "src/core/iomgr/pollset_set.h" void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd); diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 157b46ec32..9cf8fd4472 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -35,9 +35,9 @@ #ifdef GPR_WINSOCK_SOCKET -#include "src/core/iomgr/pollset_set.h" +#include "src/core/iomgr/pollset_set_windows.h" -void grpc_pollset_set_init(grpc_pollset_set* pollset_set) {} +grpc_pollset_set* grpc_pollset_set_create(pollset_set) { return NULL; } void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {} diff --git a/src/core/iomgr/pollset_set_windows.h b/src/core/iomgr/pollset_set_windows.h index cada0d2b61..aa5abe9133 100644 --- a/src/core/iomgr/pollset_set_windows.h +++ b/src/core/iomgr/pollset_set_windows.h @@ -34,6 +34,6 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H -typedef struct grpc_pollset_set { void *unused; } grpc_pollset_set; +#include "src/core/iomgr/pollset_set.h" #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index bbce23b46a..651f8e7334 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -89,12 +89,17 @@ static void push_front_worker(grpc_pollset_worker *root, worker->links[type].next->links[type].prev = worker; } +size_t grpc_pollset_size(void) { + return sizeof(grpc_pollset); +} + /* There isn't really any such thing as a pollset under Windows, due to the nature of the IO completion ports. We're still going to provide a minimal set of features for the sake of the rest of grpc. But grpc_pollset_work won't actually do any polling, and return as quickly as possible. */ -void grpc_pollset_init(grpc_pollset *pollset) { +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { + *mu = &grpc_polling_mu; memset(pollset, 0, sizeof(*pollset)); pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index 65ba80619b..dc0b7a4104 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -72,8 +72,4 @@ struct grpc_pollset { grpc_closure *on_shutdown; }; -extern gpr_mu grpc_polling_mu; - -#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu) - #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index c76c2e3b0f..15727856ab 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -42,17 +42,19 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/timer.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> + #include "src/core/iomgr/iomgr_posix.h" #include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" +#include "src/core/iomgr/timer.h" #include "src/core/support/string.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/time.h> extern int grpc_tcp_trace; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 048e907441..e8f73811ce 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -40,8 +40,8 @@ #include <errno.h> #include <stdlib.h> #include <string.h> -#include <sys/types.h> #include <sys/socket.h> +#include <sys/types.h> #include <unistd.h> #include <grpc/support/alloc.h> @@ -51,9 +51,11 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> -#include "src/core/support/string.h" #include "src/core/debug/trace.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" #include "src/core/profiling/timers.h" +#include "src/core/support/string.h" #ifdef GPR_HAVE_MSG_NOSIGNAL #define SENDMSG_FLAGS MSG_NOSIGNAL @@ -295,7 +297,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) { unwind_slice_idx = tcp->outgoing_slice_idx; unwind_byte_idx = tcp->outgoing_byte_idx; for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count && - iov_size != MAX_WRITE_IOVEC; + iov_size != MAX_WRITE_IOVEC; iov_size++) { iov[iov_size].iov_base = GPR_SLICE_START_PTR( @@ -444,7 +446,7 @@ static char *tcp_get_peer(grpc_endpoint *ep) { } static const grpc_endpoint_vtable vtable = { - tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, + tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, tcp_shutdown, tcp_destroy, tcp_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index 73a21c80ab..a9d0489edf 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H #include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/fd_posix.h" /* Forward decl of grpc_server */ typedef struct grpc_server grpc_server; diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index da11df67ef..c096dbfb30 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -44,6 +44,7 @@ #include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/pollset_posix.h" static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success); diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h index 589034fe1b..68f195ee0d 100644 --- a/src/core/iomgr/workqueue_posix.h +++ b/src/core/iomgr/workqueue_posix.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H +#include "src/core/iomgr/wakeup_fd_posix.h" + struct grpc_fd; struct grpc_workqueue { diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index 458d0d3ac3..1f4f3e4aa5 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -52,13 +52,14 @@ static grpc_channel_credentials *default_credentials = NULL; static int compute_engine_detection_done = 0; -static gpr_mu g_mu; +static gpr_mu g_state_mu; +static gpr_mu *g_polling_mu; static gpr_once g_once = GPR_ONCE_INIT; -static void init_default_credentials(void) { gpr_mu_init(&g_mu); } +static void init_default_credentials(void) { gpr_mu_init(&g_state_mu); } typedef struct { - grpc_pollset pollset; + grpc_pollset *pollset; int is_done; int success; } compute_engine_detector; @@ -80,10 +81,10 @@ static void on_compute_engine_detection_http_response( } } } - gpr_mu_lock(GRPC_POLLSET_MU(&detector->pollset)); + gpr_mu_lock(g_polling_mu); detector->is_done = 1; - grpc_pollset_kick(&detector->pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset)); + grpc_pollset_kick(detector->pollset, NULL); + gpr_mu_unlock(g_polling_mu); } static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool s) { @@ -101,7 +102,8 @@ static int is_stack_running_on_compute_engine(void) { on compute engine. */ gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN); - grpc_pollset_init(&detector.pollset); + detector.pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(detector.pollset, &g_polling_mu); detector.is_done = 0; detector.success = 0; @@ -112,7 +114,7 @@ static int is_stack_running_on_compute_engine(void) { grpc_httpcli_context_init(&context); grpc_httpcli_get( - &exec_ctx, &context, &detector.pollset, &request, + &exec_ctx, &context, detector.pollset, &request, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), on_compute_engine_detection_http_response, &detector); @@ -120,19 +122,22 @@ static int is_stack_running_on_compute_engine(void) { /* Block until we get the response. This is not ideal but this should only be called once for the lifetime of the process by the default credentials. */ - gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset)); + gpr_mu_lock(g_polling_mu); while (!detector.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &detector.pollset, &worker, + grpc_pollset_work(&exec_ctx, detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); } - gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset)); + gpr_mu_unlock(g_polling_mu); grpc_httpcli_context_destroy(&context); - grpc_closure_init(&destroy_closure, destroy_pollset, &detector.pollset); - grpc_pollset_shutdown(&exec_ctx, &detector.pollset, &destroy_closure); + grpc_closure_init(&destroy_closure, destroy_pollset, detector.pollset); + grpc_pollset_shutdown(&exec_ctx, detector.pollset, &destroy_closure); grpc_exec_ctx_finish(&exec_ctx); + g_polling_mu = NULL; + + gpr_free(detector.pollset); return detector.success; } @@ -184,7 +189,7 @@ grpc_channel_credentials *grpc_google_default_credentials_create(void) { gpr_once_init(&g_once, init_default_credentials); - gpr_mu_lock(&g_mu); + gpr_mu_lock(&g_state_mu); if (default_credentials != NULL) { result = grpc_channel_credentials_ref(default_credentials); @@ -230,19 +235,19 @@ end: gpr_log(GPR_ERROR, "Could not create google default credentials."); } } - gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_state_mu); return result; } void grpc_flush_cached_google_default_credentials(void) { gpr_once_init(&g_once, init_default_credentials); - gpr_mu_lock(&g_mu); + gpr_mu_lock(&g_state_mu); if (default_credentials != NULL) { grpc_channel_credentials_unref(default_credentials); default_credentials = NULL; } compute_engine_detection_done = 0; - gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_state_mu); } /* -- Well known credentials path. -- */ diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index f9cb852722..8a9bbace08 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -36,18 +36,19 @@ #include <stdio.h> #include <string.h> -#include "src/core/iomgr/timer.h" +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> + #include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/timer.h" +#include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/call.h" #include "src/core/surface/event_string.h" #include "src/core/surface/surface_trace.h" -#include "src/core/profiling/timers.h" -#include <grpc/support/alloc.h> -#include <grpc/support/atm.h> -#include <grpc/support/log.h> -#include <grpc/support/time.h> typedef struct { grpc_pollset_worker **worker; @@ -56,6 +57,8 @@ typedef struct { /* Completion queue structure */ struct grpc_completion_queue { + /** owned by pollset */ + gpr_mu *mu; /** completed events */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; @@ -63,8 +66,6 @@ struct grpc_completion_queue { gpr_refcount pending_events; /** Once owning_refs drops to zero, we will destroy the cq */ gpr_refcount owning_refs; - /** the set of low level i/o things that concern this cq */ - grpc_pollset pollset; /** 0 initially, 1 once we've begun shutting down */ int shutdown; int shutdown_called; @@ -82,6 +83,8 @@ struct grpc_completion_queue { grpc_completion_queue *next_free; }; +#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) + static gpr_mu g_freelist_mu; grpc_completion_queue *g_freelist; @@ -94,7 +97,7 @@ void grpc_cq_global_shutdown(void) { gpr_mu_destroy(&g_freelist_mu); while (g_freelist) { grpc_completion_queue *next = g_freelist->next_free; - grpc_pollset_destroy(&g_freelist->pollset); + grpc_pollset_destroy(POLLSET_FROM_CQ(g_freelist)); #ifndef NDEBUG gpr_free(g_freelist->outstanding_tags); #endif @@ -124,8 +127,8 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { if (g_freelist == NULL) { gpr_mu_unlock(&g_freelist_mu); - cc = gpr_malloc(sizeof(grpc_completion_queue)); - grpc_pollset_init(&cc->pollset); + cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); + grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu); #ifndef NDEBUG cc->outstanding_tags = NULL; cc->outstanding_tag_capacity = 0; @@ -184,7 +187,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); - grpc_pollset_reset(&cc->pollset); + grpc_pollset_reset(POLLSET_FROM_CQ(cc)); gpr_mu_lock(&g_freelist_mu); cc->next_free = g_freelist; g_freelist = cc; @@ -194,7 +197,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { #ifndef NDEBUG - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); GPR_ASSERT(!cc->shutdown_called); if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) { cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity); @@ -203,7 +206,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { cc->outstanding_tag_capacity); } cc->outstanding_tags[cc->outstanding_tag_count++] = tag; - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); #endif gpr_ref(&cc->pending_events); } @@ -231,7 +234,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, storage->next = ((uintptr_t)&cc->completed_head) | ((uintptr_t)(success != 0)); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); #ifndef NDEBUG for (i = 0; i < (int)cc->outstanding_tag_count; i++) { if (cc->outstanding_tags[i] == tag) { @@ -256,8 +259,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, break; } } - grpc_pollset_kick(&cc->pollset, pluck_worker); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); + gpr_mu_unlock(cc->mu); } else { cc->completed_tail->next = ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); @@ -265,8 +268,9 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; - grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); + gpr_mu_unlock(cc->mu); } GPR_TIMER_END("grpc_cq_end_op", 0); @@ -294,7 +298,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "next"); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); for (;;) { if (cc->completed_tail != &cc->completed_head) { grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; @@ -302,7 +306,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, if (c == cc->completed_tail) { cc->completed_tail = &cc->completed_head; } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -310,14 +314,14 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, break; } if (cc->shutdown) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; } now = gpr_now(GPR_CLOCK_MONOTONIC); if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; @@ -330,11 +334,12 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec iteration_deadline = deadline; if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); + continue; } else { - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, + grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline); } } @@ -395,7 +400,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "pluck"); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); for (;;) { prev = &cc->completed_head; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != @@ -405,7 +410,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, if (c == cc->completed_tail) { cc->completed_tail = prev; } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -415,7 +420,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, prev = c; } if (cc->shutdown) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; @@ -425,7 +430,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, "Too many outstanding grpc_completion_queue_pluck calls: maximum " "is %d", GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; @@ -434,7 +439,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, now = gpr_now(GPR_CLOCK_MONOTONIC); if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { del_plucker(cc, tag, &worker); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; @@ -447,11 +452,12 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec iteration_deadline = deadline; if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); + continue; } else { - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, + grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline); } del_plucker(cc, tag, &worker); @@ -472,9 +478,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); if (cc->shutdown_called) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } @@ -482,9 +488,10 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->pending_events)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); + grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } @@ -498,7 +505,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { } grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return &cc->pollset; + return POLLSET_FROM_CQ(cc); } void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } diff --git a/test/core/client_config/set_initial_connect_string_test.c b/test/core/client_config/set_initial_connect_string_test.c index bcd1f26123..3cf267fb3b 100644 --- a/test/core/client_config/set_initial_connect_string_test.c +++ b/test/core/client_config/set_initial_connect_string_test.c @@ -85,7 +85,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, gpr_slice_buffer_init(&state.incoming_buffer); gpr_slice_buffer_init(&state.temp_incoming_buffer); state.tcp = tcp; - grpc_endpoint_add_to_pollset(exec_ctx, tcp, &server->pollset); + grpc_endpoint_add_to_pollset(exec_ctx, tcp, server->pollset); grpc_endpoint_read(exec_ctx, tcp, &state.temp_incoming_buffer, &on_read); } diff --git a/test/core/end2end/fixtures/h2_full+poll+pipe.c b/test/core/end2end/fixtures/h2_full+poll+pipe.c index d475a7bb55..682598fbe2 100644 --- a/test/core/end2end/fixtures/h2_full+poll+pipe.c +++ b/test/core/end2end/fixtures/h2_full+poll+pipe.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,21 +35,23 @@ #include <string.h> -#include "src/core/channel/client_channel.h" -#include "src/core/channel/connected_channel.h" -#include "src/core/channel/http_server_filter.h" -#include "src/core/surface/channel.h" -#include "src/core/surface/server.h" -#include "src/core/transport/chttp2_transport.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/thd.h> #include <grpc/support/useful.h> + +#include "src/core/channel/client_channel.h" +#include "src/core/channel/connected_channel.h" +#include "src/core/channel/http_server_filter.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/wakeup_fd_posix.h" +#include "src/core/surface/channel.h" +#include "src/core/surface/server.h" +#include "src/core/transport/chttp2_transport.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" -#include "src/core/iomgr/wakeup_fd_posix.h" typedef struct fullstack_fixture_data { char *localaddr; diff --git a/test/core/end2end/fixtures/h2_full+poll.c b/test/core/end2end/fixtures/h2_full+poll.c index 3f5e6096f6..5a0b2ef495 100644 --- a/test/core/end2end/fixtures/h2_full+poll.c +++ b/test/core/end2end/fixtures/h2_full+poll.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,18 +35,20 @@ #include <string.h> -#include "src/core/channel/client_channel.h" -#include "src/core/channel/connected_channel.h" -#include "src/core/channel/http_server_filter.h" -#include "src/core/surface/channel.h" -#include "src/core/surface/server.h" -#include "src/core/transport/chttp2_transport.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/thd.h> #include <grpc/support/useful.h> + +#include "src/core/channel/client_channel.h" +#include "src/core/channel/connected_channel.h" +#include "src/core/channel/http_server_filter.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/surface/channel.h" +#include "src/core/surface/server.h" +#include "src/core/transport/chttp2_transport.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" diff --git a/test/core/end2end/fixtures/h2_ssl+poll.c b/test/core/end2end/fixtures/h2_ssl+poll.c index 0b88876d13..66268c77d5 100644 --- a/test/core/end2end/fixtures/h2_ssl+poll.c +++ b/test/core/end2end/fixtures/h2_ssl+poll.c @@ -41,6 +41,7 @@ #include <grpc/support/log.h> #include "src/core/channel/channel_args.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/security/credentials.h" #include "src/core/support/env.h" #include "src/core/support/tmpfile.h" diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 33055561cb..87bbd64d09 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -35,6 +35,13 @@ #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/useful.h> #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" #include "src/core/channel/client_uchannel.h" @@ -46,13 +53,6 @@ #include "src/core/surface/channel.h" #include "src/core/surface/server.h" #include "src/core/transport/chttp2_transport.h" -#include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/sync.h> -#include <grpc/support/thd.h> -#include <grpc/support/useful.h> #include "test/core/util/port.h" #include "test/core/util/test_config.h" @@ -238,12 +238,12 @@ static grpc_end2end_test_fixture chttp2_create_fixture_micro_fullstack( } grpc_connectivity_state g_state = GRPC_CHANNEL_IDLE; -grpc_pollset_set g_interested_parties; +grpc_pollset_set *g_interested_parties; static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, bool success) { if (g_state != GRPC_CHANNEL_READY) { grpc_subchannel_notify_on_state_change( - exec_ctx, arg, &g_interested_parties, &g_state, + exec_ctx, arg, g_interested_parties, &g_state, grpc_closure_create(state_changed, arg)); } } @@ -253,30 +253,31 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *arg, bool success) { } static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { - grpc_pollset pollset; + gpr_mu *mu; + grpc_pollset *pollset = gpr_malloc(grpc_pollset_size()); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_pollset_init(&pollset); - grpc_pollset_set_init(&g_interested_parties); - grpc_pollset_set_add_pollset(&exec_ctx, &g_interested_parties, &pollset); - grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties, + grpc_pollset_init(pollset, &mu); + g_interested_parties = grpc_pollset_set_create(); + grpc_pollset_set_add_pollset(&exec_ctx, g_interested_parties, pollset); + grpc_subchannel_notify_on_state_change(&exec_ctx, c, g_interested_parties, &g_state, grpc_closure_create(state_changed, c)); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); + gpr_mu_lock(mu); while (g_state != GRPC_CHANNEL_READY) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), + grpc_pollset_work(&exec_ctx, pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); - gpr_mu_unlock(GRPC_POLLSET_MU(&pollset)); + gpr_mu_unlock(mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); + gpr_mu_lock(mu); } - grpc_pollset_shutdown(&exec_ctx, &pollset, - grpc_closure_create(destroy_pollset, &pollset)); - grpc_pollset_set_destroy(&g_interested_parties); - gpr_mu_unlock(GRPC_POLLSET_MU(&pollset)); + grpc_pollset_shutdown(&exec_ctx, pollset, + grpc_closure_create(destroy_pollset, pollset)); + grpc_pollset_set_destroy(g_interested_parties); + gpr_mu_unlock(mu); grpc_exec_ctx_finish(&exec_ctx); + gpr_free(pollset); return grpc_subchannel_get_connected_subchannel(c); } diff --git a/test/core/end2end/fixtures/h2_uds+poll.c b/test/core/end2end/fixtures/h2_uds+poll.c index 155017c887..c3a855ff88 100644 --- a/test/core/end2end/fixtures/h2_uds+poll.c +++ b/test/core/end2end/fixtures/h2_uds+poll.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,13 +37,6 @@ #include <string.h> #include <unistd.h> -#include "src/core/channel/client_channel.h" -#include "src/core/channel/connected_channel.h" -#include "src/core/channel/http_server_filter.h" -#include "src/core/support/string.h" -#include "src/core/surface/channel.h" -#include "src/core/surface/server.h" -#include "src/core/transport/chttp2_transport.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> @@ -51,6 +44,15 @@ #include <grpc/support/sync.h> #include <grpc/support/thd.h> #include <grpc/support/useful.h> + +#include "src/core/channel/client_channel.h" +#include "src/core/channel/connected_channel.h" +#include "src/core/channel/http_server_filter.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/support/string.h" +#include "src/core/surface/channel.h" +#include "src/core/surface/server.h" +#include "src/core/transport/chttp2_transport.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" diff --git a/test/core/httpcli/httpcli_test.c b/test/core/httpcli/httpcli_test.c index e954a920b0..da1463329d 100644 --- a/test/core/httpcli/httpcli_test.c +++ b/test/core/httpcli/httpcli_test.c @@ -36,18 +36,19 @@ #include <string.h> #include <grpc/grpc.h> -#include "src/core/iomgr/iomgr.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/subprocess.h> #include <grpc/support/sync.h> +#include "src/core/iomgr/iomgr.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" static int g_done = 0; static grpc_httpcli_context g_context; -static grpc_pollset g_pollset; +static gpr_mu *g_mu; +static grpc_pollset *g_pollset; static gpr_timespec n_seconds_time(int seconds) { return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(seconds); @@ -63,10 +64,10 @@ static void on_finish(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(response->status == 200); GPR_ASSERT(response->body_length == strlen(expect)); GPR_ASSERT(0 == memcmp(expect, response->body, response->body_length)); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); g_done = 1; - grpc_pollset_kick(&g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_kick(g_pollset, NULL); + gpr_mu_unlock(g_mu); } static void test_get(int port) { @@ -85,18 +86,18 @@ static void test_get(int port) { req.path = "/get"; req.handshaker = &grpc_httpcli_plaintext; - grpc_httpcli_get(&exec_ctx, &g_context, &g_pollset, &req, n_seconds_time(15), + grpc_httpcli_get(&exec_ctx, &g_context, g_pollset, &req, n_seconds_time(15), on_finish, (void *)42); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (!g_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); gpr_free(host); } @@ -116,18 +117,18 @@ static void test_post(int port) { req.path = "/post"; req.handshaker = &grpc_httpcli_plaintext; - grpc_httpcli_post(&exec_ctx, &g_context, &g_pollset, &req, "hello", 5, + grpc_httpcli_post(&exec_ctx, &g_context, g_pollset, &req, "hello", 5, n_seconds_time(15), on_finish, (void *)42); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (!g_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); gpr_free(host); } @@ -175,17 +176,20 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); grpc_httpcli_context_init(&g_context); - grpc_pollset_init(&g_pollset); + g_pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(g_pollset, &g_mu); test_get(port); test_post(port); grpc_httpcli_context_destroy(&g_context); - grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); - grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); + grpc_closure_init(&destroyed, destroy_pollset, g_pollset); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); + gpr_free(g_pollset); + gpr_subprocess_destroy(server); return 0; diff --git a/test/core/httpcli/httpscli_test.c b/test/core/httpcli/httpscli_test.c index 9f31eae278..7f765bc614 100644 --- a/test/core/httpcli/httpscli_test.c +++ b/test/core/httpcli/httpscli_test.c @@ -36,18 +36,19 @@ #include <string.h> #include <grpc/grpc.h> -#include "src/core/iomgr/iomgr.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/subprocess.h> #include <grpc/support/sync.h> +#include "src/core/iomgr/iomgr.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" static int g_done = 0; static grpc_httpcli_context g_context; -static grpc_pollset g_pollset; +static gpr_mu *g_mu; +static grpc_pollset *g_pollset; static gpr_timespec n_seconds_time(int seconds) { return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(seconds); @@ -63,10 +64,10 @@ static void on_finish(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(response->status == 200); GPR_ASSERT(response->body_length == strlen(expect)); GPR_ASSERT(0 == memcmp(expect, response->body, response->body_length)); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); g_done = 1; - grpc_pollset_kick(&g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_kick(g_pollset, NULL); + gpr_mu_unlock(g_mu); } static void test_get(int port) { @@ -86,18 +87,18 @@ static void test_get(int port) { req.path = "/get"; req.handshaker = &grpc_httpcli_ssl; - grpc_httpcli_get(&exec_ctx, &g_context, &g_pollset, &req, n_seconds_time(15), + grpc_httpcli_get(&exec_ctx, &g_context, g_pollset, &req, n_seconds_time(15), on_finish, (void *)42); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (!g_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); gpr_free(host); } @@ -118,18 +119,18 @@ static void test_post(int port) { req.path = "/post"; req.handshaker = &grpc_httpcli_ssl; - grpc_httpcli_post(&exec_ctx, &g_context, &g_pollset, &req, "hello", 5, + grpc_httpcli_post(&exec_ctx, &g_context, g_pollset, &req, "hello", 5, n_seconds_time(15), on_finish, (void *)42); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (!g_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); gpr_free(host); } @@ -178,17 +179,20 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); grpc_httpcli_context_init(&g_context); - grpc_pollset_init(&g_pollset); + g_pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(g_pollset, &g_mu); test_get(port); test_post(port); grpc_httpcli_context_destroy(&g_context); - grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); - grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); + grpc_closure_init(&destroyed, destroy_pollset, g_pollset); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); + gpr_free(g_pollset); + gpr_subprocess_destroy(server); return 0; diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c index 7e266ebfb9..c3a91088a5 100644 --- a/test/core/iomgr/endpoint_pair_test.c +++ b/test/core/iomgr/endpoint_pair_test.c @@ -39,10 +39,11 @@ #include <grpc/support/time.h> #include <grpc/support/useful.h> #include "src/core/iomgr/endpoint_pair.h" -#include "test/core/util/test_config.h" #include "test/core/iomgr/endpoint_tests.h" +#include "test/core/util/test_config.h" -static grpc_pollset g_pollset; +static gpr_mu *g_mu; +static grpc_pollset *g_pollset; static void clean_up(void) {} @@ -54,8 +55,8 @@ static grpc_endpoint_test_fixture create_fixture_endpoint_pair( f.client_ep = p.client; f.server_ep = p.server; - grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, &g_pollset); - grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset); grpc_exec_ctx_finish(&exec_ctx); return f; @@ -74,12 +75,14 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - grpc_pollset_init(&g_pollset); - grpc_endpoint_tests(configs[0], &g_pollset); - grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); - grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); + g_pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(g_pollset, &g_mu); + grpc_endpoint_tests(configs[0], g_pollset, g_mu); + grpc_closure_init(&destroyed, destroy_pollset, g_pollset); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); + gpr_free(g_pollset); return 0; } diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index 6ba67df3b1..f689e4ba7f 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -36,8 +36,8 @@ #include <sys/types.h> #include <grpc/support/alloc.h> -#include <grpc/support/slice.h> #include <grpc/support/log.h> +#include <grpc/support/slice.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> #include "test/core/util/test_config.h" @@ -58,6 +58,7 @@ */ +static gpr_mu *g_mu; static grpc_pollset *g_pollset; size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) { @@ -134,10 +135,10 @@ static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx, state->incoming.slices, state->incoming.count, &state->current_read_data); if (state->bytes_read == state->target_bytes || !success) { gpr_log(GPR_INFO, "Read handler done"); - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); + gpr_mu_lock(g_mu); state->read_done = 1 + success; grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); + gpr_mu_unlock(g_mu); } else if (success) { grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming, &state->done_read); @@ -169,10 +170,10 @@ static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx, } gpr_log(GPR_INFO, "Write handler done"); - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); + gpr_mu_lock(g_mu); state->write_done = 1 + success; grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); + gpr_mu_unlock(g_mu); } /* Do both reading and writing using the grpc_endpoint API. @@ -232,14 +233,14 @@ static void read_and_write_test(grpc_endpoint_test_config config, } grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); + gpr_mu_lock(g_mu); while (!state.read_done || !state.write_done) { grpc_pollset_worker *worker = NULL; GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0); grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); } - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); end_test(config); @@ -251,9 +252,10 @@ static void read_and_write_test(grpc_endpoint_test_config config, } void grpc_endpoint_tests(grpc_endpoint_test_config config, - grpc_pollset *pollset) { + grpc_pollset *pollset, gpr_mu *mu) { size_t i; g_pollset = pollset; + g_mu = mu; read_and_write_test(config, 10000000, 100000, 8192, 0); read_and_write_test(config, 1000000, 100000, 1, 0); read_and_write_test(config, 100000000, 100000, 1, 1); diff --git a/test/core/iomgr/endpoint_tests.h b/test/core/iomgr/endpoint_tests.h index 700f854891..8ea47e345c 100644 --- a/test/core/iomgr/endpoint_tests.h +++ b/test/core/iomgr/endpoint_tests.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -53,6 +53,6 @@ struct grpc_endpoint_test_config { }; void grpc_endpoint_tests(grpc_endpoint_test_config config, - grpc_pollset *pollset); + grpc_pollset *pollset, gpr_mu *mu); #endif /* GRPC_TEST_CORE_IOMGR_ENDPOINT_TESTS_H */ diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 07761b6576..99689ebcc3 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -49,9 +49,12 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> + +#include "src/core/iomgr/pollset_posix.h" #include "test/core/util/test_config.h" -static grpc_pollset g_pollset; +static gpr_mu *g_mu; +static grpc_pollset *g_pollset; /* buffer size used to send and receive data. 1024 is the minimal value to set TCP send and receive buffer. */ @@ -179,10 +182,10 @@ static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */, grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, NULL, "b"); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); sv->done = 1; - grpc_pollset_kick(&g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_kick(g_pollset, NULL); + gpr_mu_unlock(g_mu); } /* Called when a new TCP connection request arrives in the listening port. */ @@ -209,7 +212,7 @@ static void listen_cb(grpc_exec_ctx *exec_ctx, void *arg, /*=sv_arg*/ se = gpr_malloc(sizeof(*se)); se->sv = sv; se->em_fd = grpc_fd_create(fd, "listener"); - grpc_pollset_add_fd(exec_ctx, &g_pollset, se->em_fd); + grpc_pollset_add_fd(exec_ctx, g_pollset, se->em_fd); se->session_read_closure.cb = session_read_cb; se->session_read_closure.cb_arg = se; grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure); @@ -238,7 +241,7 @@ static int server_start(grpc_exec_ctx *exec_ctx, server *sv) { GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0); sv->em_fd = grpc_fd_create(fd, "server"); - grpc_pollset_add_fd(exec_ctx, &g_pollset, sv->em_fd); + grpc_pollset_add_fd(exec_ctx, g_pollset, sv->em_fd); /* Register to be interested in reading from listen_fd. */ sv->listen_closure.cb = listen_cb; sv->listen_closure.cb_arg = sv; @@ -249,18 +252,18 @@ static int server_start(grpc_exec_ctx *exec_ctx, server *sv) { /* Wait and shutdown a sever. */ static void server_wait_and_shutdown(server *sv) { - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (!sv->done) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); } /* ===An upload client to test notify_on_write=== */ @@ -296,7 +299,7 @@ static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx, client *cl = arg; grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, "c"); cl->done = 1; - grpc_pollset_kick(&g_pollset, NULL); + grpc_pollset_kick(g_pollset, NULL); } /* Write as much as possible, then register notify_on_write. */ @@ -307,9 +310,9 @@ static void client_session_write(grpc_exec_ctx *exec_ctx, void *arg, /*client */ ssize_t write_once = 0; if (!success) { - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); client_session_shutdown_cb(exec_ctx, arg, 1); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); return; } @@ -319,7 +322,7 @@ static void client_session_write(grpc_exec_ctx *exec_ctx, void *arg, /*client */ } while (write_once > 0); if (errno == EAGAIN) { - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { cl->write_closure.cb = client_session_write; cl->write_closure.cb_arg = cl; @@ -328,7 +331,7 @@ static void client_session_write(grpc_exec_ctx *exec_ctx, void *arg, /*client */ } else { client_session_shutdown_cb(exec_ctx, arg, 1); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); } else { gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno)); abort(); @@ -357,25 +360,25 @@ static void client_start(grpc_exec_ctx *exec_ctx, client *cl, int port) { } cl->em_fd = grpc_fd_create(fd, "client"); - grpc_pollset_add_fd(exec_ctx, &g_pollset, cl->em_fd); + grpc_pollset_add_fd(exec_ctx, g_pollset, cl->em_fd); client_session_write(exec_ctx, cl, 1); } /* Wait for the signal to shutdown a client. */ static void client_wait_and_shutdown(client *cl) { - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (!cl->done) { grpc_pollset_worker *worker = NULL; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); } /* Test grpc_fd. Start an upload server and client, upload a stream of @@ -410,20 +413,20 @@ static void first_read_callback(grpc_exec_ctx *exec_ctx, void *arg /* fd_change_data */, bool success) { fd_change_data *fdc = arg; - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); fdc->cb_that_ran = first_read_callback; - grpc_pollset_kick(&g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_kick(g_pollset, NULL); + gpr_mu_unlock(g_mu); } static void second_read_callback(grpc_exec_ctx *exec_ctx, void *arg /* fd_change_data */, bool success) { fd_change_data *fdc = arg; - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); fdc->cb_that_ran = second_read_callback; - grpc_pollset_kick(&g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_kick(g_pollset, NULL); + gpr_mu_unlock(g_mu); } /* Test that changing the callback we use for notify_on_read actually works. @@ -456,7 +459,7 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change"); - grpc_pollset_add_fd(&exec_ctx, &g_pollset, em_fd); + grpc_pollset_add_fd(&exec_ctx, g_pollset, em_fd); /* Register the first callback, then make its FD readable */ grpc_fd_notify_on_read(&exec_ctx, em_fd, &first_closure); @@ -465,18 +468,18 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(result == 1); /* And now wait for it to run. */ - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (a.cb_that_ran == NULL) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } GPR_ASSERT(a.cb_that_ran == first_read_callback); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); /* And drain the socket so we can generate a new read edge */ result = read(sv[0], &data, 1); @@ -489,19 +492,19 @@ static void test_grpc_fd_change(void) { result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (b.cb_that_ran == NULL) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } /* Except now we verify that second_read_callback ran instead */ GPR_ASSERT(b.cb_that_ran == second_read_callback); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_fd_orphan(&exec_ctx, em_fd, NULL, NULL, "d"); grpc_exec_ctx_finish(&exec_ctx); @@ -519,12 +522,14 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_iomgr_init(); - grpc_pollset_init(&g_pollset); + g_pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(g_pollset, &g_mu); test_grpc_fd(); test_grpc_fd_change(); - grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); - grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); + grpc_closure_init(&destroyed, destroy_pollset, g_pollset); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); + gpr_free(g_pollset); grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index cdd5b096fb..1e6fa5d45a 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -40,6 +40,7 @@ #include <unistd.h> #include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/time.h> @@ -48,8 +49,9 @@ #include "src/core/iomgr/timer.h" #include "test/core/util/test_config.h" -static grpc_pollset_set g_pollset_set; -static grpc_pollset g_pollset; +static grpc_pollset_set *g_pollset_set; +static gpr_mu *g_mu; +static grpc_pollset *g_pollset; static int g_connections_complete = 0; static grpc_endpoint *g_connecting = NULL; @@ -58,10 +60,10 @@ static gpr_timespec test_deadline(void) { } static void finish_connection() { - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); g_connections_complete++; - grpc_pollset_kick(&g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_kick(g_pollset, NULL); + gpr_mu_unlock(g_mu); } static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg, bool success) { @@ -99,14 +101,14 @@ void test_succeeds(void) { GPR_ASSERT(0 == bind(svr_fd, (struct sockaddr *)&addr, addr_len)); GPR_ASSERT(0 == listen(svr_fd, 1)); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); connections_complete_before = g_connections_complete; - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); /* connect to it */ GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0); grpc_closure_init(&done, must_succeed, NULL); - grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, &g_pollset_set, + grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, g_pollset_set, (struct sockaddr *)&addr, addr_len, gpr_inf_future(GPR_CLOCK_REALTIME)); @@ -118,19 +120,19 @@ void test_succeeds(void) { GPR_ASSERT(r >= 0); close(r); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (g_connections_complete == connections_complete_before) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); } @@ -147,17 +149,17 @@ void test_fails(void) { memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); connections_complete_before = g_connections_complete; - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); /* connect to a broken address */ grpc_closure_init(&done, must_fail, NULL); - grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, &g_pollset_set, + grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, g_pollset_set, (struct sockaddr *)&addr, addr_len, gpr_inf_future(GPR_CLOCK_REALTIME)); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); /* wait for the connection callback to finish */ while (g_connections_complete == connections_complete_before) { @@ -165,14 +167,14 @@ void test_fails(void) { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec polling_deadline = test_deadline(); if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) { - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, now, polling_deadline); + grpc_pollset_work(&exec_ctx, g_pollset, &worker, now, polling_deadline); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); } @@ -217,16 +219,16 @@ void test_times_out(void) { connect_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); connections_complete_before = g_connections_complete; - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_closure_init(&done, must_fail, NULL); - grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, &g_pollset_set, + grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, g_pollset_set, (struct sockaddr *)&addr, addr_len, connect_deadline); /* Make sure the event doesn't trigger early */ - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); for (;;) { grpc_pollset_worker *worker = NULL; gpr_timespec now = gpr_now(connect_deadline.clock_type); @@ -252,13 +254,13 @@ void test_times_out(void) { } gpr_timespec polling_deadline = GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10); if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) { - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, now, polling_deadline); + grpc_pollset_work(&exec_ctx, g_pollset, &worker, now, polling_deadline); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); @@ -277,18 +279,20 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - grpc_pollset_set_init(&g_pollset_set); - grpc_pollset_init(&g_pollset); - grpc_pollset_set_add_pollset(&exec_ctx, &g_pollset_set, &g_pollset); + g_pollset_set = grpc_pollset_set_create(); + g_pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(g_pollset, &g_mu); + grpc_pollset_set_add_pollset(&exec_ctx, g_pollset_set, g_pollset); grpc_exec_ctx_finish(&exec_ctx); test_succeeds(); gpr_log(GPR_ERROR, "End of first test"); test_fails(); test_times_out(); - grpc_pollset_set_destroy(&g_pollset_set); - grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); - grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); + grpc_pollset_set_destroy(g_pollset_set); + grpc_closure_init(&destroyed, destroy_pollset, g_pollset); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); + gpr_free(g_pollset); return 0; } diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 20b0b9e13b..4351642ab6 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -36,8 +36,8 @@ #include <errno.h> #include <fcntl.h> #include <string.h> -#include <sys/types.h> #include <sys/socket.h> +#include <sys/types.h> #include <unistd.h> #include <grpc/grpc.h> @@ -45,10 +45,11 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> -#include "test/core/util/test_config.h" #include "test/core/iomgr/endpoint_tests.h" +#include "test/core/util/test_config.h" -static grpc_pollset g_pollset; +static gpr_mu *g_mu; +static grpc_pollset *g_pollset; /* General test notes: @@ -145,7 +146,7 @@ static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { GPR_ASSERT(success); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); current_data = state->read_bytes % 256; read_bytes = count_slices(state->incoming.slices, state->incoming.count, ¤t_data); @@ -153,10 +154,10 @@ static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes, state->target_read_bytes); if (state->read_bytes >= state->target_read_bytes) { - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); } else { grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); } } @@ -175,7 +176,7 @@ static void read_test(size_t num_bytes, size_t slice_size) { create_sockets(sv); ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test"); - grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -188,17 +189,17 @@ static void read_test(size_t num_bytes, size_t slice_size) { grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(&exec_ctx, ep); @@ -221,7 +222,7 @@ static void large_read_test(size_t slice_size) { ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size, "test"); - grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket(sv[0]); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -234,17 +235,17 @@ static void large_read_test(size_t slice_size) { grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(&exec_ctx, ep); @@ -283,11 +284,11 @@ static void write_done(grpc_exec_ctx *exec_ctx, void *user_data /* write_socket_state */, bool success) { struct write_socket_state *state = (struct write_socket_state *)user_data; gpr_log(GPR_INFO, "Write done callback called"); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); gpr_log(GPR_INFO, "Signalling write done"); state->write_done = 1; - grpc_pollset_kick(&g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_kick(g_pollset, NULL); + gpr_mu_unlock(g_mu); } void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { @@ -304,11 +305,11 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { for (;;) { grpc_pollset_worker *worker = NULL; - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + gpr_mu_lock(g_mu); + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); do { bytes_read = @@ -350,7 +351,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); - grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); state.ep = ep; state.write_done = 0; @@ -363,19 +364,19 @@ static void write_test(size_t num_bytes, size_t slice_size) { grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure); drain_socket_blocking(sv[0], num_bytes, num_bytes); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); for (;;) { grpc_pollset_worker *worker = NULL; if (state.write_done) { break; } - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); gpr_slice_buffer_destroy(&outgoing); grpc_endpoint_destroy(&exec_ctx, ep); @@ -386,7 +387,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, bool success) { int *done = arg; *done = 1; - grpc_pollset_kick(&g_pollset, NULL); + grpc_pollset_kick(g_pollset, NULL); } /* Do a read_test, then release fd and try to read/write again. Verify that @@ -410,7 +411,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test"); GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0); - grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -423,27 +424,27 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); gpr_slice_buffer_destroy(&state.incoming); grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); while (!fd_released_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); GPR_ASSERT(fd_released_done == 1); GPR_ASSERT(fd == sv[1]); grpc_exec_ctx_finish(&exec_ctx); @@ -491,8 +492,8 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( slice_size, "test"); f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), slice_size, "test"); - grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, &g_pollset); - grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset); grpc_exec_ctx_finish(&exec_ctx); @@ -512,13 +513,15 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - grpc_pollset_init(&g_pollset); + g_pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(g_pollset, &g_mu); run_tests(); - grpc_endpoint_tests(configs[0], &g_pollset); - grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); - grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); + grpc_endpoint_tests(configs[0], g_pollset, g_mu); + grpc_closure_init(&destroyed, destroy_pollset, g_pollset); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); + gpr_free(g_pollset); return 0; } diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 43180b1698..7933468355 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -32,24 +32,28 @@ */ #include "src/core/iomgr/tcp_server.h" -#include "src/core/iomgr/iomgr.h" -#include "src/core/iomgr/sockaddr_utils.h" + +#include <errno.h> +#include <netinet/in.h> +#include <string.h> +#include <sys/socket.h> +#include <unistd.h> + #include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> + +#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/sockaddr_utils.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" -#include <errno.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <string.h> -#include <unistd.h> - #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x) -static grpc_pollset g_pollset; +static gpr_mu *g_mu; +static grpc_pollset *g_pollset; static int g_nconnects = 0; typedef struct on_connect_result { @@ -113,11 +117,11 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_endpoint_shutdown(exec_ctx, tcp); grpc_endpoint_destroy(exec_ctx, tcp); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); on_connect_result_set(&g_result, acceptor); g_nconnects++; - grpc_pollset_kick(&g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_kick(g_pollset, NULL); + gpr_mu_unlock(g_mu); } static void test_no_op(void) { @@ -174,7 +178,7 @@ static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote, int clifd = socket(remote->sa_family, SOCK_STREAM, 0); int nconnects_before; - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); nconnects_before = g_nconnects; on_connect_result_init(&g_result); GPR_ASSERT(clifd >= 0); @@ -184,18 +188,18 @@ static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote, while (g_nconnects == nconnects_before && gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(exec_ctx, &g_pollset, &worker, + grpc_pollset_work(exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); } gpr_log(GPR_DEBUG, "wait done"); GPR_ASSERT(g_nconnects == nconnects_before + 1); close(clifd); *result = g_result; - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_unlock(g_mu); } /* Tests a tcp server with multiple ports. TODO(daniel-j-born): Multiple fds for @@ -210,7 +214,6 @@ static void test_connect(unsigned n) { unsigned svr1_fd_count; int svr1_port; grpc_tcp_server *s = grpc_tcp_server_create(NULL); - grpc_pollset *pollsets[1]; unsigned i; server_weak_ref weak_ref; server_weak_ref_init(&weak_ref); @@ -259,8 +262,7 @@ static void test_connect(unsigned n) { } } - pollsets[0] = &g_pollset; - grpc_tcp_server_start(&exec_ctx, s, pollsets, 1, on_connect, NULL); + grpc_tcp_server_start(&exec_ctx, s, &g_pollset, 1, on_connect, NULL); for (i = 0; i < n; i++) { on_connect_result result; @@ -312,7 +314,8 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - grpc_pollset_init(&g_pollset); + g_pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(g_pollset, &g_mu); test_no_op(); test_no_op_with_start(); @@ -321,9 +324,10 @@ int main(int argc, char **argv) { test_connect(1); test_connect(10); - grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); - grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); + grpc_closure_init(&destroyed, destroy_pollset, g_pollset); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); + gpr_free(g_pollset); return 0; } diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c index a024915256..8a1faf6303 100644 --- a/test/core/iomgr/workqueue_test.c +++ b/test/core/iomgr/workqueue_test.c @@ -34,18 +34,20 @@ #include "src/core/iomgr/workqueue.h" #include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include "test/core/util/test_config.h" -static grpc_pollset g_pollset; +static gpr_mu *g_mu; +static grpc_pollset *g_pollset; static void must_succeed(grpc_exec_ctx *exec_ctx, void *p, bool success) { GPR_ASSERT(success == 1); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); *(int *)p = 1; - grpc_pollset_kick(&g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_kick(g_pollset, NULL); + gpr_mu_unlock(g_mu); } static void test_ref_unref(void) { @@ -67,13 +69,13 @@ static void test_add_closure(void) { grpc_closure_init(&c, must_succeed, &done); grpc_workqueue_push(wq, &c, 1); - grpc_workqueue_add_to_pollset(&exec_ctx, wq, &g_pollset); + grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); GPR_ASSERT(!done); - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, - gpr_now(deadline.clock_type), deadline); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(deadline.clock_type), + deadline); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(done); @@ -92,13 +94,13 @@ static void test_flush(void) { grpc_exec_ctx_enqueue(&exec_ctx, &c, true, NULL); grpc_workqueue_flush(&exec_ctx, wq); - grpc_workqueue_add_to_pollset(&exec_ctx, wq, &g_pollset); + grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + gpr_mu_lock(g_mu); GPR_ASSERT(!done); - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, - gpr_now(deadline.clock_type), deadline); - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(deadline.clock_type), + deadline); + gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(done); @@ -115,15 +117,18 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - grpc_pollset_init(&g_pollset); + g_pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(g_pollset, &g_mu); test_ref_unref(); test_add_closure(); test_flush(); - grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); - grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); + grpc_closure_init(&destroyed, destroy_pollset, g_pollset); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); + + gpr_free(g_pollset); return 0; } diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c index 4dd595df95..9b70afffe1 100644 --- a/test/core/security/oauth2_utils.c +++ b/test/core/security/oauth2_utils.c @@ -45,7 +45,8 @@ #include "src/core/security/credentials.h" typedef struct { - grpc_pollset pollset; + gpr_mu *mu; + grpc_pollset *pollset; int is_done; char *token; } oauth2_request; @@ -66,11 +67,11 @@ static void on_oauth2_response(grpc_exec_ctx *exec_ctx, void *user_data, GPR_SLICE_LENGTH(token_slice)); token[GPR_SLICE_LENGTH(token_slice)] = '\0'; } - gpr_mu_lock(GRPC_POLLSET_MU(&request->pollset)); + gpr_mu_lock(request->mu); request->is_done = 1; request->token = token; - grpc_pollset_kick(&request->pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&request->pollset)); + grpc_pollset_kick(request->pollset, NULL); + gpr_mu_unlock(request->mu); } static void do_nothing(grpc_exec_ctx *exec_ctx, void *unused, bool success) {} @@ -82,28 +83,30 @@ char *grpc_test_fetch_oauth2_token_with_credentials( grpc_closure do_nothing_closure; grpc_auth_metadata_context null_ctx = {"", "", NULL, NULL}; - grpc_pollset_init(&request.pollset); + request.pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(request.pollset, &request.mu); request.is_done = 0; grpc_closure_init(&do_nothing_closure, do_nothing, NULL); - grpc_call_credentials_get_request_metadata(&exec_ctx, creds, &request.pollset, + grpc_call_credentials_get_request_metadata(&exec_ctx, creds, request.pollset, null_ctx, on_oauth2_response, &request); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&request.pollset)); + gpr_mu_lock(request.mu); while (!request.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &request.pollset, &worker, + grpc_pollset_work(&exec_ctx, request.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); } - gpr_mu_unlock(GRPC_POLLSET_MU(&request.pollset)); + gpr_mu_unlock(request.mu); - grpc_pollset_shutdown(&exec_ctx, &request.pollset, &do_nothing_closure); + grpc_pollset_shutdown(&exec_ctx, request.pollset, &do_nothing_closure); grpc_exec_ctx_finish(&exec_ctx); - grpc_pollset_destroy(&request.pollset); + grpc_pollset_destroy(request.pollset); + gpr_free(request.pollset); return request.token; } diff --git a/test/core/security/print_google_default_creds_token.c b/test/core/security/print_google_default_creds_token.c index 6043bf5420..09673f362c 100644 --- a/test/core/security/print_google_default_creds_token.c +++ b/test/core/security/print_google_default_creds_token.c @@ -34,8 +34,6 @@ #include <stdio.h> #include <string.h> -#include "src/core/security/credentials.h" -#include "src/core/support/string.h" #include <grpc/grpc.h> #include <grpc/grpc_security.h> #include <grpc/support/alloc.h> @@ -44,8 +42,12 @@ #include <grpc/support/slice.h> #include <grpc/support/sync.h> +#include "src/core/security/credentials.h" +#include "src/core/support/string.h" + typedef struct { - grpc_pollset pollset; + gpr_mu *mu; + grpc_pollset *pollset; int is_done; } synchronizer; @@ -62,10 +64,10 @@ static void on_metadata_response(grpc_exec_ctx *exec_ctx, void *user_data, printf("\nGot token: %s\n\n", token); gpr_free(token); } - gpr_mu_lock(GRPC_POLLSET_MU(&sync->pollset)); + gpr_mu_lock(sync->mu); sync->is_done = 1; - grpc_pollset_kick(&sync->pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&sync->pollset)); + grpc_pollset_kick(sync->pollset, NULL); + gpr_mu_unlock(sync->mu); } int main(int argc, char **argv) { @@ -91,26 +93,30 @@ int main(int argc, char **argv) { goto end; } - grpc_pollset_init(&sync.pollset); + sync.pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(sync.pollset, &sync.mu); sync.is_done = 0; grpc_call_credentials_get_request_metadata( &exec_ctx, ((grpc_composite_channel_credentials *)creds)->call_creds, - &sync.pollset, context, on_metadata_response, &sync); + sync.pollset, context, on_metadata_response, &sync); - gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset)); + gpr_mu_lock(sync.mu); while (!sync.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &sync.pollset, &worker, + grpc_pollset_work(&exec_ctx, sync.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset)); - grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset)); + gpr_mu_unlock(sync.mu); + grpc_exec_ctx_flush(&exec_ctx); + gpr_mu_lock(sync.mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset)); + gpr_mu_unlock(sync.mu); + + grpc_exec_ctx_finish(&exec_ctx); grpc_channel_credentials_release(creds); + gpr_free(sync.pollset); end: gpr_cmdline_destroy(cl); diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index fb4bd30e2d..0e8c38a53e 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -36,16 +36,17 @@ #include <fcntl.h> #include <sys/types.h> -#include "src/core/security/secure_endpoint.h" -#include "src/core/iomgr/endpoint_pair.h" -#include "src/core/iomgr/iomgr.h" #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include "test/core/util/test_config.h" +#include "src/core/iomgr/endpoint_pair.h" +#include "src/core/iomgr/iomgr.h" +#include "src/core/security/secure_endpoint.h" #include "src/core/tsi/fake_transport_security.h" +#include "test/core/util/test_config.h" -static grpc_pollset g_pollset; +static gpr_mu *g_mu; +static grpc_pollset *g_pollset; static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( size_t slice_size, gpr_slice *leftover_slices, size_t leftover_nslices) { @@ -56,8 +57,8 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( grpc_endpoint_pair tcp; tcp = grpc_iomgr_create_endpoint_pair("fixture", slice_size); - grpc_endpoint_add_to_pollset(&exec_ctx, tcp.client, &g_pollset); - grpc_endpoint_add_to_pollset(&exec_ctx, tcp.server, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, tcp.client, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, tcp.server, g_pollset); if (leftover_nslices == 0) { f.client_ep = @@ -181,13 +182,16 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); - grpc_pollset_init(&g_pollset); - grpc_endpoint_tests(configs[0], &g_pollset); + g_pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(g_pollset, &g_mu); + grpc_endpoint_tests(configs[0], g_pollset, g_mu); test_leftover(configs[1], 1); - grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); - grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); + grpc_closure_init(&destroyed, destroy_pollset, g_pollset); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); + gpr_free(g_pollset); + return 0; } diff --git a/test/core/security/verify_jwt.c b/test/core/security/verify_jwt.c index 5070cf0492..eb86589681 100644 --- a/test/core/security/verify_jwt.c +++ b/test/core/security/verify_jwt.c @@ -34,7 +34,6 @@ #include <stdio.h> #include <string.h> -#include "src/core/security/jwt_verifier.h" #include <grpc/grpc.h> #include <grpc/grpc_security.h> #include <grpc/support/alloc.h> @@ -43,8 +42,11 @@ #include <grpc/support/slice.h> #include <grpc/support/sync.h> +#include "src/core/security/jwt_verifier.h" + typedef struct { - grpc_pollset pollset; + grpc_pollset *pollset; + gpr_mu *mu; int is_done; int success; } synchronizer; @@ -77,10 +79,10 @@ static void on_jwt_verification_done(void *user_data, grpc_jwt_verifier_status_to_string(status)); } - gpr_mu_lock(GRPC_POLLSET_MU(&sync->pollset)); + gpr_mu_lock(sync->mu); sync->is_done = 1; - grpc_pollset_kick(&sync->pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&sync->pollset)); + grpc_pollset_kick(sync->pollset, NULL); + gpr_mu_unlock(sync->mu); } int main(int argc, char **argv) { @@ -103,23 +105,26 @@ int main(int argc, char **argv) { grpc_init(); - grpc_pollset_init(&sync.pollset); + sync.pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(sync.pollset, &sync.mu); sync.is_done = 0; - grpc_jwt_verifier_verify(&exec_ctx, verifier, &sync.pollset, jwt, aud, + grpc_jwt_verifier_verify(&exec_ctx, verifier, sync.pollset, jwt, aud, on_jwt_verification_done, &sync); - gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset)); + gpr_mu_lock(sync.mu); while (!sync.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &sync.pollset, &worker, + grpc_pollset_work(&exec_ctx, sync.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset)); + gpr_mu_unlock(sync.mu); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset)); + gpr_mu_lock(sync.mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset)); + gpr_mu_unlock(sync.mu); + + gpr_free(sync.pollset); grpc_jwt_verifier_destroy(verifier); gpr_cmdline_destroy(cl); diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c index ad21fe0f53..ba382d242a 100644 --- a/test/core/util/port_posix.c +++ b/test/core/util/port_posix.c @@ -69,7 +69,8 @@ static int has_port_been_chosen(int port) { } typedef struct freereq { - grpc_pollset pollset; + gpr_mu *mu; + grpc_pollset *pollset; int done; } freereq; @@ -82,10 +83,10 @@ static void destroy_pollset_and_shutdown(grpc_exec_ctx *exec_ctx, void *p, static void freed_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, const grpc_httpcli_response *response) { freereq *pr = arg; - gpr_mu_lock(GRPC_POLLSET_MU(&pr->pollset)); + gpr_mu_lock(pr->mu); pr->done = 1; - grpc_pollset_kick(&pr->pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&pr->pollset)); + grpc_pollset_kick(pr->pollset, NULL); + gpr_mu_unlock(pr->mu); } static void free_port_using_server(char *server, int port) { @@ -100,31 +101,34 @@ static void free_port_using_server(char *server, int port) { memset(&pr, 0, sizeof(pr)); memset(&req, 0, sizeof(req)); - grpc_pollset_init(&pr.pollset); + + pr.pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(pr.pollset, &pr.mu); grpc_closure_init(&shutdown_closure, destroy_pollset_and_shutdown, - &pr.pollset); + pr.pollset); req.host = server; gpr_asprintf(&path, "/drop/%d", port); req.path = path; grpc_httpcli_context_init(&context); - grpc_httpcli_get(&exec_ctx, &context, &pr.pollset, &req, + grpc_httpcli_get(&exec_ctx, &context, pr.pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), freed_port_from_server, &pr); - gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); + gpr_mu_lock(pr.mu); while (!pr.done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &pr.pollset, &worker, + grpc_pollset_work(&exec_ctx, pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); } - gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); + gpr_mu_unlock(pr.mu); grpc_httpcli_context_destroy(&context); grpc_exec_ctx_finish(&exec_ctx); - grpc_pollset_shutdown(&exec_ctx, &pr.pollset, &shutdown_closure); + grpc_pollset_shutdown(&exec_ctx, pr.pollset, &shutdown_closure); grpc_exec_ctx_finish(&exec_ctx); + gpr_free(pr.pollset); gpr_free(path); } @@ -202,7 +206,8 @@ static int is_port_available(int *port, int is_tcp) { } typedef struct portreq { - grpc_pollset pollset; + gpr_mu *mu; + grpc_pollset *pollset; int port; int retries; char *server; @@ -234,7 +239,7 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, pr->retries++; req.host = pr->server; req.path = "/get"; - grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pollset, &req, + grpc_httpcli_get(exec_ctx, pr->ctx, pr->pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server, pr); return; @@ -246,10 +251,10 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, port = port * 10 + response->body[i] - '0'; } GPR_ASSERT(port > 1024); - gpr_mu_lock(GRPC_POLLSET_MU(&pr->pollset)); + gpr_mu_lock(pr->mu); pr->port = port; - grpc_pollset_kick(&pr->pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&pr->pollset)); + grpc_pollset_kick(pr->pollset, NULL); + gpr_mu_unlock(pr->mu); } static int pick_port_using_server(char *server) { @@ -263,9 +268,10 @@ static int pick_port_using_server(char *server) { memset(&pr, 0, sizeof(pr)); memset(&req, 0, sizeof(req)); - grpc_pollset_init(&pr.pollset); + pr.pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(pr.pollset, &pr.mu); grpc_closure_init(&shutdown_closure, destroy_pollset_and_shutdown, - &pr.pollset); + pr.pollset); pr.port = -1; pr.server = server; pr.ctx = &context; @@ -274,22 +280,23 @@ static int pick_port_using_server(char *server) { req.path = "/get"; grpc_httpcli_context_init(&context); - grpc_httpcli_get(&exec_ctx, &context, &pr.pollset, &req, + grpc_httpcli_get(&exec_ctx, &context, pr.pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server, &pr); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); + gpr_mu_lock(pr.mu); while (pr.port == -1) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &pr.pollset, &worker, + grpc_pollset_work(&exec_ctx, pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); } - gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); + gpr_mu_unlock(pr.mu); grpc_httpcli_context_destroy(&context); - grpc_pollset_shutdown(&exec_ctx, &pr.pollset, &shutdown_closure); + grpc_pollset_shutdown(&exec_ctx, pr.pollset, &shutdown_closure); grpc_exec_ctx_finish(&exec_ctx); + gpr_free(pr.pollset); return pr.port; } diff --git a/test/core/util/port_windows.c b/test/core/util/port_windows.c index b5bd0168ad..3b20aeb718 100644 --- a/test/core/util/port_windows.c +++ b/test/core/util/port_windows.c @@ -129,7 +129,8 @@ static int is_port_available(int *port, int is_tcp) { } typedef struct portreq { - grpc_pollset pollset; + grpc_pollset *pollset; + gpr_mu *mu; int port; } portreq; @@ -145,10 +146,10 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, port = port * 10 + response->body[i] - '0'; } GPR_ASSERT(port > 1024); - gpr_mu_lock(GRPC_POLLSET_MU(&pr->pollset)); + gpr_mu_lock(pr->mu); pr->port = port; - grpc_pollset_kick(&pr->pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&pr->pollset)); + grpc_pollset_kick(pr->pollset, NULL); + gpr_mu_unlock(pr->mu); } static void destroy_pollset_and_shutdown(grpc_exec_ctx *exec_ctx, void *p, @@ -168,32 +169,34 @@ static int pick_port_using_server(char *server) { memset(&pr, 0, sizeof(pr)); memset(&req, 0, sizeof(req)); - grpc_pollset_init(&pr.pollset); + pr.pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(pr.pollset, &pr.mu); pr.port = -1; req.host = server; req.path = "/get"; grpc_httpcli_context_init(&context); - grpc_httpcli_get(&exec_ctx, &context, &pr.pollset, &req, + grpc_httpcli_get(&exec_ctx, &context, pr.pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server, &pr); - gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); + gpr_mu_lock(pr.mu); while (pr.port == -1) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &pr.pollset, &worker, + grpc_pollset_work(&exec_ctx, pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); - gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); + gpr_mu_unlock(pr.mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); + gpr_mu_lock(pr.mu); } - gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); + gpr_mu_unlock(pr.mu); grpc_httpcli_context_destroy(&context); grpc_closure_init(&destroy_pollset_closure, destroy_pollset_and_shutdown, &pr.pollset); - grpc_pollset_shutdown(&exec_ctx, &pr.pollset, &destroy_pollset_closure); + grpc_pollset_shutdown(&exec_ctx, pr.pollset, &destroy_pollset_closure); + gpr_free(pr.pollset); grpc_exec_ctx_finish(&exec_ctx); return pr.port; diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c index e99d5dcffd..ab379441d8 100644 --- a/test/core/util/test_tcp_server.c +++ b/test/core/util/test_tcp_server.c @@ -57,8 +57,8 @@ void test_tcp_server_init(test_tcp_server *server, server->tcp_server = NULL; grpc_closure_init(&server->shutdown_complete, on_server_destroyed, server); server->shutdown = 0; - grpc_pollset_init(&server->pollset); - server->pollsets[0] = &server->pollset; + server->pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(server->pollset, &server->mu); server->on_connect = on_connect; server->cb_data = user_data; } @@ -77,7 +77,7 @@ void test_tcp_server_start(test_tcp_server *server, int port) { grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr)); GPR_ASSERT(port_added == port); - grpc_tcp_server_start(&exec_ctx, server->tcp_server, server->pollsets, 1, + grpc_tcp_server_start(&exec_ctx, server->tcp_server, &server->pollset, 1, server->on_connect, server->cb_data); gpr_log(GPR_INFO, "test tcp server listening on 0.0.0.0:%d", port); @@ -90,10 +90,10 @@ void test_tcp_server_poll(test_tcp_server *server, int seconds) { gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(seconds, GPR_TIMESPAN)); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - gpr_mu_lock(GRPC_POLLSET_MU(&server->pollset)); - grpc_pollset_work(&exec_ctx, &server->pollset, &worker, + gpr_mu_lock(server->mu); + grpc_pollset_work(&exec_ctx, server->pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(GRPC_POLLSET_MU(&server->pollset)); + gpr_mu_unlock(server->mu); grpc_exec_ctx_finish(&exec_ctx); } @@ -111,8 +111,9 @@ void test_tcp_server_destroy(test_tcp_server *server) { gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), shutdown_deadline) < 0) { test_tcp_server_poll(server, 1); } - grpc_pollset_shutdown(&exec_ctx, &server->pollset, &do_nothing_cb); + grpc_pollset_shutdown(&exec_ctx, server->pollset, &do_nothing_cb); grpc_exec_ctx_finish(&exec_ctx); - grpc_pollset_destroy(&server->pollset); + grpc_pollset_destroy(server->pollset); + gpr_free(server->pollset); grpc_shutdown(); } diff --git a/test/core/util/test_tcp_server.h b/test/core/util/test_tcp_server.h index 51119cf6c8..15fcb4fb87 100644 --- a/test/core/util/test_tcp_server.h +++ b/test/core/util/test_tcp_server.h @@ -41,8 +41,8 @@ typedef struct test_tcp_server { grpc_tcp_server *tcp_server; grpc_closure shutdown_complete; int shutdown; - grpc_pollset pollset; - grpc_pollset *pollsets[1]; + gpr_mu *mu; + grpc_pollset *pollset; grpc_tcp_server_cb on_connect; void *cb_data; } test_tcp_server; |