diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_channel.c | 2 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 47 | ||||
-rw-r--r-- | src/core/iomgr/closure.c | 24 | ||||
-rw-r--r-- | src/core/iomgr/closure.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 2 |
5 files changed, 77 insertions, 1 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index b59b62a6aa..8e7cb27cfd 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -51,7 +51,7 @@ typedef struct call_data call_data; -typedef struct { +typedef struct client_channel_channel_data { /** metadata context for this channel */ grpc_mdctx *mdctx; /** resolver for this channel */ diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 28155d0fbc..4b3aaab09c 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -101,6 +101,9 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); } + if (p->selected) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first"); + } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); gpr_mu_destroy(&p->mu); @@ -172,6 +175,35 @@ void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } } +static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { + pick_first_lb_policy *p = arg; + size_t i; + grpc_transport_op op; + size_t num_subchannels = p->num_subchannels; + grpc_subchannel **subchannels; + grpc_subchannel *exclude_subchannel; + + gpr_mu_lock(&p->mu); + subchannels = p->subchannels; + p->num_subchannels = 0; + p->subchannels = NULL; + exclude_subchannel = p->selected; + gpr_mu_unlock(&p->mu); + GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); + + for (i = 0; i < num_subchannels; i++) { + if (subchannels[i] != exclude_subchannel) { + memset(&op, 0, sizeof(op)); + op.disconnect = 1; + grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op); + } + GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); + } + + gpr_free(subchannels); +} + static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; @@ -200,6 +232,11 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready"); p->selected = p->subchannels[p->checking_subchannel]; + GRPC_SUBCHANNEL_REF(p->selected, "picked_first"); + /* drop the pick list: we are connected now */ + GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels"); + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(destroy_subchannels, p), 1); + /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; @@ -279,10 +316,15 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, size_t i; size_t n; grpc_subchannel **subchannels; + grpc_subchannel *selected; gpr_mu_lock(&p->mu); n = p->num_subchannels; subchannels = gpr_malloc(n * sizeof(*subchannels)); + selected = p->selected; + if (selected) { + GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected"); + } for (i = 0; i < n; i++) { subchannels[i] = p->subchannels[i]; GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast"); @@ -290,9 +332,14 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { + if (selected == subchannels[i]) continue; grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast"); } + if (p->selected) { + grpc_subchannel_process_transport_op(exec_ctx, selected, op); + GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected"); + } gpr_free(subchannels); } diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c index 3265425789..d91681990f 100644 --- a/src/core/iomgr/closure.c +++ b/src/core/iomgr/closure.c @@ -33,6 +33,8 @@ #include "src/core/iomgr/closure.h" +#include <grpc/support/alloc.h> + void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg) { closure->cb = cb; @@ -69,3 +71,25 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) { } src->head = src->tail = NULL; } + +typedef struct { + grpc_iomgr_cb_func cb; + void *cb_arg; + grpc_closure wrapper; +} wrapped_closure; + +static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, int success) { + wrapped_closure *wc = arg; + grpc_iomgr_cb_func cb = wc->cb; + void *cb_arg = wc->cb_arg; + gpr_free(wc); + cb(exec_ctx, cb_arg, success); +} + +grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) { + wrapped_closure *wc = gpr_malloc(sizeof(*wc)); + wc->cb = cb; + wc->cb_arg = cb_arg; + grpc_closure_init(&wc->wrapper, closure_wrapper, wc); + return &wc->wrapper; +} diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h index 982ffa4e1b..d812659af0 100644 --- a/src/core/iomgr/closure.h +++ b/src/core/iomgr/closure.h @@ -77,6 +77,9 @@ struct grpc_closure { void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg); +/* Create a heap allocated closure: try to avoid except for very rare events */ +grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg); + #define GRPC_CLOSURE_LIST_INIT \ { NULL, NULL } diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index db3319b3c6..3fea8b5b35 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -336,6 +336,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { peer_name_string); gpr_free(fd_name); gpr_free(peer_name_string); + } else { + closesocket(sock); } } |