aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/client_channel.c2
-rw-r--r--src/core/client_config/lb_policies/pick_first.c47
-rw-r--r--src/core/iomgr/closure.c24
-rw-r--r--src/core/iomgr/closure.h3
-rw-r--r--src/core/iomgr/tcp_server_windows.c2
5 files changed, 77 insertions, 1 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index b59b62a6aa..8e7cb27cfd 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -51,7 +51,7 @@
typedef struct call_data call_data;
-typedef struct {
+typedef struct client_channel_channel_data {
/** metadata context for this channel */
grpc_mdctx *mdctx;
/** resolver for this channel */
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 28155d0fbc..4b3aaab09c 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -101,6 +101,9 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
}
+ if (p->selected) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
+ }
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
gpr_mu_destroy(&p->mu);
@@ -172,6 +175,35 @@ void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
}
+static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
+ pick_first_lb_policy *p = arg;
+ size_t i;
+ grpc_transport_op op;
+ size_t num_subchannels = p->num_subchannels;
+ grpc_subchannel **subchannels;
+ grpc_subchannel *exclude_subchannel;
+
+ gpr_mu_lock(&p->mu);
+ subchannels = p->subchannels;
+ p->num_subchannels = 0;
+ p->subchannels = NULL;
+ exclude_subchannel = p->selected;
+ gpr_mu_unlock(&p->mu);
+ GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
+
+ for (i = 0; i < num_subchannels; i++) {
+ if (subchannels[i] != exclude_subchannel) {
+ memset(&op, 0, sizeof(op));
+ op.disconnect = 1;
+ grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
+ }
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
+ }
+
+ gpr_free(subchannels);
+}
+
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
pick_first_lb_policy *p = arg;
@@ -200,6 +232,11 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, "connecting_ready");
p->selected = p->subchannels[p->checking_subchannel];
+ GRPC_SUBCHANNEL_REF(p->selected, "picked_first");
+ /* drop the pick list: we are connected now */
+ GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels");
+ grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(destroy_subchannels, p), 1);
+ /* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
@@ -279,10 +316,15 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
size_t i;
size_t n;
grpc_subchannel **subchannels;
+ grpc_subchannel *selected;
gpr_mu_lock(&p->mu);
n = p->num_subchannels;
subchannels = gpr_malloc(n * sizeof(*subchannels));
+ selected = p->selected;
+ if (selected) {
+ GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
+ }
for (i = 0; i < n; i++) {
subchannels[i] = p->subchannels[i];
GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
@@ -290,9 +332,14 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) {
+ if (selected == subchannels[i]) continue;
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
}
+ if (p->selected) {
+ grpc_subchannel_process_transport_op(exec_ctx, selected, op);
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
+ }
gpr_free(subchannels);
}
diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c
index 3265425789..d91681990f 100644
--- a/src/core/iomgr/closure.c
+++ b/src/core/iomgr/closure.c
@@ -33,6 +33,8 @@
#include "src/core/iomgr/closure.h"
+#include <grpc/support/alloc.h>
+
void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg) {
closure->cb = cb;
@@ -69,3 +71,25 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) {
}
src->head = src->tail = NULL;
}
+
+typedef struct {
+ grpc_iomgr_cb_func cb;
+ void *cb_arg;
+ grpc_closure wrapper;
+} wrapped_closure;
+
+static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ wrapped_closure *wc = arg;
+ grpc_iomgr_cb_func cb = wc->cb;
+ void *cb_arg = wc->cb_arg;
+ gpr_free(wc);
+ cb(exec_ctx, cb_arg, success);
+}
+
+grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
+ wrapped_closure *wc = gpr_malloc(sizeof(*wc));
+ wc->cb = cb;
+ wc->cb_arg = cb_arg;
+ grpc_closure_init(&wc->wrapper, closure_wrapper, wc);
+ return &wc->wrapper;
+}
diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h
index 982ffa4e1b..d812659af0 100644
--- a/src/core/iomgr/closure.h
+++ b/src/core/iomgr/closure.h
@@ -77,6 +77,9 @@ struct grpc_closure {
void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg);
+/* Create a heap allocated closure: try to avoid except for very rare events */
+grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
+
#define GRPC_CLOSURE_LIST_INIT \
{ NULL, NULL }
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index db3319b3c6..3fea8b5b35 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -336,6 +336,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
peer_name_string);
gpr_free(fd_name);
gpr_free(peer_name_string);
+ } else {
+ closesocket(sock);
}
}