aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/census/grpc_filter.c5
-rw-r--r--src/core/channel/channel_stack.c17
-rw-r--r--src/core/channel/channel_stack.h41
-rw-r--r--src/core/channel/client_channel.c152
-rw-r--r--src/core/channel/client_channel.h12
-rw-r--r--src/core/channel/client_uchannel.c82
-rw-r--r--src/core/channel/client_uchannel.h16
-rw-r--r--src/core/channel/compress_filter.c3
-rw-r--r--src/core/channel/connected_channel.c6
-rw-r--r--src/core/channel/http_server_filter.c3
-rw-r--r--src/core/channel/subchannel_call_holder.c80
-rw-r--r--src/core/channel/subchannel_call_holder.h14
-rw-r--r--src/core/client_config/lb_policies/pick_first.c151
-rw-r--r--src/core/client_config/lb_policies/round_robin.c240
-rw-r--r--src/core/client_config/lb_policy.c78
-rw-r--r--src/core/client_config/lb_policy.h35
-rw-r--r--src/core/client_config/resolver.c7
-rw-r--r--src/core/client_config/resolver.h10
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c7
-rw-r--r--src/core/client_config/resolvers/sockaddr_resolver.c13
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c7
-rw-r--r--src/core/client_config/subchannel.c726
-rw-r--r--src/core/client_config/subchannel.h89
-rw-r--r--src/core/iomgr/fd_posix.c6
-rw-r--r--src/core/iomgr/fd_posix.h1
-rw-r--r--src/core/iomgr/pollset_set.h22
-rw-r--r--src/core/iomgr/pollset_set_posix.c51
-rw-r--r--src/core/iomgr/pollset_set_posix.h4
-rw-r--r--src/core/iomgr/pollset_set_windows.c8
-rw-r--r--src/core/security/credentials.c14
-rw-r--r--src/core/surface/call.c34
-rw-r--r--src/core/surface/call.h1
-rw-r--r--src/core/surface/channel.c45
-rw-r--r--src/core/surface/channel.h2
-rw-r--r--src/core/surface/channel_connectivity.c33
-rw-r--r--src/core/surface/channel_create.c1
-rw-r--r--src/core/surface/lame_client.c6
-rw-r--r--src/core/surface/secure_channel_create.c1
-rw-r--r--src/core/surface/server_create.c13
-rw-r--r--src/core/transport/chttp2/hpack_encoder.c7
-rw-r--r--src/core/transport/chttp2_transport.c2
-rw-r--r--src/core/transport/connectivity_state.c73
-rw-r--r--src/core/transport/connectivity_state.h11
-rw-r--r--src/core/transport/static_metadata.c103
-rw-r--r--src/core/transport/transport.c21
-rw-r--r--src/core/transport/transport.h17
46 files changed, 974 insertions, 1296 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c
index 8f18cd503e..4529ae9bd7 100644
--- a/src/core/census/grpc_filter.c
+++ b/src/core/census/grpc_filter.c
@@ -115,8 +115,11 @@ static void server_mutate_op(grpc_call_element *elem,
static void server_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
+ /* TODO(ctiller): this code fails. I don't know why. I expect it's
+ incomplete, and someone should look at it soon.
+
call_data *calld = elem->call_data;
- GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
+ GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); */
server_mutate_op(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
}
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 7f7fbf420f..5e09a050ee 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -101,11 +101,12 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack,
return CALL_ELEMS_FROM_STACK(call_stack) + index;
}
-void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
+void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
+ grpc_iomgr_cb_func destroy, void *destroy_arg,
const grpc_channel_filter **filters,
- size_t filter_count, grpc_channel *master,
+ size_t filter_count,
const grpc_channel_args *channel_args,
- grpc_channel_stack *stack) {
+ const char *name, grpc_channel_stack *stack) {
size_t call_size =
ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));
@@ -115,6 +116,8 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
size_t i;
stack->count = filter_count;
+ GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg,
+ name);
elems = CHANNEL_ELEMS_FROM_STACK(stack);
user_data =
((char *)elems) +
@@ -122,7 +125,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
/* init per-filter data */
for (i = 0; i < filter_count; i++) {
- args.master = master;
+ args.channel_stack = stack;
args.channel_args = channel_args;
args.is_first = i == 0;
args.is_last = i == (filter_count - 1);
@@ -166,15 +169,15 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
size_t i;
call_stack->count = count;
- gpr_ref_init(&call_stack->refcount.refs, initial_refs);
- grpc_closure_init(&call_stack->refcount.destroy, destroy, destroy_arg);
+ GRPC_STREAM_REF_INIT(&call_stack->refcount, initial_refs, destroy,
+ destroy_arg, "CALL_STACK");
call_elems = CALL_ELEMS_FROM_STACK(call_stack);
user_data = ((char *)call_elems) +
ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
/* init per-filter data */
for (i = 0; i < count; i++) {
- args.refcount = &call_stack->refcount;
+ args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
args.context = context;
call_elems[i].filter = channel_elems[i].filter;
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 1db12ead7e..c01050e717 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -51,15 +51,18 @@
typedef struct grpc_channel_element grpc_channel_element;
typedef struct grpc_call_element grpc_call_element;
+typedef struct grpc_channel_stack grpc_channel_stack;
+typedef struct grpc_call_stack grpc_call_stack;
+
typedef struct {
- grpc_channel *master;
+ grpc_channel_stack *channel_stack;
const grpc_channel_args *channel_args;
int is_first;
int is_last;
} grpc_channel_element_args;
typedef struct {
- grpc_stream_refcount *refcount;
+ grpc_call_stack *call_stack;
const void *server_transport_data;
grpc_call_context_element *context;
} grpc_call_element_args;
@@ -144,23 +147,24 @@ struct grpc_call_element {
/* A channel stack tracks a set of related filters for one channel, and
guarantees they live within a single malloc() allocation */
-typedef struct {
+struct grpc_channel_stack {
+ grpc_stream_refcount refcount;
size_t count;
/* Memory required for a call stack (computed at channel stack
initialization) */
size_t call_stack_size;
-} grpc_channel_stack;
+};
/* A call stack tracks a set of related filters for one call, and guarantees
they live within a single malloc() allocation */
-typedef struct {
+struct grpc_call_stack {
/* shared refcount for this channel stack.
MUST be the first element: the underlying code calls destroy
with the address of the refcount, but higher layers prefer to think
about the address of the call stack itself. */
grpc_stream_refcount refcount;
size_t count;
-} grpc_call_stack;
+};
/* Get a channel element given a channel stack and its index */
grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack,
@@ -175,11 +179,11 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i);
size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
size_t filter_count);
/* Initialize a channel stack given some filters */
-void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
+void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
+ grpc_iomgr_cb_func destroy, void *destroy_arg,
const grpc_channel_filter **filters,
- size_t filter_count, grpc_channel *master,
- const grpc_channel_args *args,
- grpc_channel_stack *stack);
+ size_t filter_count, const grpc_channel_args *args,
+ const char *name, grpc_channel_stack *stack);
/* Destroy a channel stack */
void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *stack);
@@ -199,14 +203,23 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
-#define grpc_call_stack_ref(call_stack, reason) \
+#define GRPC_CALL_STACK_REF(call_stack, reason) \
grpc_stream_ref(&(call_stack)->refcount, reason)
-#define grpc_call_stack_unref(exec_ctx, call_stack, reason) \
+#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \
grpc_stream_unref(exec_ctx, &(call_stack)->refcount, reason)
+#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
+ grpc_stream_ref(&(channel_stack)->refcount, reason)
+#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \
+ grpc_stream_unref(exec_ctx, &(channel_stack)->refcount, reason)
#else
-#define grpc_call_stack_ref(call_stack) grpc_stream_ref(&(call_stack)->refcount)
-#define grpc_call_stack_unref(exec_ctx, call_stack) \
+#define GRPC_CALL_STACK_REF(call_stack, reason) \
+ grpc_stream_ref(&(call_stack)->refcount)
+#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \
grpc_stream_unref(exec_ctx, &(call_stack)->refcount)
+#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
+ grpc_stream_ref(&(channel_stack)->refcount)
+#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \
+ grpc_stream_unref(exec_ctx, &(channel_stack)->refcount)
#endif
/* Destroy a call stack */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 020138bf15..9f993b39d6 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -59,11 +59,6 @@ typedef struct client_channel_channel_data {
grpc_resolver *resolver;
/** have we started resolving this channel */
int started_resolving;
- /** master channel - the grpc_channel instance that ultimately owns
- this channel_data via its channel stack.
- We occasionally use this to bump the refcount on the master channel
- to keep ourselves alive through an asynchronous operation. */
- grpc_channel *master;
/** mutex protecting client configuration, including all
variables below in this data structure */
@@ -81,8 +76,10 @@ typedef struct client_channel_channel_data {
grpc_connectivity_state_tracker state_tracker;
/** when an lb_policy arrives, should we try to exit idle */
int exit_idle_when_lb_policy_arrives;
- /** pollset_set of interested parties in a new connection */
- grpc_pollset_set pollset_set;
+ /** owning stack */
+ grpc_channel_stack *owning_stack;
+ /** interested parties */
+ grpc_pollset_set interested_parties;
} channel_data;
/** We create one watcher for each new lb_policy that is returned from a
@@ -103,9 +100,7 @@ typedef struct {
} waiting_call;
static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
- return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data,
- chand->master);
+ return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data);
}
static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
@@ -121,10 +116,18 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
static void on_lb_policy_state_changed_locked(
grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
+ grpc_connectivity_state publish_state = w->state;
/* check if the notification is for a stale policy */
if (w->lb_policy != w->chand->lb_policy) return;
- grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state,
+ if (publish_state == GRPC_CHANNEL_FATAL_FAILURE &&
+ w->chand->resolver != NULL) {
+ publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver);
+ GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
+ w->chand->lb_policy = NULL;
+ }
+ grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state,
"lb_changed");
if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
@@ -139,7 +142,7 @@ static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
on_lb_policy_state_changed_locked(exec_ctx, w);
gpr_mu_unlock(&w->chand->mu_config);
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy");
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
gpr_free(w);
}
@@ -147,7 +150,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
w->chand = chand;
grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
@@ -179,6 +182,11 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
chand->incoming_configuration = NULL;
+ if (lb_policy != NULL) {
+ grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties,
+ &chand->interested_parties);
+ }
+
gpr_mu_lock(&chand->mu_config);
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
@@ -200,7 +208,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
gpr_mu_unlock(&chand->mu_config);
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
&chand->on_config_changed);
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next");
@@ -222,7 +230,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (old_lb_policy != NULL) {
- grpc_lb_policy_shutdown(exec_ctx, old_lb_policy);
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ &old_lb_policy->interested_parties,
+ &chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
}
@@ -230,13 +240,12 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
}
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver");
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
}
static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_transport_op *op) {
- grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
@@ -254,18 +263,15 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
op->connectivity_state = NULL;
}
- lb_policy = chand->lb_policy;
- if (lb_policy) {
- GRPC_LB_POLICY_REF(lb_policy, "broadcast");
- }
-
if (op->disconnect && chand->resolver != NULL) {
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
destroy_resolver = chand->resolver;
chand->resolver = NULL;
if (chand->lb_policy != NULL) {
- grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy);
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ &chand->lb_policy->interested_parties,
+ &chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
chand->lb_policy = NULL;
}
@@ -276,16 +282,11 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_resolver_shutdown(exec_ctx, destroy_resolver);
GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel");
}
-
- if (lb_policy) {
- grpc_lb_policy_broadcast(exec_ctx, lb_policy, op);
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast");
- }
}
typedef struct {
grpc_metadata_batch *initial_metadata;
- grpc_subchannel **subchannel;
+ grpc_connected_subchannel **connected_subchannel;
grpc_closure *on_ready;
grpc_call_element *elem;
grpc_closure closure;
@@ -293,17 +294,17 @@ typedef struct {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **subchannel,
+ grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) {
continue_picking_args *cpa = arg;
if (!success) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
- } else if (cpa->subchannel == NULL) {
+ } else if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
- cpa->subchannel, cpa->on_ready)) {
+ cpa->connected_subchannel, cpa->on_ready)) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1);
}
gpr_free(cpa);
@@ -311,7 +312,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **subchannel,
+ grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
grpc_call_element *elem = elemp;
channel_data *chand = elem->channel_data;
@@ -319,18 +320,19 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
continue_picking_args *cpa;
grpc_closure *closure;
- GPR_ASSERT(subchannel);
+ GPR_ASSERT(connected_subchannel);
gpr_mu_lock(&chand->mu_config);
if (initial_metadata == NULL) {
if (chand->lb_policy != NULL) {
- grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, subchannel);
+ grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
+ connected_subchannel);
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
closure = grpc_closure_next(closure)) {
cpa = closure->cb_arg;
- if (cpa->subchannel == subchannel) {
- cpa->subchannel = NULL;
+ if (cpa->connected_subchannel == connected_subchannel) {
+ cpa->connected_subchannel = NULL;
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
}
}
@@ -338,21 +340,22 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
return 1;
}
if (chand->lb_policy != NULL) {
- int r = grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset,
- initial_metadata, subchannel, on_ready);
+ int r =
+ grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset,
+ initial_metadata, connected_subchannel, on_ready);
gpr_mu_unlock(&chand->mu_config);
return r;
}
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = 1;
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next(exec_ctx, chand->resolver,
&chand->incoming_configuration,
&chand->on_config_changed);
}
cpa = gpr_malloc(sizeof(*cpa));
cpa->initial_metadata = initial_metadata;
- cpa->subchannel = subchannel;
+ cpa->connected_subchannel = connected_subchannel;
cpa->on_ready = on_ready;
cpa->elem = elem;
grpc_closure_init(&cpa->closure, continue_picking, cpa);
@@ -364,7 +367,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
- grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem);
+ grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem,
+ args->call_stack);
}
/* Destructor for call_data */
@@ -385,12 +389,12 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
gpr_mu_init(&chand->mu_config);
- chand->master = args->master;
- grpc_pollset_set_init(&chand->pollset_set);
grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
+ chand->owning_stack = args->channel_stack;
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
+ grpc_pollset_set_init(&chand->interested_parties);
}
/* Destructor for channel_data */
@@ -403,10 +407,13 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
}
if (chand->lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ &chand->lb_policy->interested_parties,
+ &chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
- grpc_pollset_set_destroy(&chand->pollset_set);
+ grpc_pollset_set_destroy(&chand->interested_parties);
gpr_mu_destroy(&chand->mu_config);
}
@@ -435,7 +442,7 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
chand->exit_idle_when_lb_policy_arrives) {
chand->started_resolving = 1;
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
&chand->on_config_changed);
}
@@ -454,7 +461,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
} else {
chand->exit_idle_when_lb_policy_arrives = 1;
if (!chand->started_resolving && chand->resolver != NULL) {
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
chand->started_resolving = 1;
grpc_resolver_next(exec_ctx, chand->resolver,
&chand->incoming_configuration,
@@ -466,32 +473,39 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
return out;
}
+typedef struct {
+ channel_data *chand;
+ grpc_pollset *pollset;
+ grpc_closure *on_complete;
+ grpc_closure my_closure;
+} external_connectivity_watcher;
+
+static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
+ external_connectivity_watcher *w = arg;
+ grpc_closure *follow_up = w->on_complete;
+ grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties,
+ w->pollset);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
+ "external_connectivity_watcher");
+ gpr_free(w);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success);
+}
+
void grpc_client_channel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete) {
channel_data *chand = elem->channel_data;
+ external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ w->chand = chand;
+ w->pollset = pollset;
+ w->on_complete = on_complete;
+ grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset);
+ grpc_closure_init(&w->my_closure, on_external_watch_complete, w);
+ GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
+ "external_connectivity_watcher");
gpr_mu_lock(&chand->mu_config);
grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &chand->state_tracker, state, on_complete);
+ exec_ctx, &chand->state_tracker, state, &w->my_closure);
gpr_mu_unlock(&chand->mu_config);
}
-
-grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
- grpc_channel_element *elem) {
- channel_data *chand = elem->channel_data;
- return &chand->pollset_set;
-}
-
-void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_pollset *pollset) {
- channel_data *chand = elem->channel_data;
- grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset);
-}
-
-void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_pollset *pollset) {
- channel_data *chand = elem->channel_data;
- grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset);
-}
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index 5103f07a43..d9bc4971f1 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -57,17 +57,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
void grpc_client_channel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete);
-grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
- grpc_channel_element *elem);
-
-void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *channel,
- grpc_pollset *pollset);
-void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *channel,
- grpc_pollset *pollset);
-
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c
index 456ffb7371..2c0b07d8bf 100644
--- a/src/core/channel/client_uchannel.c
+++ b/src/core/channel/client_uchannel.c
@@ -58,13 +58,13 @@ typedef struct client_uchannel_channel_data {
this channel_data via its channel stack.
We occasionally use this to bump the refcount on the master channel
to keep ourselves alive through an asynchronous operation. */
- grpc_channel *master;
+ grpc_channel_stack *owning_stack;
/** connectivity state being tracked */
grpc_connectivity_state_tracker state_tracker;
/** the subchannel wrapped by the microchannel */
- grpc_subchannel *subchannel;
+ grpc_connected_subchannel *connected_subchannel;
/** the callback used to stay subscribed to subchannel connectivity
* notifications */
@@ -84,15 +84,13 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
chand->subchannel_connectivity,
"uchannel_monitor_subchannel");
- grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
- &chand->subchannel_connectivity,
- &chand->connectivity_cb);
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, chand->connected_subchannel, NULL,
+ &chand->subchannel_connectivity, &chand->connectivity_cb);
}
static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
- return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data,
- chand->master);
+ return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data);
}
static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
@@ -128,11 +126,11 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **subchannel,
+ grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
channel_data *chand = arg;
GPR_ASSERT(initial_metadata != NULL);
- *subchannel = chand->subchannel;
+ *connected_subchannel = chand->connected_subchannel;
return 1;
}
@@ -140,7 +138,7 @@ static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel,
- elem->channel_data);
+ elem->channel_data, args->call_stack);
}
/* Destructor for call_data */
@@ -158,7 +156,7 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand);
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
- chand->master = args->master;
+ chand->owning_stack = args->channel_stack;
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_uchannel");
gpr_mu_init(&chand->mu_state);
@@ -168,10 +166,14 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
- grpc_subchannel_state_change_unsubscribe(exec_ctx, chand->subchannel,
- &chand->connectivity_cb);
+ /* cancel subscription */
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, chand->connected_subchannel, NULL, NULL,
+ &chand->connectivity_cb);
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
gpr_mu_destroy(&chand->mu_state);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, chand->connected_subchannel,
+ "uchannel");
}
static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
@@ -191,23 +193,14 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
- out = grpc_connectivity_state_check(&chand->state_tracker);
gpr_mu_lock(&chand->mu_state);
- if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
- GRPC_CHANNEL_CONNECTING,
- "uchannel_connecting_changed");
- chand->subchannel_connectivity = out;
- grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
- &chand->subchannel_connectivity,
- &chand->connectivity_cb);
- }
+ out = grpc_connectivity_state_check(&chand->state_tracker);
gpr_mu_unlock(&chand->mu_state);
return out;
}
void grpc_client_uchannel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete) {
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu_state);
@@ -216,40 +209,11 @@ void grpc_client_uchannel_watch_connectivity_state(
gpr_mu_unlock(&chand->mu_state);
}
-grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set(
- grpc_channel_element *elem) {
- channel_data *chand = elem->channel_data;
- grpc_channel_element *parent_elem;
- gpr_mu_lock(&chand->mu_state);
- parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(
- grpc_subchannel_get_master(chand->subchannel)));
- gpr_mu_unlock(&chand->mu_state);
- return grpc_client_channel_get_connecting_pollset_set(parent_elem);
-}
-
-void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_pollset *pollset) {
- grpc_pollset_set *master_pollset_set =
- grpc_client_uchannel_get_connecting_pollset_set(elem);
- grpc_pollset_set_add_pollset(exec_ctx, master_pollset_set, pollset);
-}
-
-void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_pollset *pollset) {
- grpc_pollset_set *master_pollset_set =
- grpc_client_uchannel_get_connecting_pollset_set(elem);
- grpc_pollset_set_del_pollset(exec_ctx, master_pollset_set, pollset);
-}
-
grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
grpc_channel_args *args) {
grpc_channel *channel = NULL;
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
- grpc_channel *master = grpc_subchannel_get_master(subchannel);
- char *target = grpc_channel_get_target(master);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
size_t n = 0;
@@ -261,19 +225,19 @@ grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
GPR_ASSERT(n <= MAX_FILTERS);
channel =
- grpc_channel_create_from_filters(&exec_ctx, target, filters, n, args, 1);
+ grpc_channel_create_from_filters(&exec_ctx, NULL, filters, n, args, 1);
- gpr_free(target);
return channel;
}
-void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel,
- grpc_subchannel *subchannel) {
+void grpc_client_uchannel_set_connected_subchannel(
+ grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel) {
grpc_channel_element *elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel));
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
gpr_mu_lock(&chand->mu_state);
- chand->subchannel = subchannel;
+ chand->connected_subchannel = connected_subchannel;
+ GRPC_CONNECTED_SUBCHANNEL_REF(connected_subchannel, "uchannel");
gpr_mu_unlock(&chand->mu_state);
}
diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h
index dfe6695ae3..92a831493c 100644
--- a/src/core/channel/client_uchannel.h
+++ b/src/core/channel/client_uchannel.h
@@ -48,23 +48,13 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
void grpc_client_uchannel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete);
-grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set(
- grpc_channel_element *elem);
-
-void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *channel,
- grpc_pollset *pollset);
-void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *channel,
- grpc_pollset *pollset);
-
grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
grpc_channel_args *args);
-void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel,
- grpc_subchannel *subchannel);
+void grpc_client_uchannel_set_connected_subchannel(
+ grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H */
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index d7d1c189fe..cc8e191628 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -288,8 +288,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for channel data */
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem) {
-}
+ grpc_channel_element *elem) {}
const grpc_channel_filter grpc_compress_filter = {
compress_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 0e1efd965a..e8eb9dcfc5 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -89,9 +89,9 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
int r;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- r = grpc_transport_init_stream(exec_ctx, chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- args->refcount, args->server_transport_data);
+ r = grpc_transport_init_stream(
+ exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+ &args->call_stack->refcount, args->server_transport_data);
GPR_ASSERT(r == 0);
}
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index bc3a56cbf0..ae8660da92 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -224,8 +224,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for channel data */
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem) {
-}
+ grpc_channel_element *elem) {}
const grpc_channel_filter grpc_http_server_filter = {
hs_start_transport_op, grpc_channel_next_op, sizeof(call_data),
diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c
index 7251714519..f5da41f3cd 100644
--- a/src/core/channel/subchannel_call_holder.c
+++ b/src/core/channel/subchannel_call_holder.c
@@ -44,7 +44,6 @@
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
int success);
-static void call_ready(grpc_exec_ctx *exec_ctx, void *holder, int success);
static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
int success);
@@ -58,16 +57,17 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
void grpc_subchannel_call_holder_init(
grpc_subchannel_call_holder *holder,
grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
- void *pick_subchannel_arg) {
+ void *pick_subchannel_arg, grpc_call_stack *owning_call) {
gpr_atm_rel_store(&holder->subchannel_call, 0);
holder->pick_subchannel = pick_subchannel;
holder->pick_subchannel_arg = pick_subchannel_arg;
gpr_mu_init(&holder->mu);
- holder->subchannel = NULL;
+ holder->connected_subchannel = NULL;
holder->waiting_ops = NULL;
holder->waiting_ops_count = 0;
holder->waiting_ops_capacity = 0;
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ holder->owning_call = owning_call;
}
void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
@@ -125,13 +125,9 @@ retry:
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
fail_locked(exec_ctx, holder);
break;
- case GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL:
- grpc_subchannel_cancel_create_call(exec_ctx, holder->subchannel,
- &holder->subchannel_call);
- break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
- &holder->subchannel, NULL);
+ &holder->connected_subchannel, NULL);
break;
}
gpr_mu_unlock(&holder->mu);
@@ -142,28 +138,27 @@ retry:
}
/* if we don't have a subchannel, try to get one */
if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
- holder->subchannel == NULL && op->send_initial_metadata != NULL) {
+ holder->connected_subchannel == NULL &&
+ op->send_initial_metadata != NULL) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
grpc_closure_init(&holder->next_step, subchannel_ready, holder);
- if (holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg,
- op->send_initial_metadata, &holder->subchannel,
- &holder->next_step)) {
+ GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel");
+ if (holder->pick_subchannel(
+ exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata,
+ &holder->connected_subchannel, &holder->next_step)) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
}
}
/* if we've got a subchannel, then let's ask it to create a call */
if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
- holder->subchannel != NULL) {
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL;
- grpc_closure_init(&holder->next_step, call_ready, holder);
- if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
- holder->pollset, &holder->subchannel_call,
- &holder->next_step)) {
- /* got one immediately - continue the op (and any waiting ops) */
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
- retry_waiting_locked(exec_ctx, holder);
- goto retry;
- }
+ holder->connected_subchannel != NULL) {
+ gpr_atm_rel_store(
+ &holder->subchannel_call,
+ (gpr_atm)(gpr_uintptr)grpc_connected_subchannel_create_call(
+ exec_ctx, holder->connected_subchannel, holder->pollset));
+ retry_waiting_locked(exec_ctx, holder);
+ goto retry;
}
/* nothing to be done but wait */
add_waiting_locked(holder, op);
@@ -179,36 +174,18 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
call = GET_CALL(holder);
GPR_ASSERT(call == NULL || call == CANCELLED_CALL);
- if (holder->subchannel == NULL) {
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ if (holder->connected_subchannel == NULL) {
fail_locked(exec_ctx, holder);
} else {
- grpc_closure_init(&holder->next_step, call_ready, holder);
- if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
- holder->pollset, &holder->subchannel_call,
- &holder->next_step)) {
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
- /* got one immediately - continue the op (and any waiting ops) */
- retry_waiting_locked(exec_ctx, holder);
- }
- }
- gpr_mu_unlock(&holder->mu);
-}
-
-static void call_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
- grpc_subchannel_call_holder *holder = arg;
- GPR_TIMER_BEGIN("call_ready", 0);
- gpr_mu_lock(&holder->mu);
- GPR_ASSERT(holder->creation_phase ==
- GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL);
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
- if (GET_CALL(holder) != NULL) {
+ gpr_atm_rel_store(
+ &holder->subchannel_call,
+ (gpr_atm)(gpr_uintptr)grpc_connected_subchannel_create_call(
+ exec_ctx, holder->connected_subchannel, holder->pollset));
retry_waiting_locked(exec_ctx, holder);
- } else {
- fail_locked(exec_ctx, holder);
}
gpr_mu_unlock(&holder->mu);
- GPR_TIMER_END("call_ready", 0);
+ GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
}
typedef struct {
@@ -270,14 +247,13 @@ static void fail_locked(grpc_exec_ctx *exec_ctx,
holder->waiting_ops_count = 0;
}
-char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call_holder *holder,
- grpc_channel *master) {
+char *grpc_subchannel_call_holder_get_peer(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) {
grpc_subchannel_call *subchannel_call = GET_CALL(holder);
if (subchannel_call) {
return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
} else {
- return grpc_channel_get_target(master);
+ return NULL;
}
}
diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h
index bda051c566..9cf72c6cf7 100644
--- a/src/core/channel/subchannel_call_holder.h
+++ b/src/core/channel/subchannel_call_holder.h
@@ -42,12 +42,11 @@
called when the subchannel is available) */
typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
- grpc_subchannel **subchannel, grpc_closure *on_ready);
+ grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready);
typedef enum {
GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING,
- GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL,
- GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL
+ GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL
} grpc_subchannel_call_holder_creation_phase;
/** Wrapper for holding a pointer to grpc_subchannel_call, and the
@@ -71,7 +70,7 @@ typedef struct grpc_subchannel_call_holder {
gpr_mu mu;
grpc_subchannel_call_holder_creation_phase creation_phase;
- grpc_subchannel *subchannel;
+ grpc_connected_subchannel *connected_subchannel;
grpc_pollset *pollset;
grpc_transport_stream_op *waiting_ops;
@@ -79,12 +78,14 @@ typedef struct grpc_subchannel_call_holder {
size_t waiting_ops_capacity;
grpc_closure next_step;
+
+ grpc_call_stack *owning_call;
} grpc_subchannel_call_holder;
void grpc_subchannel_call_holder_init(
grpc_subchannel_call_holder *holder,
grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
- void *pick_subchannel_arg);
+ void *pick_subchannel_arg, grpc_call_stack *owning_call);
void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder);
@@ -92,7 +93,6 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op);
char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call_holder *holder,
- grpc_channel *master);
+ grpc_subchannel_call_holder *holder);
#endif
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 93312abb00..b91f0609d2 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -42,7 +42,7 @@
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
- grpc_subchannel **target;
+ grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -60,7 +60,7 @@ typedef struct {
/** the selected channel
TODO(ctiller): this should be atomically set so we don't
need to take a mutex in the common case */
- grpc_subchannel *selected;
+ grpc_connected_subchannel *selected;
/** have we started picking? */
int started_picking;
/** are we shut down? */
@@ -76,24 +76,6 @@ typedef struct {
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
-static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
- pending_pick *pp;
- for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_del_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
- }
-}
-
-static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
- pending_pick *pp;
- for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_add_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
- }
-}
-
void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
@@ -102,7 +84,7 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
}
if (p->selected) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
@@ -114,16 +96,26 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
- del_interested_parties_locked(exec_ctx, p);
p->shutdown = 1;
pp = p->pending_picks;
p->pending_picks = NULL;
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
+ /* cancel subscription */
+ if (p->selected != NULL) {
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
+ } else {
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
+ &p->connectivity_changed);
+ }
gpr_mu_unlock(&p->mu);
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+ pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
pp = next;
@@ -131,7 +123,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_subchannel **target) {
+ grpc_connected_subchannel **target) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -140,8 +132,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_subchannel_del_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+ pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
gpr_free(pp);
@@ -158,10 +150,11 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
p->started_picking = 1;
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
- GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
- &p->checking_connectivity, &p->connectivity_changed);
+ &p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
}
void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
@@ -174,8 +167,8 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
- grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
- grpc_closure *on_complete) {
+ grpc_metadata_batch *initial_metadata,
+ grpc_connected_subchannel **target, grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -187,8 +180,8 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_subchannel_add_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties,
+ pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@@ -204,25 +197,17 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
pick_first_lb_policy *p = arg;
size_t i;
- grpc_transport_op op;
size_t num_subchannels = p->num_subchannels;
grpc_subchannel **subchannels;
- grpc_subchannel *exclude_subchannel;
gpr_mu_lock(&p->mu);
subchannels = p->subchannels;
p->num_subchannels = 0;
p->subchannels = NULL;
- exclude_subchannel = p->selected;
gpr_mu_unlock(&p->mu);
- GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels");
for (i = 0; i < num_subchannels; i++) {
- if (subchannels[i] != exclude_subchannel) {
- memset(&op, 0, sizeof(op));
- op.disconnect = 1;
- grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
- }
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
}
@@ -232,23 +217,28 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
pick_first_lb_policy *p = arg;
+ grpc_subchannel *selected_subchannel;
pending_pick *pp;
gpr_mu_lock(&p->mu);
if (p->shutdown) {
gpr_mu_unlock(&p->mu);
- GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
return;
} else if (p->selected != NULL) {
+ if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ /* if the selected channel goes bad, we're done */
+ p->checking_connectivity = GRPC_CHANNEL_FATAL_FAILURE;
+ }
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
p->checking_connectivity, "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
- grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
- &p->checking_connectivity,
- &p->connectivity_changed);
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, p->selected, &p->base.interested_parties,
+ &p->checking_connectivity, &p->connectivity_changed);
} else {
- GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
}
} else {
loop:
@@ -256,39 +246,41 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, "connecting_ready");
- p->selected = p->subchannels[p->checking_subchannel];
- GRPC_SUBCHANNEL_REF(p->selected, "picked_first");
+ selected_subchannel = p->subchannels[p->checking_subchannel];
+ p->selected =
+ grpc_subchannel_get_connected_subchannel(selected_subchannel);
+ GPR_ASSERT(p->selected);
+ GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first");
/* drop the pick list: we are connected now */
- GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels");
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
grpc_exec_ctx_enqueue(exec_ctx,
grpc_closure_create(destroy_subchannels, p), 1);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
- grpc_subchannel_del_interested_party(exec_ctx, p->selected,
- pp->pollset);
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+ pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
- grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
- &p->checking_connectivity,
- &p->connectivity_changed);
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, p->selected, &p->base.interested_parties,
+ &p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connecting_transient_failure");
- del_interested_parties_locked(exec_ctx, p);
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
- add_interested_parties_locked(exec_ctx, p);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
- &p->checking_connectivity, &p->connectivity_changed);
+ &p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
} else {
goto loop;
}
@@ -300,13 +292,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
- &p->checking_connectivity, &p->connectivity_changed);
+ &p->base.interested_parties, &p->checking_connectivity,
+ &p->connectivity_changed);
break;
case GRPC_CHANNEL_FATAL_FAILURE:
- del_interested_parties_locked(exec_ctx, p);
- GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
- p->subchannels[p->num_subchannels - 1]);
p->num_subchannels--;
+ GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
+ p->subchannels[p->num_subchannels]);
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
"pick_first");
if (p->num_subchannels == 0) {
@@ -319,7 +311,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
- GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
+ "pick_first_connectivity");
} else {
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
@@ -327,7 +320,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_subchannel %= p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
- add_interested_parties_locked(exec_ctx, p);
goto loop;
}
}
@@ -336,39 +328,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&p->mu);
}
-static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_transport_op *op) {
- pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- size_t i;
- size_t n;
- grpc_subchannel **subchannels;
- grpc_subchannel *selected;
-
- gpr_mu_lock(&p->mu);
- n = p->num_subchannels;
- subchannels = gpr_malloc(n * sizeof(*subchannels));
- selected = p->selected;
- if (selected) {
- GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
- }
- for (i = 0; i < n; i++) {
- subchannels[i] = p->subchannels[i];
- GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
- }
- gpr_mu_unlock(&p->mu);
-
- for (i = 0; i < n; i++) {
- if (selected == subchannels[i]) continue;
- grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
- }
- if (p->selected) {
- grpc_subchannel_process_transport_op(exec_ctx, selected, op);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
- }
- gpr_free(subchannels);
-}
-
static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -391,7 +350,7 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle,
- pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
+ pf_check_connectivity, pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index 1ffe32fff2..b86dba20ee 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -38,6 +38,8 @@
#include <grpc/support/alloc.h>
#include "src/core/transport/connectivity_state.h"
+typedef struct round_robin_lb_policy round_robin_lb_policy;
+
int grpc_lb_round_robin_trace = 0;
/** List of entities waiting for a pick.
@@ -46,7 +48,7 @@ int grpc_lb_round_robin_trace = 0;
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
- grpc_subchannel **target;
+ grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -58,22 +60,27 @@ typedef struct ready_list {
} ready_list;
typedef struct {
- size_t subchannel_idx; /**< Index over p->subchannels */
- void *p; /**< round_robin_lb_policy instance */
-} connectivity_changed_cb_arg;
-
-typedef struct {
+ /** index within policy->subchannels */
+ size_t index;
+ /** backpointer to owning policy */
+ round_robin_lb_policy *policy;
+ /** subchannel itself */
+ grpc_subchannel *subchannel;
+ /** notification that connectivity has changed on subchannel */
+ grpc_closure connectivity_changed_closure;
+ /** this subchannels current position in subchannel->ready_list */
+ ready_list *ready_list_node;
+ /** last observed connectivity */
+ grpc_connectivity_state connectivity_state;
+} subchannel_data;
+
+struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
/** all our subchannels */
- grpc_subchannel **subchannels;
size_t num_subchannels;
-
- /** Callbacks, one per subchannel being watched, to be called when their
- * respective connectivity changes */
- grpc_closure *connectivity_changed_cbs;
- connectivity_changed_cb_arg *cb_args;
+ subchannel_data **subchannels;
/** mutex protecting remaining members */
gpr_mu mu;
@@ -81,8 +88,6 @@ typedef struct {
int started_picking;
/** are we shutting down? */
int shutdown;
- /** Connectivity state of the subchannels being watched */
- grpc_connectivity_state *subchannel_connectivity;
/** List of picks that are waiting on connectivity */
pending_pick *pending_picks;
@@ -93,13 +98,7 @@ typedef struct {
ready_list ready_list;
/** Last pick from the ready list. */
ready_list *ready_list_last_pick;
-
- /** Subchannel index to ready_list node.
- *
- * Kept in order to remove nodes from the ready list associated with a
- * subchannel */
- ready_list **subchannel_index_to_readylist_node;
-} round_robin_lb_policy;
+};
/** Returns the next subchannel from the connected list or NULL if the list is
* empty.
@@ -144,9 +143,9 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) {
/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
* csc to the list of ready subchannels. */
static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
- grpc_subchannel *csc) {
+ grpc_subchannel *sc) {
ready_list *new_elem = gpr_malloc(sizeof(ready_list));
- new_elem->subchannel = csc;
+ new_elem->subchannel = sc;
if (p->ready_list.prev == NULL) {
/* first element */
new_elem->next = &p->ready_list;
@@ -160,7 +159,7 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
p->ready_list.prev = new_elem;
}
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc);
+ gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc);
}
return new_elem;
}
@@ -200,28 +199,15 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
gpr_free(node);
}
-static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx,
- round_robin_lb_policy *p,
- const size_t subchannel_idx) {
- pending_pick *pp;
- for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_del_interested_party(
- exec_ctx, p->subchannels[subchannel_idx], pp->pollset);
- }
-}
-
void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
size_t i;
ready_list *elem;
for (i = 0; i < p->num_subchannels; i++) {
- del_interested_parties_locked(exec_ctx, p, i);
+ subchannel_data *sd = p->subchannels[i];
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin");
+ gpr_free(sd);
}
- for (i = 0; i < p->num_subchannels; i++) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "round_robin");
- }
- gpr_free(p->connectivity_changed_cbs);
- gpr_free(p->subchannel_connectivity);
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
@@ -237,20 +223,15 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(elem);
elem = tmp;
}
- gpr_free(p->subchannel_index_to_readylist_node);
- gpr_free(p->cb_args);
gpr_free(p);
}
void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
- size_t i;
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
- gpr_mu_lock(&p->mu);
+ size_t i;
- for (i = 0; i < p->num_subchannels; i++) {
- del_interested_parties_locked(exec_ctx, p, i);
- }
+ gpr_mu_lock(&p->mu);
p->shutdown = 1;
while ((pp = p->pending_picks)) {
@@ -261,24 +242,26 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
+ for (i = 0; i < p->num_subchannels; i++) {
+ subchannel_data *sd = p->subchannels[i];
+ grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
+ &sd->connectivity_changed_closure);
+ }
gpr_mu_unlock(&p->mu);
}
static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_subchannel **target) {
+ grpc_connected_subchannel **target) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
- size_t i;
gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- for (i = 0; i < p->num_subchannels; i++) {
- grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i],
- pp->pollset);
- }
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+ pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
gpr_free(pp);
@@ -295,12 +278,16 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
+ gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p,
+ p->num_subchannels);
+
for (i = 0; i < p->num_subchannels; i++) {
- p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE;
- grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[i],
- &p->subchannel_connectivity[i],
- &p->connectivity_changed_cbs[i]);
- GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity");
+ subchannel_data *sd = p->subchannels[i];
+ sd->connectivity_state = GRPC_CHANNEL_IDLE;
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, &p->base.interested_parties,
+ &sd->connectivity_state, &sd->connectivity_changed_closure);
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity");
}
}
@@ -314,18 +301,18 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
- grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
- grpc_closure *on_complete) {
- size_t i;
+ grpc_metadata_batch *initial_metadata,
+ grpc_connected_subchannel **target, grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
ready_list *selected;
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
gpr_mu_unlock(&p->mu);
- *target = selected->subchannel;
+ *target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)",
+ gpr_log(GPR_DEBUG,
+ "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
/* only advance the last picked pointer if the selection was used */
@@ -335,10 +322,8 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- for (i = 0; i < p->num_subchannels; i++) {
- grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i],
- pollset);
- }
+ grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties,
+ pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@@ -352,33 +337,25 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
- connectivity_changed_cb_arg *cb_arg = arg;
- round_robin_lb_policy *p = cb_arg->p;
- /* index over p->subchannels of this cb's subchannel */
- const size_t this_idx = cb_arg->subchannel_idx;
+ subchannel_data *sd = arg;
+ round_robin_lb_policy *p = sd->policy;
pending_pick *pp;
ready_list *selected;
int unref = 0;
- /* connectivity state of this cb's subchannel */
- grpc_connectivity_state *this_connectivity;
-
gpr_mu_lock(&p->mu);
- this_connectivity = &p->subchannel_connectivity[this_idx];
-
if (p->shutdown) {
unref = 1;
} else {
- switch (*this_connectivity) {
+ switch (sd->connectivity_state) {
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, "connecting_ready");
/* add the newly connected subchannel to the list of connected ones.
* Note that it goes to the "end of the line". */
- p->subchannel_index_to_readylist_node[this_idx] =
- add_connected_sc_locked(p, p->subchannels[this_idx]);
+ sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel);
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
@@ -390,60 +367,60 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
- *pp->target = selected->subchannel;
+ *pp->target =
+ grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
- grpc_subchannel_del_interested_party(exec_ctx, selected->subchannel,
- pp->pollset);
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+ pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[this_idx], this_connectivity,
- &p->connectivity_changed_cbs[this_idx]);
+ exec_ctx, sd->subchannel, &p->base.interested_parties,
+ &sd->connectivity_state, &sd->connectivity_changed_closure);
break;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- *this_connectivity, "connecting_changed");
+ sd->connectivity_state,
+ "connecting_changed");
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[this_idx], this_connectivity,
- &p->connectivity_changed_cbs[this_idx]);
+ exec_ctx, sd->subchannel, &p->base.interested_parties,
+ &sd->connectivity_state, &sd->connectivity_changed_closure);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
- del_interested_parties_locked(exec_ctx, p, this_idx);
/* renew state notification */
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[this_idx], this_connectivity,
- &p->connectivity_changed_cbs[this_idx]);
+ exec_ctx, sd->subchannel, &p->base.interested_parties,
+ &sd->connectivity_state, &sd->connectivity_changed_closure);
/* remove from ready list if still present */
- if (p->subchannel_index_to_readylist_node[this_idx] != NULL) {
- remove_disconnected_sc_locked(
- p, p->subchannel_index_to_readylist_node[this_idx]);
- p->subchannel_index_to_readylist_node[this_idx] = NULL;
+ if (sd->ready_list_node != NULL) {
+ remove_disconnected_sc_locked(p, sd->ready_list_node);
+ sd->ready_list_node = NULL;
}
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connecting_transient_failure");
break;
case GRPC_CHANNEL_FATAL_FAILURE:
- del_interested_parties_locked(exec_ctx, p, this_idx);
- if (p->subchannel_index_to_readylist_node[this_idx] != NULL) {
- remove_disconnected_sc_locked(
- p, p->subchannel_index_to_readylist_node[this_idx]);
- p->subchannel_index_to_readylist_node[this_idx] = NULL;
+ if (sd->ready_list_node != NULL) {
+ remove_disconnected_sc_locked(p, sd->ready_list_node);
+ sd->ready_list_node = NULL;
}
- GPR_SWAP(grpc_subchannel *, p->subchannels[this_idx],
- p->subchannels[p->num_subchannels - 1]);
p->num_subchannels--;
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
- "round_robin");
+ GPR_SWAP(subchannel_data *, p->subchannels[sd->index],
+ p->subchannels[p->num_subchannels]);
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin");
+ p->subchannels[sd->index]->index = sd->index;
+ gpr_free(sd);
+ unref = 1;
if (p->num_subchannels == 0) {
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE,
@@ -454,7 +431,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
- unref = 1;
} else {
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
@@ -466,31 +442,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&p->mu);
if (unref) {
- GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "round_robin_connectivity");
- }
-}
-
-static void rr_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_transport_op *op) {
- round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- size_t i;
- size_t n;
- grpc_subchannel **subchannels;
-
- gpr_mu_lock(&p->mu);
- n = p->num_subchannels;
- subchannels = gpr_malloc(n * sizeof(*subchannels));
- for (i = 0; i < n; i++) {
- subchannels[i] = p->subchannels[i];
- GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast");
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "round_robin_connectivity");
}
- gpr_mu_unlock(&p->mu);
-
- for (i = 0; i < n; i++) {
- grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "rr_broadcast");
- }
- gpr_free(subchannels);
}
static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx,
@@ -516,7 +469,7 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx,
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle,
- rr_broadcast, rr_check_connectivity, rr_notify_on_state_change};
+ rr_check_connectivity, rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -529,27 +482,22 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory,
GPR_ASSERT(args->num_subchannels > 0);
memset(p, 0, sizeof(*p));
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable);
- p->subchannels =
- gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
p->num_subchannels = args->num_subchannels;
+ p->subchannels = gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_subchannels);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
- memcpy(p->subchannels, args->subchannels,
- sizeof(grpc_subchannel *) * args->num_subchannels);
gpr_mu_init(&p->mu);
- p->connectivity_changed_cbs =
- gpr_malloc(sizeof(grpc_closure) * args->num_subchannels);
- p->subchannel_connectivity =
- gpr_malloc(sizeof(grpc_connectivity_state) * args->num_subchannels);
-
- p->cb_args =
- gpr_malloc(sizeof(connectivity_changed_cb_arg) * args->num_subchannels);
for (i = 0; i < args->num_subchannels; i++) {
- p->cb_args[i].subchannel_idx = i;
- p->cb_args[i].p = p;
- grpc_closure_init(&p->connectivity_changed_cbs[i], rr_connectivity_changed,
- &p->cb_args[i]);
+ subchannel_data *sd = gpr_malloc(sizeof(*sd));
+ memset(sd, 0, sizeof(*sd));
+ p->subchannels[i] = sd;
+ sd->policy = p;
+ sd->index = i;
+ sd->subchannel = args->subchannels[i];
+ grpc_closure_init(&sd->connectivity_changed_closure,
+ rr_connectivity_changed, sd);
}
/* The (dummy node) root of the ready list */
@@ -558,10 +506,6 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory,
p->ready_list.next = NULL;
p->ready_list_last_pick = &p->ready_list;
- p->subchannel_index_to_readylist_node =
- gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
- memset(p->subchannel_index_to_readylist_node, 0,
- sizeof(grpc_subchannel *) * args->num_subchannels);
return &p->base;
}
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 36a2454309..d254161546 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -33,59 +33,85 @@
#include "src/core/client_config/lb_policy.h"
+#define WEAK_REF_BITS 16
+
void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable) {
policy->vtable = vtable;
- gpr_ref_init(&policy->refs, 1);
+ gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS);
+ grpc_pollset_set_init(&policy->interested_parties);
}
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
-void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line,
- const char *reason) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s",
- policy, (int)policy->refs.count, (int)policy->refs.count + 1, reason);
+#define REF_FUNC_EXTRA_ARGS , const char *file, int line, const char *reason
+#define REF_MUTATE_EXTRA_ARGS REF_FUNC_EXTRA_ARGS, const char *purpose
+#define REF_FUNC_PASS_ARGS(new_reason) , file, line, new_reason
+#define REF_MUTATE_PASS_ARGS(purpose) , file, line, reason, purpose
#else
-void grpc_lb_policy_ref(grpc_lb_policy *policy) {
+#define REF_FUNC_EXTRA_ARGS
+#define REF_MUTATE_EXTRA_ARGS
+#define REF_FUNC_PASS_ARGS(new_reason)
+#define REF_MUTATE_PASS_ARGS(x)
#endif
- gpr_ref(&policy->refs);
-}
+static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta,
+ int barrier REF_MUTATE_EXTRA_ARGS) {
+ gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
+ : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
-void grpc_lb_policy_unref(grpc_lb_policy *policy,
- grpc_closure_list *closure_list, const char *file,
- int line, const char *reason) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s",
- policy, (int)policy->refs.count, (int)policy->refs.count - 1, reason);
-#else
-void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val,
+ old_val + delta, reason);
#endif
- if (gpr_unref(&policy->refs)) {
- policy->vtable->destroy(exec_ctx, policy);
+ return old_val;
+}
+
+void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
+ ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF"));
+}
+
+void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
+ gpr_atm old_val =
+ ref_mutate(policy, (gpr_atm)1 - (gpr_atm)(1 << WEAK_REF_BITS),
+ 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF"));
+ gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1);
+ gpr_atm check = 1 << WEAK_REF_BITS;
+ if ((old_val & mask) == check) {
+ policy->vtable->shutdown(exec_ctx, policy);
}
+ grpc_lb_policy_weak_unref(exec_ctx,
+ policy REF_FUNC_PASS_ARGS("strong-unref"));
}
-void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
- policy->vtable->shutdown(exec_ctx, policy);
+void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
+ ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF"));
+}
+
+void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
+ gpr_atm old_val =
+ ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
+ if (old_val == 1) {
+ grpc_pollset_set_destroy(&policy->interested_parties);
+ policy->vtable->destroy(exec_ctx, policy);
+ }
}
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **target, grpc_closure *on_complete) {
+ grpc_connected_subchannel **target,
+ grpc_closure *on_complete) {
return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata,
target, on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_subchannel **target) {
+ grpc_connected_subchannel **target) {
policy->vtable->cancel_pick(exec_ctx, policy, target);
}
-void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_transport_op *op) {
- policy->vtable->broadcast(exec_ctx, policy, op);
-}
-
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
policy->vtable->exit_idle(exec_ctx, policy);
}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index a696c3ce64..2f8d655558 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -47,7 +47,8 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel,
struct grpc_lb_policy {
const grpc_lb_policy_vtable *vtable;
- gpr_refcount refs;
+ gpr_atm ref_pair;
+ grpc_pollset_set interested_parties;
};
struct grpc_lb_policy_vtable {
@@ -58,17 +59,13 @@ struct grpc_lb_policy_vtable {
/** implement grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
- grpc_subchannel **target, grpc_closure *on_complete);
+ grpc_connected_subchannel **target, grpc_closure *on_complete);
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_subchannel **target);
+ grpc_connected_subchannel **target);
/** try to enter a READY connectivity state */
void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
- /** broadcast a transport op to all subchannels */
- void (*broadcast)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_transport_op *op);
-
/** check the current connectivity of the lb_policy */
grpc_connectivity_state (*check_connectivity)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy);
@@ -81,29 +78,39 @@ struct grpc_lb_policy_vtable {
grpc_closure *closure);
};
+/*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
#define GRPC_LB_POLICY_REF(p, r) \
grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
#define GRPC_LB_POLICY_UNREF(exec_ctx, p, r) \
grpc_lb_policy_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
+#define GRPC_LB_POLICY_WEAK_REF(p, r) \
+ grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, p, r) \
+ grpc_lb_policy_weak_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line,
const char *reason);
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const char *file, int line, const char *reason);
+void grpc_lb_policy_weak_ref(grpc_lb_policy *policy, const char *file, int line,
+ const char *reason);
+void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const char *file, int line, const char *reason);
#else
#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p))
#define GRPC_LB_POLICY_UNREF(cl, p, r) grpc_lb_policy_unref((cl), (p))
+#define GRPC_LB_POLICY_WEAK_REF(p, r) grpc_lb_policy_weak_ref((p))
+#define GRPC_LB_POLICY_WEAK_UNREF(cl, p, r) grpc_lb_policy_weak_unref((cl), (p))
void grpc_lb_policy_ref(grpc_lb_policy *policy);
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+void grpc_lb_policy_weak_ref(grpc_lb_policy *policy);
+void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
#endif
/** called by concrete implementations to initialize the base struct */
void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
-/** Start shutting down (fail any pending picks) */
-void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
-
/** Given initial metadata in \a initial_metadata, find an appropriate
target for this rpc, and 'return' it by calling \a on_complete after setting
\a target.
@@ -111,13 +118,11 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **target, grpc_closure *on_complete);
+ grpc_connected_subchannel **target,
+ grpc_closure *on_complete);
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_subchannel **target);
-
-void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_transport_op *op);
+ grpc_connected_subchannel **target);
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
diff --git a/src/core/client_config/resolver.c b/src/core/client_config/resolver.c
index 081097eb19..eda01e72ba 100644
--- a/src/core/client_config/resolver.c
+++ b/src/core/client_config/resolver.c
@@ -71,11 +71,8 @@ void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
}
void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver,
- struct sockaddr *failing_address,
- int failing_address_len) {
- resolver->vtable->channel_saw_error(exec_ctx, resolver, failing_address,
- failing_address_len);
+ grpc_resolver *resolver) {
+ resolver->vtable->channel_saw_error(exec_ctx, resolver);
}
void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
diff --git a/src/core/client_config/resolver.h b/src/core/client_config/resolver.h
index 7ba0cd5bd4..e612eaf3b3 100644
--- a/src/core/client_config/resolver.h
+++ b/src/core/client_config/resolver.h
@@ -35,8 +35,8 @@
#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVER_H
#include "src/core/client_config/client_config.h"
+#include "src/core/client_config/subchannel.h"
#include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/sockaddr.h"
typedef struct grpc_resolver grpc_resolver;
typedef struct grpc_resolver_vtable grpc_resolver_vtable;
@@ -51,9 +51,7 @@ struct grpc_resolver {
struct grpc_resolver_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
- void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- struct sockaddr *failing_address,
- int failing_address_len);
+ void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
void (*next)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
grpc_client_config **target_config, grpc_closure *on_complete);
};
@@ -81,9 +79,7 @@ void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
/** Notification that the channel has seen an error on some address.
Can be used as a hint that re-resolution is desirable soon. */
void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver,
- struct sockaddr *failing_address,
- int failing_address_len);
+ grpc_resolver *resolver);
/** Get the next client config. Called by the channel to fetch a new
configuration. Expected to set *target_config with a new configuration,
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index b40c41544a..28ca30b946 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -80,9 +80,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
dns_resolver *r);
static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
-static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
- struct sockaddr *failing_address,
- int failing_address_len);
+static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
grpc_client_config **target_config,
grpc_closure *on_complete);
@@ -102,8 +100,7 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
}
static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver, struct sockaddr *sa,
- int len) {
+ grpc_resolver *resolver) {
dns_resolver *r = (dns_resolver *)resolver;
gpr_mu_lock(&r->mu);
if (!r->resolving) {
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index 0b017f06c7..fd0212a1e7 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -83,9 +83,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *r,
- struct sockaddr *failing_address,
- int failing_address_len);
+ grpc_resolver *r);
static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
grpc_client_config **target_config,
grpc_closure *on_complete);
@@ -107,8 +105,13 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx,
}
static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver,
- struct sockaddr *sa, int len) {}
+ grpc_resolver *resolver) {
+ sockaddr_resolver *r = (sockaddr_resolver *)resolver;
+ gpr_mu_lock(&r->mu);
+ r->published = 0;
+ sockaddr_maybe_finish_next_locked(exec_ctx, r);
+ gpr_mu_unlock(&r->mu);
+}
static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
grpc_client_config **target_config,
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index 136197d4c6..4924ca77d6 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -96,9 +96,7 @@ static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *r,
- struct sockaddr *failing_address,
- int failing_address_len);
+ grpc_resolver *r);
static void zookeeper_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
grpc_client_config **target_config,
grpc_closure *on_complete);
@@ -125,8 +123,7 @@ static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx,
}
static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver,
- struct sockaddr *sa, int len) {
+ grpc_resolver *resolver) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
gpr_mu_lock(&r->mu);
if (r->resolving == 0) {
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 28496ac2c9..6631e9bae2 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -47,39 +47,44 @@
#include "src/core/transport/connectivity_state.h"
#include "src/core/transport/connectivity_state.h"
+#define INTERNAL_REF_BITS 16
+#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
+
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
-typedef struct {
- /* all fields protected by subchannel->mu */
- /** refcount */
- int refs;
- /** parent subchannel */
- grpc_subchannel *subchannel;
-} connection;
+#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
+ ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \
+ &(subchannel)->connected_subchannel)))
typedef struct {
grpc_closure closure;
- size_t version;
grpc_subchannel *subchannel;
grpc_connectivity_state connectivity_state;
} state_watcher;
-typedef struct waiting_for_connect {
- struct waiting_for_connect *next;
- grpc_closure *notify;
- grpc_pollset *pollset;
- gpr_atm *target;
+typedef struct external_state_watcher {
grpc_subchannel *subchannel;
- grpc_closure continuation;
-} waiting_for_connect;
+ grpc_pollset_set *pollset_set;
+ grpc_closure *notify;
+ grpc_closure closure;
+ struct external_state_watcher *next;
+ struct external_state_watcher *prev;
+} external_state_watcher;
struct grpc_subchannel {
grpc_connector *connector;
+ /** refcount
+ - lower INTERNAL_REF_BITS bits are for internal references:
+ these do not keep the subchannel open.
+ - upper remaining bits are for public references: these do
+ keep the subchannel open */
+ gpr_atm ref_pair;
+
/** non-transport related channel filters */
const grpc_channel_filter **filters;
size_t num_filters;
@@ -88,15 +93,9 @@ struct grpc_subchannel {
/** address to connect to */
struct sockaddr *addr;
size_t addr_len;
+
/** initial string to send to peer */
gpr_slice initial_connect_string;
- /** master channel - the grpc_channel instance that ultimately owns
- this channel_data via its channel stack.
- We occasionally use this to bump the refcount on the master channel
- to keep ourselves alive through an asynchronous operation. */
- grpc_channel *master;
- /** have we seen a disconnection? */
- int disconnected;
/** set during connection */
grpc_connect_out_args connecting_result;
@@ -105,27 +104,24 @@ struct grpc_subchannel {
grpc_closure connected;
/** pollset_set tracking who's interested in a connection
- being setup - owned by the master channel (in particular the
- client_channel
- filter there-in) */
- grpc_pollset_set *pollset_set;
+ being setup */
+ grpc_pollset_set pollset_set;
+
+ /** active connection, or null; of type grpc_connected_subchannel */
+ gpr_atm connected_subchannel;
/** mutex protecting remaining elements */
gpr_mu mu;
- /** active connection */
- connection *active;
- /** version number for the active connection */
- size_t active_version;
- /** refcount */
- int refs;
+ /** have we seen a disconnection? */
+ int disconnected;
/** are we connecting */
int connecting;
- /** things waiting for a connection */
- waiting_for_connect *waiting;
/** connectivity state tracking */
grpc_connectivity_state_tracker state_tracker;
+ external_state_watcher root_external_state_watcher;
+
/** next connect attempt time */
gpr_timespec next_attempt;
/** amount to backoff each failure */
@@ -139,151 +135,141 @@ struct grpc_subchannel {
};
struct grpc_subchannel_call {
- connection *connection;
+ grpc_connected_subchannel *connection;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
-#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
+#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)(con))
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
(((grpc_subchannel_call *)(callstack)) - 1)
-static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
- connection *con,
- grpc_pollset *pollset);
-static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- const char *reason);
-static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
int iomgr_success);
-static void subchannel_ref_locked(grpc_subchannel *c
- GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-static int subchannel_unref_locked(
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
-static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-static grpc_subchannel *connection_unref_locked(
- grpc_exec_ctx *exec_ctx,
- connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
-static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
-
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
-#define SUBCHANNEL_REF_LOCKED(p, r) \
- subchannel_ref_locked((p), __FILE__, __LINE__, (r))
-#define SUBCHANNEL_UNREF_LOCKED(p, r) \
- subchannel_unref_locked((p), __FILE__, __LINE__, (r))
-#define CONNECTION_REF_LOCKED(p, r) \
- connection_ref_locked((p), __FILE__, __LINE__, (r))
-#define CONNECTION_UNREF_LOCKED(cl, p, r) \
- connection_unref_locked((cl), (p), __FILE__, __LINE__, (r))
-#define REF_PASS_ARGS , file, line, reason
-#define REF_PASS_REASON , reason
+#define REF_REASON reason
#define REF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
- (name), (p), (p)->refs, (p)->refs + 1, reason)
+ (name), (p), (p)->refs.count, (p)->refs.count + 1, reason)
#define UNREF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
- (name), (p), (p)->refs, (p)->refs - 1, reason)
+ (name), (p), (p)->refs.count, (p)->refs.count - 1, reason)
+#define REF_MUTATE_EXTRA_ARGS \
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose
+#define REF_MUTATE_PURPOSE(x) , file, line, reason, x
#else
-#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
-#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
-#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p))
-#define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p))
-#define REF_PASS_ARGS
-#define REF_PASS_REASON
+#define REF_REASON ""
#define REF_LOG(name, p) \
do { \
} while (0)
#define UNREF_LOG(name, p) \
do { \
} while (0)
+#define REF_MUTATE_EXTRA_ARGS
+#define REF_MUTATE_PURPOSE(x)
#endif
/*
* connection implementation
*/
-static void connection_destroy(grpc_exec_ctx *exec_ctx, connection *c) {
- GPR_ASSERT(c->refs == 0);
+static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
+ int success) {
+ grpc_connected_subchannel *c = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c);
}
-static void connection_ref_locked(connection *c
- GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- REF_LOG("CONNECTION", c);
- subchannel_ref_locked(c->subchannel REF_PASS_ARGS);
- ++c->refs;
+void grpc_connected_subchannel_ref(grpc_connected_subchannel *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
}
-static grpc_subchannel *connection_unref_locked(
- grpc_exec_ctx *exec_ctx, connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- grpc_subchannel *destroy = NULL;
- UNREF_LOG("CONNECTION", c);
- if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
- destroy = c->subchannel;
- }
- if (--c->refs == 0 && c->subchannel->active != c) {
- connection_destroy(exec_ctx, c);
- }
- return destroy;
+void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c),
+ REF_REASON);
}
/*
* grpc_subchannel implementation
*/
-static void subchannel_ref_locked(grpc_subchannel *c
- GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- REF_LOG("SUBCHANNEL", c);
- ++c->refs;
+static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
+ int success) {
+ grpc_subchannel *c = arg;
+ gpr_free((void *)c->filters);
+ grpc_channel_args_destroy(c->args);
+ gpr_free(c->addr);
+ gpr_slice_unref(c->initial_connect_string);
+ grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
+ grpc_connector_unref(exec_ctx, c->connector);
+ grpc_pollset_set_destroy(&c->pollset_set);
+ gpr_free(c);
}
-static int subchannel_unref_locked(grpc_subchannel *c
- GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- UNREF_LOG("SUBCHANNEL", c);
- return --c->refs == 0;
+static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
+ int barrier REF_MUTATE_EXTRA_ARGS) {
+ gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
+ : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val,
+ old_val + delta, reason);
+#endif
+ return old_val;
}
void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- gpr_mu_lock(&c->mu);
- subchannel_ref_locked(c REF_PASS_ARGS);
- gpr_mu_unlock(&c->mu);
+ gpr_atm old_refs;
+ old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
+ 0 REF_MUTATE_PURPOSE("STRONG_REF"));
+ GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0);
}
-void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- int destroy;
- gpr_mu_lock(&c->mu);
- destroy = subchannel_unref_locked(c REF_PASS_ARGS);
- gpr_mu_unlock(&c->mu);
- if (destroy) subchannel_destroy(exec_ctx, c);
+void grpc_subchannel_weak_ref(grpc_subchannel *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ gpr_atm old_refs;
+ old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
+ GPR_ASSERT(old_refs != 0);
}
-static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
- if (c->active != NULL) {
- connection_destroy(exec_ctx, c->active);
+static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
+ grpc_connected_subchannel *con;
+ gpr_mu_lock(&c->mu);
+ GPR_ASSERT(!c->disconnected);
+ c->disconnected = 1;
+ grpc_connector_shutdown(exec_ctx, c->connector);
+ con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
+ if (con != NULL) {
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection");
+ gpr_atm_no_barrier_store(&c->connected_subchannel, 0xdeadbeef);
}
- gpr_free((void *)c->filters);
- grpc_channel_args_destroy(c->args);
- gpr_free(c->addr);
- gpr_slice_unref(c->initial_connect_string);
- grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
- grpc_connector_unref(exec_ctx, c->connector);
- gpr_free(c);
+ gpr_mu_unlock(&c->mu);
}
-void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- grpc_pollset *pollset) {
- grpc_pollset_set_add_pollset(exec_ctx, c->pollset_set, pollset);
+void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ gpr_atm old_refs;
+ old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS),
+ 1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
+ if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
+ disconnect(exec_ctx, c);
+ }
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref");
}
-void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- grpc_pollset *pollset) {
- grpc_pollset_set_del_pollset(exec_ctx, c->pollset_set, pollset);
+void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ gpr_atm old_refs;
+ old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
+ if (old_refs == 1) {
+ grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c),
+ 1);
+ }
}
static gpr_uint32 random_seed() {
@@ -293,10 +279,8 @@ static gpr_uint32 random_seed() {
grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
grpc_subchannel_args *args) {
grpc_subchannel *c = gpr_malloc(sizeof(*c));
- grpc_channel_element *parent_elem = grpc_channel_stack_last_element(
- grpc_channel_get_channel_stack(args->master));
memset(c, 0, sizeof(*c));
- c->refs = 1;
+ gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
c->connector = connector;
grpc_connector_ref(c->connector);
c->num_filters = args->filter_count;
@@ -305,13 +289,14 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
sizeof(grpc_channel_filter *) * c->num_filters);
c->addr = gpr_malloc(args->addr_len);
memcpy(c->addr, args->addr, args->addr_len);
+ grpc_pollset_set_init(&c->pollset_set);
c->addr_len = args->addr_len;
grpc_set_initial_connect_string(&c->addr, &c->addr_len,
&c->initial_connect_string);
c->args = grpc_channel_args_copy(args->args);
- c->master = args->master;
- c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
c->random = random_seed();
+ c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
+ &c->root_external_state_watcher;
grpc_closure_init(&c->connected, subchannel_connected, c);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
@@ -319,70 +304,18 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
return c;
}
-static void cancel_waiting_calls(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- int iomgr_success) {
- waiting_for_connect *w4c;
- gpr_mu_lock(&subchannel->mu);
- w4c = subchannel->waiting;
- subchannel->waiting = NULL;
- gpr_mu_unlock(&subchannel->mu);
- while (w4c != NULL) {
- waiting_for_connect *next = w4c->next;
- grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
- w4c->pollset);
- if (w4c->notify) {
- w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
- }
-
- GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
- gpr_free(w4c);
-
- w4c = next;
- }
-}
-
-void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- gpr_atm *target) {
- waiting_for_connect *w4c;
- int unref_count = 0;
- gpr_mu_lock(&subchannel->mu);
- w4c = subchannel->waiting;
- subchannel->waiting = NULL;
- while (w4c != NULL) {
- waiting_for_connect *next = w4c->next;
- if (w4c->target == target) {
- grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
- w4c->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, w4c->notify, 0);
-
- unref_count++;
- gpr_free(w4c);
- } else {
- w4c->next = subchannel->waiting;
- subchannel->waiting = w4c;
- }
-
- w4c = next;
- }
- gpr_mu_unlock(&subchannel->mu);
-
- while (unref_count-- > 0) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "waiting_for_connect");
- }
-}
-
static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_connect_in_args args;
- args.interested_parties = c->pollset_set;
+ args.interested_parties = &c->pollset_set;
args.addr = c->addr;
args.addr_len = c->addr_len;
args.deadline = compute_connect_deadline(c);
args.channel_args = c->args;
args.initial_connect_string = c->initial_connect_string;
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ GRPC_CHANNEL_CONNECTING, "state_change");
grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
&c->connected);
}
@@ -395,66 +328,6 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
continue_connect(exec_ctx, c);
}
-static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
- int call_creation_finished_ok;
- waiting_for_connect *w4c = arg;
- grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
- call_creation_finished_ok = grpc_subchannel_create_call(
- exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
- GPR_ASSERT(call_creation_finished_ok == 1);
- w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
- gpr_free(w4c);
-}
-
-int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
- grpc_pollset *pollset, gpr_atm *target,
- grpc_closure *notify) {
- connection *con;
- grpc_subchannel_call *call;
- GPR_TIMER_BEGIN("grpc_subchannel_create_call", 0);
- gpr_mu_lock(&c->mu);
- if (c->active != NULL) {
- con = c->active;
- CONNECTION_REF_LOCKED(con, "call");
- gpr_mu_unlock(&c->mu);
-
- call = create_call(exec_ctx, con, pollset);
- if (!gpr_atm_rel_cas(target, 0, (gpr_atm)(gpr_uintptr)call)) {
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "failed to set");
- }
- GPR_TIMER_END("grpc_subchannel_create_call", 0);
- return 1;
- } else {
- waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
- w4c->next = c->waiting;
- w4c->notify = notify;
- w4c->pollset = pollset;
- w4c->target = target;
- w4c->subchannel = c;
- /* released when clearing w4c */
- SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
- grpc_closure_init(&w4c->continuation, continue_creating_call, w4c);
- c->waiting = w4c;
- grpc_subchannel_add_interested_party(exec_ctx, c, pollset);
- if (!c->connecting) {
- c->connecting = 1;
- connectivity_state_changed_locked(exec_ctx, c, "create_call");
- /* released by connection */
- SUBCHANNEL_REF_LOCKED(c, "connecting");
- GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
- gpr_mu_unlock(&c->mu);
-
- start_connect(exec_ctx, c);
- } else {
- gpr_mu_unlock(&c->mu);
- }
- GPR_TIMER_END("grpc_subchannel_create_call", 0);
- return 0;
- }
-}
-
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
grpc_connectivity_state state;
gpr_mu_lock(&c->mu);
@@ -463,153 +336,138 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
return state;
}
-void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- grpc_connectivity_state *state,
- grpc_closure *notify) {
- int do_connect = 0;
- gpr_mu_lock(&c->mu);
- if (grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &c->state_tracker, state, notify)) {
- do_connect = 1;
- c->connecting = 1;
- /* released by connection */
- SUBCHANNEL_REF_LOCKED(c, "connecting");
- GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
- connectivity_state_changed_locked(exec_ctx, c, "state_change");
+static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
+ int success) {
+ external_state_watcher *w = arg;
+ grpc_closure *follow_up = w->notify;
+ if (w->pollset_set != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set,
+ w->pollset_set);
}
- gpr_mu_unlock(&c->mu);
-
- if (do_connect) {
- start_connect(exec_ctx, c);
- }
-}
-
-int grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- grpc_closure *subscribed_notify) {
- int success;
- gpr_mu_lock(&c->mu);
- success = grpc_connectivity_state_change_unsubscribe(
- exec_ctx, &c->state_tracker, subscribed_notify);
- gpr_mu_unlock(&c->mu);
- return success;
-}
-
-void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- grpc_transport_op *op) {
- connection *con = NULL;
- grpc_subchannel *destroy;
- int cancel_alarm = 0;
- gpr_mu_lock(&c->mu);
- if (c->active != NULL) {
- con = c->active;
- CONNECTION_REF_LOCKED(con, "transport-op");
- }
- if (op->disconnect) {
- c->disconnected = 1;
- connectivity_state_changed_locked(exec_ctx, c, "disconnect");
- if (c->have_alarm) {
- cancel_alarm = 1;
- }
- }
- gpr_mu_unlock(&c->mu);
-
- if (con != NULL) {
- grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_channel_element *top_elem =
- grpc_channel_stack_element(channel_stack, 0);
- top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
+ gpr_mu_lock(&w->subchannel->mu);
+ w->next->prev = w->prev;
+ w->prev->next = w->next;
+ gpr_mu_unlock(&w->subchannel->mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
+ gpr_free(w);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, success);
+}
+
+void grpc_subchannel_notify_on_state_change(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
+ grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
+ grpc_closure *notify) {
+ int do_connect = 0;
+ external_state_watcher *w;
+ if (state == NULL) {
gpr_mu_lock(&c->mu);
- destroy = CONNECTION_UNREF_LOCKED(exec_ctx, con, "transport-op");
+ for (w = c->root_external_state_watcher.next;
+ w != &c->root_external_state_watcher; w = w->next) {
+ if (w->notify == notify) {
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &c->state_tracker, NULL, &w->closure);
+ }
+ }
gpr_mu_unlock(&c->mu);
- if (destroy) {
- subchannel_destroy(exec_ctx, destroy);
+ } else {
+ w = gpr_malloc(sizeof(*w));
+ w->subchannel = c;
+ w->pollset_set = interested_parties;
+ w->notify = notify;
+ grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
+ if (interested_parties != NULL) {
+ grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set,
+ interested_parties);
}
+ GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
+ gpr_mu_lock(&c->mu);
+ w->next = &c->root_external_state_watcher;
+ w->prev = w->next->prev;
+ w->next->prev = w->prev->next = w;
+ if (grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &c->state_tracker, state, &w->closure)) {
+ do_connect = 1;
+ c->connecting = 1;
+ /* released by connection */
+ GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
+ }
+ gpr_mu_unlock(&c->mu);
}
- if (cancel_alarm) {
- grpc_timer_cancel(exec_ctx, &c->alarm);
+ if (do_connect) {
+ start_connect(exec_ctx, c);
}
+}
- if (op->disconnect) {
- grpc_connector_shutdown(exec_ctx, c->connector);
- }
+void grpc_connected_subchannel_process_transport_op(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
+ grpc_transport_op *op) {
+ grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
+ grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0);
+ top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
}
-static void on_state_changed(grpc_exec_ctx *exec_ctx, void *p,
- int iomgr_success) {
+static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
+ int iomgr_success) {
state_watcher *sw = p;
grpc_subchannel *c = sw->subchannel;
gpr_mu *mu = &c->mu;
- int destroy;
- grpc_transport_op op;
- grpc_channel_element *elem;
- connection *destroy_connection = NULL;
gpr_mu_lock(mu);
- /* if we failed or there is a version number mismatch, just leave
- this closure */
- if (!iomgr_success || sw->subchannel->active_version != sw->version) {
- goto done;
- }
-
- switch (sw->connectivity_state) {
- case GRPC_CHANNEL_CONNECTING:
- case GRPC_CHANNEL_READY:
- case GRPC_CHANNEL_IDLE:
- /* all is still good: keep watching */
- memset(&op, 0, sizeof(op));
- op.connectivity_state = &sw->connectivity_state;
- op.on_connectivity_state_change = &sw->closure;
- elem = grpc_channel_stack_element(
- CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
- /* early out */
- gpr_mu_unlock(mu);
- return;
- case GRPC_CHANNEL_FATAL_FAILURE:
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- /* things have gone wrong, deactivate and enter idle */
- if (sw->subchannel->active->refs == 0) {
- destroy_connection = sw->subchannel->active;
- }
- sw->subchannel->active = NULL;
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
- c->disconnected
- ? GRPC_CHANNEL_FATAL_FAILURE
- : GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connection_failed");
- break;
+ /* if we failed just leave this closure */
+ if (iomgr_success) {
+ if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ /* any errors on a subchannel ==> we're done, create a new one */
+ sw->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE;
+ }
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ sw->connectivity_state, "reflect_child");
+ if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL,
+ &sw->connectivity_state, &sw->closure);
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ sw = NULL;
+ }
}
-done:
- connectivity_state_changed_locked(exec_ctx, c, "transport_state_changed");
- destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
- gpr_free(sw);
gpr_mu_unlock(mu);
- if (destroy) {
- subchannel_destroy(exec_ctx, c);
- }
- if (destroy_connection != NULL) {
- connection_destroy(exec_ctx, destroy_connection);
- }
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "state_watcher");
+ gpr_free(sw);
+}
+
+static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *con,
+ grpc_pollset_set *interested_parties,
+ grpc_connectivity_state *state,
+ grpc_closure *closure) {
+ grpc_transport_op op;
+ grpc_channel_element *elem;
+ memset(&op, 0, sizeof(op));
+ op.connectivity_state = state;
+ op.on_connectivity_state_change = closure;
+ op.bind_pollset_set = interested_parties;
+ elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
+ elem->filter->start_transport_op(exec_ctx, elem, &op);
+}
+
+void grpc_connected_subchannel_notify_on_state_change(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
+ grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
+ grpc_closure *closure) {
+ connected_subchannel_state_op(exec_ctx, con, interested_parties, state,
+ closure);
}
static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
size_t channel_stack_size;
- connection *con;
+ grpc_connected_subchannel *con;
grpc_channel_stack *stk;
size_t num_filters;
const grpc_channel_filter **filters;
- waiting_for_connect *w4c;
- grpc_transport_op op;
- state_watcher *sw;
- connection *destroy_connection = NULL;
- grpc_channel_element *elem;
+ state_watcher *sw_subchannel;
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
@@ -621,74 +479,52 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
/* construct channel stack */
channel_stack_size = grpc_channel_stack_size(filters, num_filters);
- con = gpr_malloc(sizeof(connection) + channel_stack_size);
- stk = (grpc_channel_stack *)(con + 1);
- con->refs = 0;
- con->subchannel = c;
- grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args,
- stk);
+ con = gpr_malloc(channel_stack_size);
+ stk = CHANNEL_STACK_FROM_CONNECTION(con);
+ grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters,
+ num_filters, c->args, "CONNECTED_SUBCHANNEL", stk);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
gpr_free((void *)c->connecting_result.filters);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
/* initialize state watcher */
- sw = gpr_malloc(sizeof(*sw));
- grpc_closure_init(&sw->closure, on_state_changed, sw);
- sw->subchannel = c;
- sw->connectivity_state = GRPC_CHANNEL_READY;
+ sw_subchannel = gpr_malloc(sizeof(*sw_subchannel));
+ sw_subchannel->subchannel = c;
+ sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
+ grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed,
+ sw_subchannel);
gpr_mu_lock(&c->mu);
if (c->disconnected) {
gpr_mu_unlock(&c->mu);
- gpr_free(sw);
+ gpr_free(sw_subchannel);
gpr_free((void *)filters);
grpc_channel_stack_destroy(exec_ctx, stk);
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
- GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
+ gpr_free(con);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
return;
}
/* publish */
- if (c->active != NULL && c->active->refs == 0) {
- destroy_connection = c->active;
- }
- c->active = con;
- c->active_version++;
- sw->version = c->active_version;
+ GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con));
c->connecting = 0;
- /* watch for changes; subchannel ref for connecting is donated
+ /* setup subchannel watching connected subchannel for changes; subchannel ref
+ for connecting is donated
to the state watcher */
- memset(&op, 0, sizeof(op));
- op.connectivity_state = &sw->connectivity_state;
- op.on_connectivity_state_change = &sw->closure;
- op.bind_pollset_set = c->pollset_set;
- SUBCHANNEL_REF_LOCKED(c, "state_watcher");
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
- GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
- elem =
- grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, con, &c->pollset_set, &sw_subchannel->connectivity_state,
+ &sw_subchannel->closure);
/* signal completion */
- connectivity_state_changed_locked(exec_ctx, c, "connected");
- w4c = c->waiting;
- c->waiting = NULL;
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY,
+ "connected");
gpr_mu_unlock(&c->mu);
-
- while (w4c != NULL) {
- waiting_for_connect *next = w4c->next;
- grpc_exec_ctx_enqueue(exec_ctx, &w4c->continuation, 1);
- w4c = next;
- }
-
gpr_free((void *)filters);
-
- if (destroy_connection != NULL) {
- connection_destroy(exec_ctx, destroy_connection);
- }
}
/* Generate a random number between 0 and 1. */
@@ -742,29 +578,31 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
if (c->disconnected) {
iomgr_success = 0;
}
- connectivity_state_changed_locked(exec_ctx, c, "alarm");
gpr_mu_unlock(&c->mu);
if (iomgr_success) {
update_reconnect_parameters(c);
continue_connect(exec_ctx, c);
} else {
- cancel_waiting_calls(exec_ctx, c, iomgr_success);
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
- GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
}
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
grpc_subchannel *c = arg;
+
if (c->connecting_result.transport != NULL) {
publish_transport(exec_ctx, c);
+ } else if (c->disconnected) {
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
- connectivity_state_changed_locked(exec_ctx, c, "connect_failed");
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ "connect_failed");
grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
gpr_mu_unlock(&c->mu);
}
@@ -781,29 +619,6 @@ static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
: min_deadline;
}
-static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
- if (c->disconnected) {
- return GRPC_CHANNEL_FATAL_FAILURE;
- }
- if (c->connecting) {
- if (c->have_alarm) {
- return GRPC_CHANNEL_TRANSIENT_FAILURE;
- }
- return GRPC_CHANNEL_CONNECTING;
- }
- if (c->active) {
- return GRPC_CHANNEL_READY;
- }
- return GRPC_CHANNEL_IDLE;
-}
-
-static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- const char *reason) {
- grpc_connectivity_state current = compute_connectivity_locked(c);
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker, current, reason);
-}
-
/*
* grpc_subchannel_call implementation
*/
@@ -811,37 +626,22 @@ static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
int success) {
grpc_subchannel_call *c = call;
- gpr_mu *mu = &c->connection->subchannel->mu;
- grpc_subchannel *destroy;
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
- gpr_mu_lock(mu);
- destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call");
- gpr_mu_unlock(mu);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, c->connection, "subchannel_call");
gpr_free(c);
- if (destroy != NULL) {
- subchannel_destroy(exec_ctx, destroy);
- }
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
void grpc_subchannel_call_ref(grpc_subchannel_call *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
-#ifdef GRPC_STREAM_REFCOUNT_DEBUG
- grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c), reason);
-#else
- grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c));
-#endif
+ GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
}
void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
-#ifdef GRPC_STREAM_REFCOUNT_DEBUG
- grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), reason);
-#else
- grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
-#endif
+ GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
}
char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
@@ -859,24 +659,26 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
}
-static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
- connection *con,
- grpc_pollset *pollset) {
+grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
+ grpc_subchannel *c) {
+ return GET_CONNECTED_SUBCHANNEL(c, acq);
+}
+
+grpc_subchannel_call *grpc_connected_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
+ grpc_pollset *pollset) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call =
gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
call->connection = con;
+ GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
NULL, NULL, callstk);
grpc_call_stack_set_pollset(exec_ctx, callstk, pollset);
return call;
}
-grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel) {
- return subchannel->master;
-}
-
grpc_call_stack *grpc_subchannel_call_get_call_stack(
grpc_subchannel_call *subchannel_call) {
return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 85ea3739e4..74ebcecfba 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -41,6 +41,7 @@
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel;
+typedef struct grpc_connected_subchannel grpc_connected_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
@@ -49,6 +50,14 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
grpc_subchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_UNREF(cl, p, r) \
grpc_subchannel_unref((cl), (p), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_WEAK_REF(p, r) \
+ grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \
+ grpc_subchannel_weak_unref((cl), (p), __FILE__, __LINE__, (r))
+#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \
+ grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \
+ grpc_connected_subchannel_unref((cl), (p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) \
grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_UNREF(cl, p, r) \
@@ -58,6 +67,12 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
#else
#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p))
#define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p))
+#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p))
+#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \
+ grpc_subchannel_weak_unref((cl), (p))
+#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p))
+#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \
+ grpc_connected_subchannel_unref((cl), (p))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p))
#define GRPC_SUBCHANNEL_CALL_UNREF(cl, p, r) \
grpc_subchannel_call_unref((cl), (p))
@@ -69,33 +84,31 @@ void grpc_subchannel_ref(grpc_subchannel *channel
void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_subchannel_weak_ref(grpc_subchannel *channel
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *channel
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *channel
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_ref(grpc_subchannel_call *call
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-/** construct a subchannel call (possibly asynchronously).
- *
- * If the returned status is 1, the call will return immediately and \a target
- * will point to a connected \a subchannel_call instance. Note that \a notify
- * will \em not be invoked in this case.
- * Otherwise, if the returned status is 0, the subchannel call will be created
- * asynchronously, invoking the \a notify callback upon completion. */
-int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- grpc_pollset *pollset, gpr_atm *target,
- grpc_closure *notify);
-
-/** cancel \a call in the waiting state. */
-void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- gpr_atm *target);
+/** construct a subchannel call */
+grpc_subchannel_call *grpc_connected_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
+ grpc_pollset *pollset);
/** process a transport level op */
-void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- grpc_transport_op *op);
+void grpc_connected_subchannel_process_transport_op(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *subchannel,
+ grpc_transport_op *op);
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
@@ -103,26 +116,19 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the channel */
-void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *channel,
- grpc_connectivity_state *state,
- grpc_closure *notify);
-
-/** Remove \a subscribed_notify from the list of closures to be called on a
- * state change if present, returning 1. Otherwise, nothing is done and return
- * 0. */
-int grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *channel,
- grpc_closure *subscribed_notify);
-
-/** express interest in \a channel's activities through \a pollset. */
-void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *channel,
- grpc_pollset *pollset);
-/** stop following \a channel's activity through \a pollset. */
-void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *channel,
- grpc_pollset *pollset);
+void grpc_subchannel_notify_on_state_change(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel *channel,
+ grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
+ grpc_closure *notify);
+void grpc_connected_subchannel_notify_on_state_change(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel,
+ grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
+ grpc_closure *notify);
+
+/** retrieve the grpc_connected_subchannel - or NULL if called before
+ the subchannel becomes connected */
+grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
+ grpc_subchannel *subchannel);
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
@@ -147,15 +153,10 @@ struct grpc_subchannel_args {
/** Address to connect to */
struct sockaddr *addr;
size_t addr_len;
- /** master channel */
- grpc_channel *master;
};
/** create a subchannel given a connector */
grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
grpc_subchannel_args *args);
-/** Return the master channel associated with the subchannel */
-grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel);
-
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 2be0ea235f..00710d83bd 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -43,6 +43,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#define CLOSURE_NOT_READY ((grpc_closure *)0)
@@ -158,7 +159,10 @@ void grpc_fd_global_shutdown(void) {
grpc_fd *grpc_fd_create(int fd, const char *name) {
grpc_fd *r = alloc_fd(fd);
- grpc_iomgr_register_object(&r->iomgr_object, name);
+ char *name2;
+ gpr_asprintf(&name2, "%s fd=%d", name, fd);
+ grpc_iomgr_register_object(&r->iomgr_object, name2);
+ gpr_free(name2);
#ifdef GRPC_FD_REF_COUNT_DEBUG
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
#endif
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index d628ef3aaf..df4eb64d4c 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -170,6 +170,7 @@ void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
/* Reference counting for fds */
+/*#define GRPC_FD_REF_COUNT_DEBUG*/
#ifdef GRPC_FD_REF_COUNT_DEBUG
void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, int line);
diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h
index 0fdcba01a4..09c04438f7 100644
--- a/src/core/iomgr/pollset_set.h
+++ b/src/core/iomgr/pollset_set.h
@@ -49,13 +49,19 @@
#include "src/core/iomgr/pollset_set_windows.h"
#endif
-void grpc_pollset_set_init(grpc_pollset_set* pollset_set);
-void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set);
-void grpc_pollset_set_add_pollset(grpc_exec_ctx* exec_ctx,
- grpc_pollset_set* pollset_set,
- grpc_pollset* pollset);
-void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx,
- grpc_pollset_set* pollset_set,
- grpc_pollset* pollset);
+void grpc_pollset_set_init(grpc_pollset_set *pollset_set);
+void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set);
+void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset);
+void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset);
+void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item);
+void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */
diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c
index c86ed3d5da..4ec92202e3 100644
--- a/src/core/iomgr/pollset_set_posix.c
+++ b/src/core/iomgr/pollset_set_posix.c
@@ -52,9 +52,10 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
size_t i;
gpr_mu_destroy(&pollset_set->mu);
for (i = 0; i < pollset_set->fd_count; i++) {
- GRPC_FD_UNREF(pollset_set->fds[i], "pollset");
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
}
gpr_free(pollset_set->pollsets);
+ gpr_free(pollset_set->pollset_sets);
gpr_free(pollset_set->fds);
}
@@ -73,7 +74,7 @@ void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
if (grpc_fd_is_orphaned(pollset_set->fds[i])) {
- GRPC_FD_UNREF(pollset_set->fds[i], "pollset");
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
} else {
grpc_pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
pollset_set->fds[j++] = pollset_set->fds[i];
@@ -99,6 +100,46 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&pollset_set->mu);
}
+void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i, j;
+ gpr_mu_lock(&bag->mu);
+ if (bag->pollset_set_count == bag->pollset_set_capacity) {
+ bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
+ bag->pollset_sets =
+ gpr_realloc(bag->pollset_sets,
+ bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
+ }
+ bag->pollset_sets[bag->pollset_set_count++] = item;
+ for (i = 0, j = 0; i < bag->fd_count; i++) {
+ if (grpc_fd_is_orphaned(bag->fds[i])) {
+ GRPC_FD_UNREF(bag->fds[i], "pollset_set");
+ } else {
+ grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
+ bag->fds[j++] = bag->fds[i];
+ }
+ }
+ bag->fd_count = j;
+ gpr_mu_unlock(&bag->mu);
+}
+
+void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i;
+ gpr_mu_lock(&bag->mu);
+ for (i = 0; i < bag->pollset_set_count; i++) {
+ if (bag->pollset_sets[i] == item) {
+ bag->pollset_set_count--;
+ GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
+ bag->pollset_sets[bag->pollset_set_count]);
+ break;
+ }
+ }
+ gpr_mu_unlock(&bag->mu);
+}
+
void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set, grpc_fd *fd) {
size_t i;
@@ -113,6 +154,9 @@ void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
for (i = 0; i < pollset_set->pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
}
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ grpc_pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
gpr_mu_unlock(&pollset_set->mu);
}
@@ -129,6 +173,9 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
break;
}
}
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ grpc_pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
gpr_mu_unlock(&pollset_set->mu);
}
diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h
index 05234fb642..4820a61e4b 100644
--- a/src/core/iomgr/pollset_set_posix.h
+++ b/src/core/iomgr/pollset_set_posix.h
@@ -44,6 +44,10 @@ typedef struct grpc_pollset_set {
size_t pollset_capacity;
grpc_pollset **pollsets;
+ size_t pollset_set_count;
+ size_t pollset_set_capacity;
+ struct grpc_pollset_set **pollset_sets;
+
size_t fd_count;
size_t fd_capacity;
grpc_fd **fds;
diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c
index 53d5d3dcd4..157b46ec32 100644
--- a/src/core/iomgr/pollset_set_windows.c
+++ b/src/core/iomgr/pollset_set_windows.c
@@ -49,4 +49,12 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx,
grpc_pollset_set* pollset_set,
grpc_pollset* pollset) {}
+void grpc_pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx,
+ grpc_pollset_set* bag,
+ grpc_pollset_set* item) {}
+
+void grpc_pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx,
+ grpc_pollset_set* bag,
+ grpc_pollset_set* item) {}
+
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index 543c75044b..38a6710cd4 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -39,7 +39,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/http_client_filter.h"
#include "src/core/httpcli/httpcli.h"
-#include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/executor.h"
#include "src/core/json/json.h"
#include "src/core/support/string.h"
#include "src/core/surface/api_trace.h"
@@ -48,7 +48,6 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
-#include <grpc/support/thd.h>
#include <grpc/support/time.h>
/* -- Common. -- */
@@ -792,15 +791,14 @@ static void md_only_test_destruct(grpc_call_credentials *creds) {
grpc_credentials_md_store_unref(c->md_store);
}
-static void on_simulated_token_fetch_done(void *user_data) {
+static void on_simulated_token_fetch_done(grpc_exec_ctx *exec_ctx,
+ void *user_data, int success) {
grpc_credentials_metadata_request *r =
(grpc_credentials_metadata_request *)user_data;
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- r->cb(&exec_ctx, r->user_data, c->md_store->entries, c->md_store->num_entries,
+ r->cb(exec_ctx, r->user_data, c->md_store->entries, c->md_store->num_entries,
GRPC_CREDENTIALS_OK);
grpc_credentials_metadata_request_destroy(r);
- grpc_exec_ctx_finish(&exec_ctx);
}
static void md_only_test_get_request_metadata(
@@ -810,10 +808,10 @@ static void md_only_test_get_request_metadata(
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)creds;
if (c->is_async) {
- gpr_thd_id thd_id;
grpc_credentials_metadata_request *cb_arg =
grpc_credentials_metadata_request_create(creds, cb, user_data);
- gpr_thd_new(&thd_id, on_simulated_token_fetch_done, cb_arg, NULL);
+ grpc_executor_enqueue(
+ grpc_closure_create(on_simulated_token_fetch_done, cb_arg), 1);
} else {
cb(exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK);
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 84b9daaa28..5d064ef00d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -336,26 +336,19 @@ void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_cq_pollset(cq));
}
-grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
- return call->cq;
-}
-
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
-void grpc_call_internal_ref(grpc_call *c, const char *reason) {
- grpc_call_stack_ref(CALL_STACK_FROM_CALL(c), reason);
-}
-void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c,
- const char *reason) {
- grpc_call_stack_unref(exec_ctx, CALL_STACK_FROM_CALL(c), reason);
-}
+#define REF_REASON reason
+#define REF_ARG , const char *reason
#else
-void grpc_call_internal_ref(grpc_call *c) {
- grpc_call_stack_ref(CALL_STACK_FROM_CALL(c));
+#define REF_REASON ""
+#define REF_ARG
+#endif
+void grpc_call_internal_ref(grpc_call *c REF_ARG) {
+ GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
-void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c) {
- grpc_call_stack_unref(exec_ctx, CALL_STACK_FROM_CALL(c));
+void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
+ GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
}
-#endif
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) {
size_t i;
@@ -742,8 +735,15 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
char *grpc_call_get_peer(grpc_call *call) {
grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- char *result = elem->filter->get_peer(&exec_ctx, elem);
+ char *result;
GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
+ result = elem->filter->get_peer(&exec_ctx, elem);
+ if (result == NULL) {
+ result = grpc_channel_get_target(call->channel);
+ }
+ if (result == NULL) {
+ result = gpr_strdup("unknown");
+ }
grpc_exec_ctx_finish(&exec_ctx);
return result;
}
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 20907ac6d6..b53340df8e 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -58,7 +58,6 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_completion_queue *cq);
-grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
void grpc_call_internal_ref(grpc_call *call, const char *reason);
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 14fe97c30d..573a0e742f 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -63,7 +63,6 @@ typedef struct registered_call {
struct grpc_channel {
int is_client;
- gpr_refcount refs;
gpr_uint32 max_message_length;
grpc_mdelem *default_authority;
@@ -81,6 +80,8 @@ struct grpc_channel {
/* the protobuf library will (by default) start warning at 100megs */
#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
+static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, int success);
+
grpc_channel *grpc_channel_create_from_filters(
grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_filter **filters, size_t num_filters,
@@ -93,8 +94,6 @@ grpc_channel *grpc_channel_create_from_filters(
channel->target = gpr_strdup(target);
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
channel->is_client = is_client;
- /* decremented by grpc_channel_destroy */
- gpr_ref_init(&channel->refs, 1);
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
@@ -153,7 +152,9 @@ grpc_channel *grpc_channel_create_from_filters(
gpr_free(default_authority);
}
- grpc_channel_stack_init(exec_ctx, filters, num_filters, channel, args,
+ grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters,
+ num_filters, args,
+ is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL",
CHANNEL_STACK_FROM_CHANNEL(channel));
return channel;
@@ -250,17 +251,25 @@ grpc_call *grpc_channel_create_registered_call(
rc->authority ? GRPC_MDELEM_REF(rc->authority) : NULL, deadline);
}
-#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
-void grpc_channel_internal_ref(grpc_channel *c, const char *reason) {
- gpr_log(GPR_DEBUG, "CHANNEL: ref %p %d -> %d [%s]", c, c->refs.count,
- c->refs.count + 1, reason);
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#define REF_REASON reason
+#define REF_ARG , const char *reason
#else
-void grpc_channel_internal_ref(grpc_channel *c) {
+#define REF_REASON ""
+#define REF_ARG
#endif
- gpr_ref(&c->refs);
+void grpc_channel_internal_ref(grpc_channel *c REF_ARG) {
+ GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CHANNEL(c), REF_REASON);
}
-static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) {
+void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx,
+ grpc_channel *c REF_ARG) {
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(c), REF_REASON);
+}
+
+static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
+ grpc_channel *channel = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel));
while (channel->registered_calls) {
registered_call *rc = channel->registered_calls;
@@ -279,20 +288,6 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) {
gpr_free(channel);
}
-#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
-void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
- const char *reason) {
- gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel,
- channel->refs.count, channel->refs.count - 1, reason);
-#else
-void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx,
- grpc_channel *channel) {
-#endif
- if (gpr_unref(&channel->refs)) {
- destroy_channel(exec_ctx, channel);
- }
-}
-
void grpc_channel_destroy(grpc_channel *channel) {
grpc_transport_op op;
grpc_channel_element *elem;
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 7dea609ebc..3d2ff23542 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -53,7 +53,7 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
int status_code);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
-#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
const char *reason);
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index df2774b527..ad1c9b334e 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -83,7 +83,6 @@ typedef struct {
gpr_mu mu;
callback_phase phase;
int success;
- int removed;
grpc_closure on_complete;
grpc_timer alarm;
grpc_connectivity_state state;
@@ -135,30 +134,15 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
int due_to_completion) {
int delete = 0;
- grpc_channel_element *client_channel_elem = NULL;
- gpr_mu_lock(&w->mu);
- if (w->removed == 0) {
- w->removed = 1;
- client_channel_elem = grpc_channel_stack_last_element(
- grpc_channel_get_channel_stack(w->channel));
- if (client_channel_elem->filter == &grpc_client_channel_filter) {
- grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem,
- grpc_cq_pollset(w->cq));
- } else {
- grpc_client_uchannel_del_interested_party(exec_ctx, client_channel_elem,
- grpc_cq_pollset(w->cq));
- }
- }
- gpr_mu_unlock(&w->mu);
if (due_to_completion) {
- gpr_mu_lock(&w->mu);
- w->success = 1;
- gpr_mu_unlock(&w->mu);
grpc_timer_cancel(exec_ctx, &w->alarm);
}
gpr_mu_lock(&w->mu);
+ if (due_to_completion) {
+ w->success = 1;
+ }
switch (w->phase) {
case WAITING:
w->phase = CALLING_BACK;
@@ -212,7 +196,6 @@ void grpc_channel_watch_connectivity_state(
w->phase = WAITING;
w->state = last_observed_state;
w->success = 0;
- w->removed = 0;
w->cq = cq;
w->tag = tag;
w->channel = channel;
@@ -223,16 +206,14 @@ void grpc_channel_watch_connectivity_state(
if (client_channel_elem->filter == &grpc_client_channel_filter) {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
- grpc_client_channel_add_interested_party(&exec_ctx, client_channel_elem,
- grpc_cq_pollset(cq));
grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem,
- &w->state, &w->on_complete);
+ grpc_cq_pollset(cq), &w->state,
+ &w->on_complete);
} else if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_uchannel_connectivity");
- grpc_client_uchannel_add_interested_party(&exec_ctx, client_channel_elem,
- grpc_cq_pollset(cq));
grpc_client_uchannel_watch_connectivity_state(
- &exec_ctx, client_channel_elem, &w->state, &w->on_complete);
+ &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state,
+ &w->on_complete);
}
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index fe7e1072ac..97ec23408f 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -171,7 +171,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
c->base.vtable = &connector_vtable;
gpr_ref_init(&c->refs, 1);
args->args = final_args;
- args->master = f->master;
s = grpc_subchannel_create(&c->base, args);
grpc_connector_unref(exec_ctx, &c->base);
grpc_channel_args_destroy(final_args);
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 4a55544ac1..a60e9d20da 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -49,7 +49,6 @@ typedef struct {
} call_data;
typedef struct {
- grpc_channel *master;
grpc_status_code error_code;
const char *error_message;
} channel_data;
@@ -84,8 +83,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
- return grpc_channel_get_target(chand->master);
+ return NULL;
}
static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -111,10 +109,8 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
- channel_data *chand = elem->channel_data;
GPR_ASSERT(args->is_first);
GPR_ASSERT(args->is_last);
- chand->master = args->master;
}
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index c9a54d9237..863c905ea3 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -228,7 +228,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1);
args->args = final_args;
- args->master = f->master;
s = grpc_subchannel_create(&c->base, args);
grpc_connector_unref(exec_ctx, &c->base);
grpc_channel_args_destroy(final_args);
diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c
index c7811a6d88..e362bb4376 100644
--- a/src/core/surface/server_create.c
+++ b/src/core/surface/server_create.c
@@ -32,14 +32,21 @@
*/
#include <grpc/grpc.h>
+#include "src/core/census/grpc_filter.h"
+#include "src/core/channel/channel_args.h"
+#include "src/core/channel/compress_filter.h"
#include "src/core/surface/api_trace.h"
#include "src/core/surface/completion_queue.h"
#include "src/core/surface/server.h"
-#include "src/core/channel/compress_filter.h"
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
- const grpc_channel_filter *filters[] = {&grpc_compress_filter};
+ const grpc_channel_filter *filters[3];
+ size_t num_filters = 0;
+ filters[num_filters++] = &grpc_compress_filter;
+ if (grpc_channel_args_is_census_enabled(args)) {
+ filters[num_filters++] = &grpc_server_census_filter;
+ }
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
- return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters),
+ return grpc_server_create_from_filters(filters, num_filters,
args);
}
diff --git a/src/core/transport/chttp2/hpack_encoder.c b/src/core/transport/chttp2/hpack_encoder.c
index 7575031e58..06f603db9c 100644
--- a/src/core/transport/chttp2/hpack_encoder.c
+++ b/src/core/transport/chttp2/hpack_encoder.c
@@ -365,10 +365,9 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
GPR_ASSERT(GPR_SLICE_LENGTH(elem->key->slice) > 0);
if (GPR_SLICE_START_PTR(elem->key->slice)[0] != ':') { /* regular header */
st->seen_regular_header = 1;
- } else if (st->seen_regular_header != 0) { /* reserved header */
- gpr_log(GPR_ERROR,
- "Reserved header (colon-prefixed) happening after regular ones.");
- abort();
+ } else {
+ GPR_ASSERT(st->seen_regular_header == 0 &&
+ "Reserved header (colon-prefixed) happening after regular ones.");
}
inc_filter(HASH_FRAGMENT_1(elem_hash), &c->filter_elems_sum, c->filter_elems);
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 3e88f69abf..6ba9db8348 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -910,7 +910,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
- if (op->on_connectivity_state_change) {
+ if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(
exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
op->on_connectivity_state_change);
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index b001af7e35..3c3fd4671d 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -87,7 +87,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker) {
if (grpc_connectivity_state_trace) {
- gpr_log(GPR_DEBUG, "CONWATCH: %s: get %s", tracker->name,
+ gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name,
grpc_connectivity_state_name(tracker->current_state));
}
return tracker->current_state;
@@ -97,42 +97,47 @@ int grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify) {
if (grpc_connectivity_state_trace) {
- gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s] notify=%p",
- tracker->name, grpc_connectivity_state_name(*current),
- grpc_connectivity_state_name(tracker->current_state), notify);
+ if (current == NULL) {
+ gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", tracker,
+ tracker->name, notify);
+ } else {
+ gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker,
+ tracker->name, grpc_connectivity_state_name(*current),
+ grpc_connectivity_state_name(tracker->current_state), notify);
+ }
}
- if (tracker->current_state != *current) {
- *current = tracker->current_state;
- grpc_exec_ctx_enqueue(exec_ctx, notify, 1);
+ if (current == NULL) {
+ grpc_connectivity_state_watcher *w = tracker->watchers;
+ if (w != NULL && w->notify == notify) {
+ grpc_exec_ctx_enqueue(exec_ctx, notify, 0);
+ tracker->watchers = w->next;
+ gpr_free(w);
+ return 0;
+ }
+ while (w != NULL) {
+ grpc_connectivity_state_watcher *rm_candidate = w->next;
+ if (rm_candidate != NULL && rm_candidate->notify == notify) {
+ grpc_exec_ctx_enqueue(exec_ctx, notify, 0);
+ w->next = w->next->next;
+ gpr_free(rm_candidate);
+ return 0;
+ }
+ w = w->next;
+ }
+ return 0;
} else {
- grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
- w->current = current;
- w->notify = notify;
- w->next = tracker->watchers;
- tracker->watchers = w;
- }
- return tracker->current_state == GRPC_CHANNEL_IDLE;
-}
-
-int grpc_connectivity_state_change_unsubscribe(
- grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
- grpc_closure *subscribed_notify) {
- grpc_connectivity_state_watcher *w = tracker->watchers;
- if (w != NULL && w->notify == subscribed_notify) {
- tracker->watchers = w->next;
- gpr_free(w);
- return 1;
- }
- while (w != NULL) {
- grpc_connectivity_state_watcher *rm_candidate = w->next;
- if (rm_candidate != NULL && rm_candidate->notify == subscribed_notify) {
- w->next = w->next->next;
- gpr_free(rm_candidate);
- return 1;
+ if (tracker->current_state != *current) {
+ *current = tracker->current_state;
+ grpc_exec_ctx_enqueue(exec_ctx, notify, 1);
+ } else {
+ grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
+ w->current = current;
+ w->notify = notify;
+ w->next = tracker->watchers;
+ tracker->watchers = w;
}
- w = w->next;
+ return tracker->current_state == GRPC_CHANNEL_IDLE;
}
- return 0;
}
void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
@@ -141,7 +146,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
const char *reason) {
grpc_connectivity_state_watcher *w;
if (grpc_connectivity_state_trace) {
- gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name,
+ gpr_log(GPR_DEBUG, "SET: %p %s: %s --> %s [%s]", tracker, tracker->name,
grpc_connectivity_state_name(tracker->current_state),
grpc_connectivity_state_name(state), reason);
}
diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h
index af2734c016..a4eb6652e5 100644
--- a/src/core/transport/connectivity_state.h
+++ b/src/core/transport/connectivity_state.h
@@ -75,16 +75,11 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker);
-/** Return 1 if the channel should start connecting, 0 otherwise */
+/** Return 1 if the channel should start connecting, 0 otherwise.
+ If current==NULL cancel notify if it is already queued (success==0 in that
+ case) */
int grpc_connectivity_state_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state *current, grpc_closure *notify);
-/** Remove \a subscribed_notify from the list of closures to be called on a
- * state change if present, returning 1. Otherwise, nothing is done and return
- * 0. */
-int grpc_connectivity_state_change_unsubscribe(
- grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker,
- grpc_closure *subscribed_notify);
-
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */
diff --git a/src/core/transport/static_metadata.c b/src/core/transport/static_metadata.c
index e7aff325c2..c2bfef0397 100644
--- a/src/core/transport/static_metadata.c
+++ b/src/core/transport/static_metadata.c
@@ -65,92 +65,23 @@ const gpr_uint8
80, 81, 82, 33, 83, 33, 84, 33, 85, 33, 86, 33};
const char *const grpc_static_metadata_strings[GRPC_STATIC_MDSTR_COUNT] = {
- "0",
- "1",
- "2",
- "200",
- "204",
- "206",
- "304",
- "400",
- "404",
- "500",
- "accept",
- "accept-charset",
- "accept-encoding",
- "accept-language",
- "accept-ranges",
- "access-control-allow-origin",
- "age",
- "allow",
- "application/grpc",
- ":authority",
- "authorization",
- "cache-control",
- "content-disposition",
- "content-encoding",
- "content-language",
- "content-length",
- "content-location",
- "content-range",
- "content-type",
- "cookie",
- "date",
- "deflate",
- "deflate,gzip",
- "",
- "etag",
- "expect",
- "expires",
- "from",
- "GET",
- "grpc",
- "grpc-accept-encoding",
- "grpc-encoding",
- "grpc-internal-encoding-request",
- "grpc-message",
- "grpc-status",
- "grpc-timeout",
- "gzip",
- "gzip, deflate",
- "host",
- "http",
- "https",
- "identity",
- "identity,deflate",
- "identity,deflate,gzip",
- "identity,gzip",
- "if-match",
- "if-modified-since",
- "if-none-match",
- "if-range",
- "if-unmodified-since",
- "last-modified",
- "link",
- "location",
- "max-forwards",
- ":method",
- ":path",
- "POST",
- "proxy-authenticate",
- "proxy-authorization",
- "range",
- "referer",
- "refresh",
- "retry-after",
- ":scheme",
- "server",
- "set-cookie",
- "/",
- "/index.html",
- ":status",
- "strict-transport-security",
- "te",
- "trailers",
- "transfer-encoding",
- "user-agent",
- "vary",
- "via",
+ "0", "1", "2", "200", "204", "206", "304", "400", "404", "500", "accept",
+ "accept-charset", "accept-encoding", "accept-language", "accept-ranges",
+ "access-control-allow-origin", "age", "allow", "application/grpc",
+ ":authority", "authorization", "cache-control", "content-disposition",
+ "content-encoding", "content-language", "content-length",
+ "content-location", "content-range", "content-type", "cookie", "date",
+ "deflate", "deflate,gzip", "", "etag", "expect", "expires", "from", "GET",
+ "grpc", "grpc-accept-encoding", "grpc-encoding",
+ "grpc-internal-encoding-request", "grpc-message", "grpc-status",
+ "grpc-timeout", "gzip", "gzip, deflate", "host", "http", "https",
+ "identity", "identity,deflate", "identity,deflate,gzip", "identity,gzip",
+ "if-match", "if-modified-since", "if-none-match", "if-range",
+ "if-unmodified-since", "last-modified", "link", "location", "max-forwards",
+ ":method", ":path", "POST", "proxy-authenticate", "proxy-authorization",
+ "range", "referer", "refresh", "retry-after", ":scheme", "server",
+ "set-cookie", "/", "/index.html", ":status", "strict-transport-security",
+ "te", "trailers", "transfer-encoding", "user-agent", "vary", "via",
"www-authenticate"};
const gpr_uint8 grpc_static_accept_encoding_metadata[8] = {0, 29, 26, 30,
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index f2bebc62f3..2ab978be46 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -40,8 +40,8 @@
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
- gpr_log(GPR_DEBUG, "STREAM %p:%p REF %d->%d %s", refcount,
- refcount->destroy.cb_arg, val, val + 1, reason);
+ gpr_log(GPR_DEBUG, "%s %p:%p REF %d->%d %s", refcount->object_type,
+ refcount, refcount->destroy.cb_arg, val, val + 1, reason);
#else
void grpc_stream_ref(grpc_stream_refcount *refcount) {
#endif
@@ -52,8 +52,8 @@ void grpc_stream_ref(grpc_stream_refcount *refcount) {
void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount,
const char *reason) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
- gpr_log(GPR_DEBUG, "STREAM %p:%p UNREF %d->%d %s", refcount,
- refcount->destroy.cb_arg, val, val - 1, reason);
+ gpr_log(GPR_DEBUG, "%s %p:%p UNREF %d->%d %s", refcount->object_type,
+ refcount, refcount->destroy.cb_arg, val, val - 1, reason);
#else
void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_stream_refcount *refcount) {
@@ -63,6 +63,19 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
}
}
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
+ grpc_iomgr_cb_func cb, void *cb_arg,
+ const char *object_type) {
+ refcount->object_type = object_type;
+#else
+void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
+ grpc_iomgr_cb_func cb, void *cb_arg) {
+#endif
+ gpr_ref_init(&refcount->refs, initial_refs);
+ grpc_closure_init(&refcount->destroy, cb, cb_arg);
+}
+
size_t grpc_transport_stream_size(grpc_transport *transport) {
return transport->vtable->sizeof_stream;
}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index f296ce8251..f94f0ae76e 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -50,19 +50,32 @@ typedef struct grpc_transport grpc_transport;
for a stream. */
typedef struct grpc_stream grpc_stream;
+/*#define GRPC_STREAM_REFCOUNT_DEBUG*/
+
typedef struct grpc_stream_refcount {
gpr_refcount refs;
grpc_closure destroy;
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ const char *object_type;
+#endif
} grpc_stream_refcount;
-/*#define GRPC_STREAM_REFCOUNT_DEBUG*/
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
+ grpc_iomgr_cb_func cb, void *cb_arg,
+ const char *object_type);
void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason);
void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount,
const char *reason);
+#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
+ grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype)
#else
+void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
+ grpc_iomgr_cb_func cb, void *cb_arg);
void grpc_stream_ref(grpc_stream_refcount *refcount);
void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount);
+#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
+ grpc_stream_ref_init(rc, ir, cb, cb_arg)
#endif
/* Transport stream op: a set of operations to perform on a transport
@@ -96,7 +109,7 @@ typedef struct grpc_transport_stream_op {
typedef struct grpc_transport_op {
/** called when processing of this op is done */
grpc_closure *on_consumed;
- /** connectivity monitoring */
+ /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */
grpc_closure *on_connectivity_state_change;
grpc_connectivity_state *connectivity_state;
/** should the transport be disconnected */