diff options
author | 2015-09-15 07:41:28 -0700 | |
---|---|---|
committer | 2015-09-15 07:41:28 -0700 | |
commit | 06a43f5d7e7ec90dfe133c5dfa1bb2c3acb85059 (patch) | |
tree | 3280f1a990f3bf9f8d86fe899fc70b66641482fc /src/core/client_config/lb_policies/pick_first.c | |
parent | 1701b093339fc124bd9c7f08eb7d8511799281ec (diff) |
Progress towards workqueue transition
Diffstat (limited to 'src/core/client_config/lb_policies/pick_first.c')
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 11 |
1 files changed, 8 insertions, 3 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index c8262e92ef..151b6f12f8 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -52,6 +52,8 @@ typedef struct { /** all our subchannels */ grpc_subchannel **subchannels; size_t num_subchannels; + /** workqueue for async work */ + grpc_workqueue *workqueue; grpc_iomgr_closure connectivity_changed; @@ -102,6 +104,7 @@ void pf_destroy(grpc_lb_policy *pol) { grpc_connectivity_state_destroy(&p->state_tracker); gpr_free(p->subchannels); gpr_mu_destroy(&p->mu); + grpc_workqueue_unref(p->workqueue); gpr_free(p); } @@ -114,7 +117,7 @@ void pf_shutdown(grpc_lb_policy *pol) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_iomgr_add_delayed_callback(pp->on_complete, 0); + grpc_workqueue_push(p->workqueue, pp->on_complete, 0); gpr_free(pp); } grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, @@ -196,7 +199,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { p->pending_picks = pp->next; *pp->target = p->selected; grpc_subchannel_del_interested_party(p->selected, pp->pollset); - grpc_iomgr_add_delayed_callback(pp->on_complete, 1); + grpc_workqueue_push(p->workqueue, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( @@ -241,7 +244,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_iomgr_add_delayed_callback(pp->on_complete, 1); + grpc_workqueue_push(p->workqueue, pp->on_complete, 1); gpr_free(pp); } unref = 1; @@ -327,6 +330,8 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); p->num_subchannels = args->num_subchannels; + p->workqueue = args->workqueue; + grpc_workqueue_ref(p->workqueue); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first"); memcpy(p->subchannels, args->subchannels, |