aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-12-16 17:36:04 -0800
committerGravatar David Garcia Quintas <dgq@google.com>2015-12-16 17:36:04 -0800
commit7052ac25e60e137514d9a201a86eeb9b29b03d24 (patch)
tree2ce8f32319129e346a27d3b29a9b8d6b440cdd6c /src/core/channel
parent886b7d19bafbb61e84141e66a040da8c27781c44 (diff)
parent788767a18f918131268ca88985b3547a8257e973 (diff)
Merge branch 'master' of github.com:grpc/grpc into grpclb_api
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/channel_stack.c17
-rw-r--r--src/core/channel/channel_stack.h41
-rw-r--r--src/core/channel/client_channel.c163
-rw-r--r--src/core/channel/client_channel.h12
-rw-r--r--src/core/channel/client_uchannel.c82
-rw-r--r--src/core/channel/client_uchannel.h16
-rw-r--r--src/core/channel/compress_filter.c7
-rw-r--r--src/core/channel/connected_channel.c6
-rw-r--r--src/core/channel/http_client_filter.c4
-rw-r--r--src/core/channel/http_server_filter.c6
-rw-r--r--src/core/channel/noop_filter.c118
-rw-r--r--src/core/channel/noop_filter.h44
-rw-r--r--src/core/channel/subchannel_call_holder.c80
-rw-r--r--src/core/channel/subchannel_call_holder.h14
14 files changed, 204 insertions, 406 deletions
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 7f7fbf420f..5e09a050ee 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -101,11 +101,12 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack,
return CALL_ELEMS_FROM_STACK(call_stack) + index;
}
-void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
+void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
+ grpc_iomgr_cb_func destroy, void *destroy_arg,
const grpc_channel_filter **filters,
- size_t filter_count, grpc_channel *master,
+ size_t filter_count,
const grpc_channel_args *channel_args,
- grpc_channel_stack *stack) {
+ const char *name, grpc_channel_stack *stack) {
size_t call_size =
ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));
@@ -115,6 +116,8 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
size_t i;
stack->count = filter_count;
+ GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg,
+ name);
elems = CHANNEL_ELEMS_FROM_STACK(stack);
user_data =
((char *)elems) +
@@ -122,7 +125,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
/* init per-filter data */
for (i = 0; i < filter_count; i++) {
- args.master = master;
+ args.channel_stack = stack;
args.channel_args = channel_args;
args.is_first = i == 0;
args.is_last = i == (filter_count - 1);
@@ -166,15 +169,15 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
size_t i;
call_stack->count = count;
- gpr_ref_init(&call_stack->refcount.refs, initial_refs);
- grpc_closure_init(&call_stack->refcount.destroy, destroy, destroy_arg);
+ GRPC_STREAM_REF_INIT(&call_stack->refcount, initial_refs, destroy,
+ destroy_arg, "CALL_STACK");
call_elems = CALL_ELEMS_FROM_STACK(call_stack);
user_data = ((char *)call_elems) +
ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
/* init per-filter data */
for (i = 0; i < count; i++) {
- args.refcount = &call_stack->refcount;
+ args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
args.context = context;
call_elems[i].filter = channel_elems[i].filter;
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 1db12ead7e..c01050e717 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -51,15 +51,18 @@
typedef struct grpc_channel_element grpc_channel_element;
typedef struct grpc_call_element grpc_call_element;
+typedef struct grpc_channel_stack grpc_channel_stack;
+typedef struct grpc_call_stack grpc_call_stack;
+
typedef struct {
- grpc_channel *master;
+ grpc_channel_stack *channel_stack;
const grpc_channel_args *channel_args;
int is_first;
int is_last;
} grpc_channel_element_args;
typedef struct {
- grpc_stream_refcount *refcount;
+ grpc_call_stack *call_stack;
const void *server_transport_data;
grpc_call_context_element *context;
} grpc_call_element_args;
@@ -144,23 +147,24 @@ struct grpc_call_element {
/* A channel stack tracks a set of related filters for one channel, and
guarantees they live within a single malloc() allocation */
-typedef struct {
+struct grpc_channel_stack {
+ grpc_stream_refcount refcount;
size_t count;
/* Memory required for a call stack (computed at channel stack
initialization) */
size_t call_stack_size;
-} grpc_channel_stack;
+};
/* A call stack tracks a set of related filters for one call, and guarantees
they live within a single malloc() allocation */
-typedef struct {
+struct grpc_call_stack {
/* shared refcount for this channel stack.
MUST be the first element: the underlying code calls destroy
with the address of the refcount, but higher layers prefer to think
about the address of the call stack itself. */
grpc_stream_refcount refcount;
size_t count;
-} grpc_call_stack;
+};
/* Get a channel element given a channel stack and its index */
grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack,
@@ -175,11 +179,11 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i);
size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
size_t filter_count);
/* Initialize a channel stack given some filters */
-void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,
+void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs,
+ grpc_iomgr_cb_func destroy, void *destroy_arg,
const grpc_channel_filter **filters,
- size_t filter_count, grpc_channel *master,
- const grpc_channel_args *args,
- grpc_channel_stack *stack);
+ size_t filter_count, const grpc_channel_args *args,
+ const char *name, grpc_channel_stack *stack);
/* Destroy a channel stack */
void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *stack);
@@ -199,14 +203,23 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
-#define grpc_call_stack_ref(call_stack, reason) \
+#define GRPC_CALL_STACK_REF(call_stack, reason) \
grpc_stream_ref(&(call_stack)->refcount, reason)
-#define grpc_call_stack_unref(exec_ctx, call_stack, reason) \
+#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \
grpc_stream_unref(exec_ctx, &(call_stack)->refcount, reason)
+#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
+ grpc_stream_ref(&(channel_stack)->refcount, reason)
+#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \
+ grpc_stream_unref(exec_ctx, &(channel_stack)->refcount, reason)
#else
-#define grpc_call_stack_ref(call_stack) grpc_stream_ref(&(call_stack)->refcount)
-#define grpc_call_stack_unref(exec_ctx, call_stack) \
+#define GRPC_CALL_STACK_REF(call_stack, reason) \
+ grpc_stream_ref(&(call_stack)->refcount)
+#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \
grpc_stream_unref(exec_ctx, &(call_stack)->refcount)
+#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
+ grpc_stream_ref(&(channel_stack)->refcount)
+#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \
+ grpc_stream_unref(exec_ctx, &(channel_stack)->refcount)
#endif
/* Destroy a call stack */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 020138bf15..385ae3be9b 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -59,11 +59,6 @@ typedef struct client_channel_channel_data {
grpc_resolver *resolver;
/** have we started resolving this channel */
int started_resolving;
- /** master channel - the grpc_channel instance that ultimately owns
- this channel_data via its channel stack.
- We occasionally use this to bump the refcount on the master channel
- to keep ourselves alive through an asynchronous operation. */
- grpc_channel *master;
/** mutex protecting client configuration, including all
variables below in this data structure */
@@ -81,8 +76,10 @@ typedef struct client_channel_channel_data {
grpc_connectivity_state_tracker state_tracker;
/** when an lb_policy arrives, should we try to exit idle */
int exit_idle_when_lb_policy_arrives;
- /** pollset_set of interested parties in a new connection */
- grpc_pollset_set pollset_set;
+ /** owning stack */
+ grpc_channel_stack *owning_stack;
+ /** interested parties */
+ grpc_pollset_set interested_parties;
} channel_data;
/** We create one watcher for each new lb_policy that is returned from a
@@ -103,9 +100,7 @@ typedef struct {
} waiting_call;
static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
- return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data,
- chand->master);
+ return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data);
}
static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
@@ -121,10 +116,18 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
static void on_lb_policy_state_changed_locked(
grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
+ grpc_connectivity_state publish_state = w->state;
/* check if the notification is for a stale policy */
if (w->lb_policy != w->chand->lb_policy) return;
- grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state,
+ if (publish_state == GRPC_CHANNEL_FATAL_FAILURE &&
+ w->chand->resolver != NULL) {
+ publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver);
+ GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
+ w->chand->lb_policy = NULL;
+ }
+ grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state,
"lb_changed");
if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
@@ -139,7 +142,7 @@ static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
on_lb_policy_state_changed_locked(exec_ctx, w);
gpr_mu_unlock(&w->chand->mu_config);
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy");
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
gpr_free(w);
}
@@ -147,7 +150,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
w->chand = chand;
grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
@@ -179,6 +182,11 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
chand->incoming_configuration = NULL;
+ if (lb_policy != NULL) {
+ grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties,
+ &chand->interested_parties);
+ }
+
gpr_mu_lock(&chand->mu_config);
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
@@ -200,7 +208,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
gpr_mu_unlock(&chand->mu_config);
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
&chand->on_config_changed);
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next");
@@ -222,7 +230,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (old_lb_policy != NULL) {
- grpc_lb_policy_shutdown(exec_ctx, old_lb_policy);
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ &old_lb_policy->interested_parties,
+ &chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
}
@@ -230,20 +240,22 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
}
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver");
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
}
static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_transport_op *op) {
- grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
GPR_ASSERT(op->set_accept_stream == NULL);
- GPR_ASSERT(op->bind_pollset == NULL);
+ if (op->bind_pollset != NULL) {
+ grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties,
+ op->bind_pollset);
+ }
gpr_mu_lock(&chand->mu_config);
if (op->on_connectivity_state_change != NULL) {
@@ -254,9 +266,14 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
op->connectivity_state = NULL;
}
- lb_policy = chand->lb_policy;
- if (lb_policy) {
- GRPC_LB_POLICY_REF(lb_policy, "broadcast");
+ if (op->send_ping != NULL) {
+ if (chand->lb_policy == NULL) {
+ grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, 0);
+ } else {
+ grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
+ op->bind_pollset = NULL;
+ }
+ op->send_ping = NULL;
}
if (op->disconnect && chand->resolver != NULL) {
@@ -265,7 +282,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
destroy_resolver = chand->resolver;
chand->resolver = NULL;
if (chand->lb_policy != NULL) {
- grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy);
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ &chand->lb_policy->interested_parties,
+ &chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
chand->lb_policy = NULL;
}
@@ -276,16 +295,11 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_resolver_shutdown(exec_ctx, destroy_resolver);
GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel");
}
-
- if (lb_policy) {
- grpc_lb_policy_broadcast(exec_ctx, lb_policy, op);
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast");
- }
}
typedef struct {
grpc_metadata_batch *initial_metadata;
- grpc_subchannel **subchannel;
+ grpc_connected_subchannel **connected_subchannel;
grpc_closure *on_ready;
grpc_call_element *elem;
grpc_closure closure;
@@ -293,17 +307,17 @@ typedef struct {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **subchannel,
+ grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) {
continue_picking_args *cpa = arg;
if (!success) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
- } else if (cpa->subchannel == NULL) {
+ } else if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
- cpa->subchannel, cpa->on_ready)) {
+ cpa->connected_subchannel, cpa->on_ready)) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1);
}
gpr_free(cpa);
@@ -311,7 +325,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **subchannel,
+ grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
grpc_call_element *elem = elemp;
channel_data *chand = elem->channel_data;
@@ -319,18 +333,19 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
continue_picking_args *cpa;
grpc_closure *closure;
- GPR_ASSERT(subchannel);
+ GPR_ASSERT(connected_subchannel);
gpr_mu_lock(&chand->mu_config);
if (initial_metadata == NULL) {
if (chand->lb_policy != NULL) {
- grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, subchannel);
+ grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
+ connected_subchannel);
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
closure = grpc_closure_next(closure)) {
cpa = closure->cb_arg;
- if (cpa->subchannel == subchannel) {
- cpa->subchannel = NULL;
+ if (cpa->connected_subchannel == connected_subchannel) {
+ cpa->connected_subchannel = NULL;
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
}
}
@@ -338,21 +353,22 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
return 1;
}
if (chand->lb_policy != NULL) {
- int r = grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset,
- initial_metadata, subchannel, on_ready);
+ int r =
+ grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset,
+ initial_metadata, connected_subchannel, on_ready);
gpr_mu_unlock(&chand->mu_config);
return r;
}
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = 1;
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next(exec_ctx, chand->resolver,
&chand->incoming_configuration,
&chand->on_config_changed);
}
cpa = gpr_malloc(sizeof(*cpa));
cpa->initial_metadata = initial_metadata;
- cpa->subchannel = subchannel;
+ cpa->connected_subchannel = connected_subchannel;
cpa->on_ready = on_ready;
cpa->elem = elem;
grpc_closure_init(&cpa->closure, continue_picking, cpa);
@@ -364,7 +380,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
- grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem);
+ grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem,
+ args->call_stack);
}
/* Destructor for call_data */
@@ -385,12 +402,12 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
gpr_mu_init(&chand->mu_config);
- chand->master = args->master;
- grpc_pollset_set_init(&chand->pollset_set);
grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
+ chand->owning_stack = args->channel_stack;
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
+ grpc_pollset_set_init(&chand->interested_parties);
}
/* Destructor for channel_data */
@@ -403,10 +420,13 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
}
if (chand->lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ &chand->lb_policy->interested_parties,
+ &chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
- grpc_pollset_set_destroy(&chand->pollset_set);
+ grpc_pollset_set_destroy(&chand->interested_parties);
gpr_mu_destroy(&chand->mu_config);
}
@@ -435,7 +455,7 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
chand->exit_idle_when_lb_policy_arrives) {
chand->started_resolving = 1;
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
&chand->on_config_changed);
}
@@ -454,7 +474,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
} else {
chand->exit_idle_when_lb_policy_arrives = 1;
if (!chand->started_resolving && chand->resolver != NULL) {
- GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
chand->started_resolving = 1;
grpc_resolver_next(exec_ctx, chand->resolver,
&chand->incoming_configuration,
@@ -466,32 +486,39 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
return out;
}
+typedef struct {
+ channel_data *chand;
+ grpc_pollset *pollset;
+ grpc_closure *on_complete;
+ grpc_closure my_closure;
+} external_connectivity_watcher;
+
+static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
+ external_connectivity_watcher *w = arg;
+ grpc_closure *follow_up = w->on_complete;
+ grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties,
+ w->pollset);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
+ "external_connectivity_watcher");
+ gpr_free(w);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success);
+}
+
void grpc_client_channel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete) {
channel_data *chand = elem->channel_data;
+ external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ w->chand = chand;
+ w->pollset = pollset;
+ w->on_complete = on_complete;
+ grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset);
+ grpc_closure_init(&w->my_closure, on_external_watch_complete, w);
+ GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
+ "external_connectivity_watcher");
gpr_mu_lock(&chand->mu_config);
grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &chand->state_tracker, state, on_complete);
+ exec_ctx, &chand->state_tracker, state, &w->my_closure);
gpr_mu_unlock(&chand->mu_config);
}
-
-grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
- grpc_channel_element *elem) {
- channel_data *chand = elem->channel_data;
- return &chand->pollset_set;
-}
-
-void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_pollset *pollset) {
- channel_data *chand = elem->channel_data;
- grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset);
-}
-
-void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_pollset *pollset) {
- channel_data *chand = elem->channel_data;
- grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset);
-}
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index 5103f07a43..d9bc4971f1 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -57,17 +57,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
void grpc_client_channel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete);
-grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
- grpc_channel_element *elem);
-
-void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *channel,
- grpc_pollset *pollset);
-void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *channel,
- grpc_pollset *pollset);
-
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c
index 456ffb7371..2c0b07d8bf 100644
--- a/src/core/channel/client_uchannel.c
+++ b/src/core/channel/client_uchannel.c
@@ -58,13 +58,13 @@ typedef struct client_uchannel_channel_data {
this channel_data via its channel stack.
We occasionally use this to bump the refcount on the master channel
to keep ourselves alive through an asynchronous operation. */
- grpc_channel *master;
+ grpc_channel_stack *owning_stack;
/** connectivity state being tracked */
grpc_connectivity_state_tracker state_tracker;
/** the subchannel wrapped by the microchannel */
- grpc_subchannel *subchannel;
+ grpc_connected_subchannel *connected_subchannel;
/** the callback used to stay subscribed to subchannel connectivity
* notifications */
@@ -84,15 +84,13 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
chand->subchannel_connectivity,
"uchannel_monitor_subchannel");
- grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
- &chand->subchannel_connectivity,
- &chand->connectivity_cb);
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, chand->connected_subchannel, NULL,
+ &chand->subchannel_connectivity, &chand->connectivity_cb);
}
static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
- return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data,
- chand->master);
+ return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data);
}
static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
@@ -128,11 +126,11 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **subchannel,
+ grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
channel_data *chand = arg;
GPR_ASSERT(initial_metadata != NULL);
- *subchannel = chand->subchannel;
+ *connected_subchannel = chand->connected_subchannel;
return 1;
}
@@ -140,7 +138,7 @@ static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel,
- elem->channel_data);
+ elem->channel_data, args->call_stack);
}
/* Destructor for call_data */
@@ -158,7 +156,7 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand);
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
- chand->master = args->master;
+ chand->owning_stack = args->channel_stack;
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_uchannel");
gpr_mu_init(&chand->mu_state);
@@ -168,10 +166,14 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
- grpc_subchannel_state_change_unsubscribe(exec_ctx, chand->subchannel,
- &chand->connectivity_cb);
+ /* cancel subscription */
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, chand->connected_subchannel, NULL, NULL,
+ &chand->connectivity_cb);
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
gpr_mu_destroy(&chand->mu_state);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, chand->connected_subchannel,
+ "uchannel");
}
static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
@@ -191,23 +193,14 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
- out = grpc_connectivity_state_check(&chand->state_tracker);
gpr_mu_lock(&chand->mu_state);
- if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
- GRPC_CHANNEL_CONNECTING,
- "uchannel_connecting_changed");
- chand->subchannel_connectivity = out;
- grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
- &chand->subchannel_connectivity,
- &chand->connectivity_cb);
- }
+ out = grpc_connectivity_state_check(&chand->state_tracker);
gpr_mu_unlock(&chand->mu_state);
return out;
}
void grpc_client_uchannel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete) {
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu_state);
@@ -216,40 +209,11 @@ void grpc_client_uchannel_watch_connectivity_state(
gpr_mu_unlock(&chand->mu_state);
}
-grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set(
- grpc_channel_element *elem) {
- channel_data *chand = elem->channel_data;
- grpc_channel_element *parent_elem;
- gpr_mu_lock(&chand->mu_state);
- parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(
- grpc_subchannel_get_master(chand->subchannel)));
- gpr_mu_unlock(&chand->mu_state);
- return grpc_client_channel_get_connecting_pollset_set(parent_elem);
-}
-
-void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_pollset *pollset) {
- grpc_pollset_set *master_pollset_set =
- grpc_client_uchannel_get_connecting_pollset_set(elem);
- grpc_pollset_set_add_pollset(exec_ctx, master_pollset_set, pollset);
-}
-
-void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_pollset *pollset) {
- grpc_pollset_set *master_pollset_set =
- grpc_client_uchannel_get_connecting_pollset_set(elem);
- grpc_pollset_set_del_pollset(exec_ctx, master_pollset_set, pollset);
-}
-
grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
grpc_channel_args *args) {
grpc_channel *channel = NULL;
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
- grpc_channel *master = grpc_subchannel_get_master(subchannel);
- char *target = grpc_channel_get_target(master);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
size_t n = 0;
@@ -261,19 +225,19 @@ grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
GPR_ASSERT(n <= MAX_FILTERS);
channel =
- grpc_channel_create_from_filters(&exec_ctx, target, filters, n, args, 1);
+ grpc_channel_create_from_filters(&exec_ctx, NULL, filters, n, args, 1);
- gpr_free(target);
return channel;
}
-void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel,
- grpc_subchannel *subchannel) {
+void grpc_client_uchannel_set_connected_subchannel(
+ grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel) {
grpc_channel_element *elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel));
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
gpr_mu_lock(&chand->mu_state);
- chand->subchannel = subchannel;
+ chand->connected_subchannel = connected_subchannel;
+ GRPC_CONNECTED_SUBCHANNEL_REF(connected_subchannel, "uchannel");
gpr_mu_unlock(&chand->mu_state);
}
diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h
index dfe6695ae3..92a831493c 100644
--- a/src/core/channel/client_uchannel.h
+++ b/src/core/channel/client_uchannel.h
@@ -48,23 +48,13 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
void grpc_client_uchannel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete);
-grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set(
- grpc_channel_element *elem);
-
-void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *channel,
- grpc_pollset *pollset);
-void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *channel,
- grpc_pollset *pollset);
-
grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
grpc_channel_args *args);
-void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel,
- grpc_subchannel *subchannel);
+void grpc_client_uchannel_set_connected_subchannel(
+ grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H */
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index fc8b425e47..cc8e191628 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -39,11 +39,11 @@
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
-#include "src/core/channel/compress_filter.h"
#include "src/core/channel/channel_args.h"
-#include "src/core/profiling/timers.h"
+#include "src/core/channel/compress_filter.h"
#include "src/core/compression/algorithm_metadata.h"
#include "src/core/compression/message_compress.h"
+#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/transport/static_metadata.h"
@@ -288,8 +288,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for channel data */
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem) {
-}
+ grpc_channel_element *elem) {}
const grpc_channel_filter grpc_compress_filter = {
compress_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 0e1efd965a..e8eb9dcfc5 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -89,9 +89,9 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
int r;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- r = grpc_transport_init_stream(exec_ctx, chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- args->refcount, args->server_transport_data);
+ r = grpc_transport_init_stream(
+ exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+ &args->call_stack->refcount, args->server_transport_data);
GPR_ASSERT(r == 0);
}
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index b9a30cdaf2..65cfb778bb 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -31,12 +31,12 @@
*/
#include "src/core/channel/http_client_filter.h"
-#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/support/string.h"
+#include <string.h>
#include "src/core/profiling/timers.h"
+#include "src/core/support/string.h"
#include "src/core/transport/static_metadata.h"
typedef struct call_data {
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index c1645c2ba0..ae8660da92 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -33,9 +33,9 @@
#include "src/core/channel/http_server_filter.h"
-#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <string.h>
#include "src/core/profiling/timers.h"
#include "src/core/transport/static_metadata.h"
@@ -124,7 +124,6 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
omitted */
grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value));
- GRPC_MDELEM_UNREF(md);
calld->seen_authority = 1;
return authority;
} else {
@@ -225,8 +224,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for channel data */
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem) {
-}
+ grpc_channel_element *elem) {}
const grpc_channel_filter grpc_http_server_filter = {
hs_start_transport_op, grpc_channel_next_op, sizeof(call_data),
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
deleted file mode 100644
index 2fbf1c06bb..0000000000
--- a/src/core/channel/noop_filter.c
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/channel/noop_filter.h"
-#include <grpc/support/log.h>
-
-typedef struct call_data {
- int unused; /* C89 requires at least one struct element */
-} call_data;
-
-typedef struct channel_data {
- int unused; /* C89 requires at least one struct element */
-} channel_data;
-
-/* used to silence 'variable not used' warnings */
-static void ignore_unused(void *ignored) {}
-
-static void noop_mutate_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(calld);
- ignore_unused(channeld);
-
- /* do nothing */
-}
-
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void noop_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op *op) {
- noop_mutate_op(elem, op);
-
- /* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, op);
-}
-
-/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- /* initialize members */
- calld->unused = channeld->unused;
-}
-
-/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {}
-
-/* Constructor for channel_data */
-static void init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_channel_element_args *args) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- /* The last filter tends to be implemented differently to
- handle the case that there's no 'next' filter to call on the down
- path */
- GPR_ASSERT(!args->is_last);
-
- /* initialize members */
- channeld->unused = 0;
-}
-
-/* Destructor for channel data */
-static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-}
-
-const grpc_channel_filter grpc_no_op_filter = {
- noop_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
- init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- grpc_call_next_get_peer, "no-op"};
diff --git a/src/core/channel/noop_filter.h b/src/core/channel/noop_filter.h
deleted file mode 100644
index ded9b33117..0000000000
--- a/src/core/channel/noop_filter.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_CHANNEL_NOOP_FILTER_H
-#define GRPC_INTERNAL_CORE_CHANNEL_NOOP_FILTER_H
-
-#include "src/core/channel/channel_stack.h"
-
-/* No-op filter: simply takes everything it's given, and passes it on to the
- next filter. Exists simply as a starting point that others can take and
- customize for their own filters */
-extern const grpc_channel_filter grpc_no_op_filter;
-
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_NOOP_FILTER_H */
diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c
index 7251714519..f5da41f3cd 100644
--- a/src/core/channel/subchannel_call_holder.c
+++ b/src/core/channel/subchannel_call_holder.c
@@ -44,7 +44,6 @@
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
int success);
-static void call_ready(grpc_exec_ctx *exec_ctx, void *holder, int success);
static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
int success);
@@ -58,16 +57,17 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
void grpc_subchannel_call_holder_init(
grpc_subchannel_call_holder *holder,
grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
- void *pick_subchannel_arg) {
+ void *pick_subchannel_arg, grpc_call_stack *owning_call) {
gpr_atm_rel_store(&holder->subchannel_call, 0);
holder->pick_subchannel = pick_subchannel;
holder->pick_subchannel_arg = pick_subchannel_arg;
gpr_mu_init(&holder->mu);
- holder->subchannel = NULL;
+ holder->connected_subchannel = NULL;
holder->waiting_ops = NULL;
holder->waiting_ops_count = 0;
holder->waiting_ops_capacity = 0;
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ holder->owning_call = owning_call;
}
void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
@@ -125,13 +125,9 @@ retry:
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
fail_locked(exec_ctx, holder);
break;
- case GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL:
- grpc_subchannel_cancel_create_call(exec_ctx, holder->subchannel,
- &holder->subchannel_call);
- break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
- &holder->subchannel, NULL);
+ &holder->connected_subchannel, NULL);
break;
}
gpr_mu_unlock(&holder->mu);
@@ -142,28 +138,27 @@ retry:
}
/* if we don't have a subchannel, try to get one */
if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
- holder->subchannel == NULL && op->send_initial_metadata != NULL) {
+ holder->connected_subchannel == NULL &&
+ op->send_initial_metadata != NULL) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
grpc_closure_init(&holder->next_step, subchannel_ready, holder);
- if (holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg,
- op->send_initial_metadata, &holder->subchannel,
- &holder->next_step)) {
+ GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel");
+ if (holder->pick_subchannel(
+ exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata,
+ &holder->connected_subchannel, &holder->next_step)) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
}
}
/* if we've got a subchannel, then let's ask it to create a call */
if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
- holder->subchannel != NULL) {
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL;
- grpc_closure_init(&holder->next_step, call_ready, holder);
- if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
- holder->pollset, &holder->subchannel_call,
- &holder->next_step)) {
- /* got one immediately - continue the op (and any waiting ops) */
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
- retry_waiting_locked(exec_ctx, holder);
- goto retry;
- }
+ holder->connected_subchannel != NULL) {
+ gpr_atm_rel_store(
+ &holder->subchannel_call,
+ (gpr_atm)(gpr_uintptr)grpc_connected_subchannel_create_call(
+ exec_ctx, holder->connected_subchannel, holder->pollset));
+ retry_waiting_locked(exec_ctx, holder);
+ goto retry;
}
/* nothing to be done but wait */
add_waiting_locked(holder, op);
@@ -179,36 +174,18 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
call = GET_CALL(holder);
GPR_ASSERT(call == NULL || call == CANCELLED_CALL);
- if (holder->subchannel == NULL) {
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+ if (holder->connected_subchannel == NULL) {
fail_locked(exec_ctx, holder);
} else {
- grpc_closure_init(&holder->next_step, call_ready, holder);
- if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
- holder->pollset, &holder->subchannel_call,
- &holder->next_step)) {
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
- /* got one immediately - continue the op (and any waiting ops) */
- retry_waiting_locked(exec_ctx, holder);
- }
- }
- gpr_mu_unlock(&holder->mu);
-}
-
-static void call_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
- grpc_subchannel_call_holder *holder = arg;
- GPR_TIMER_BEGIN("call_ready", 0);
- gpr_mu_lock(&holder->mu);
- GPR_ASSERT(holder->creation_phase ==
- GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL);
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
- if (GET_CALL(holder) != NULL) {
+ gpr_atm_rel_store(
+ &holder->subchannel_call,
+ (gpr_atm)(gpr_uintptr)grpc_connected_subchannel_create_call(
+ exec_ctx, holder->connected_subchannel, holder->pollset));
retry_waiting_locked(exec_ctx, holder);
- } else {
- fail_locked(exec_ctx, holder);
}
gpr_mu_unlock(&holder->mu);
- GPR_TIMER_END("call_ready", 0);
+ GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
}
typedef struct {
@@ -270,14 +247,13 @@ static void fail_locked(grpc_exec_ctx *exec_ctx,
holder->waiting_ops_count = 0;
}
-char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call_holder *holder,
- grpc_channel *master) {
+char *grpc_subchannel_call_holder_get_peer(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) {
grpc_subchannel_call *subchannel_call = GET_CALL(holder);
if (subchannel_call) {
return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
} else {
- return grpc_channel_get_target(master);
+ return NULL;
}
}
diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h
index bda051c566..9cf72c6cf7 100644
--- a/src/core/channel/subchannel_call_holder.h
+++ b/src/core/channel/subchannel_call_holder.h
@@ -42,12 +42,11 @@
called when the subchannel is available) */
typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
- grpc_subchannel **subchannel, grpc_closure *on_ready);
+ grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready);
typedef enum {
GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING,
- GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL,
- GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL
+ GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL
} grpc_subchannel_call_holder_creation_phase;
/** Wrapper for holding a pointer to grpc_subchannel_call, and the
@@ -71,7 +70,7 @@ typedef struct grpc_subchannel_call_holder {
gpr_mu mu;
grpc_subchannel_call_holder_creation_phase creation_phase;
- grpc_subchannel *subchannel;
+ grpc_connected_subchannel *connected_subchannel;
grpc_pollset *pollset;
grpc_transport_stream_op *waiting_ops;
@@ -79,12 +78,14 @@ typedef struct grpc_subchannel_call_holder {
size_t waiting_ops_capacity;
grpc_closure next_step;
+
+ grpc_call_stack *owning_call;
} grpc_subchannel_call_holder;
void grpc_subchannel_call_holder_init(
grpc_subchannel_call_holder *holder,
grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
- void *pick_subchannel_arg);
+ void *pick_subchannel_arg, grpc_call_stack *owning_call);
void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder);
@@ -92,7 +93,6 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op);
char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call_holder *holder,
- grpc_channel *master);
+ grpc_subchannel_call_holder *holder);
#endif