diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/channel/client_channel.c | 166 | ||||
-rw-r--r-- | src/core/channel/connectivity_state.c | 92 | ||||
-rw-r--r-- | src/core/channel/connectivity_state.h | 66 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 44 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 10 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 58 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 15 |
7 files changed, 235 insertions, 216 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index dc838de715..4d082aceb8 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -426,88 +426,6 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { } } -#if 0 -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { - channel_data *chand = elem->channel_data; - grpc_child_channel *child_channel; - grpc_channel_op rop; - GPR_ASSERT(elem->filter == &grpc_client_channel_filter); - - switch (op->type) { - case GRPC_CHANNEL_GOAWAY: - /* sending goaway: clear out the active child on the way through */ - gpr_mu_lock(&chand->mu); - child_channel = chand->active_child; - chand->active_child = NULL; - gpr_mu_unlock(&chand->mu); - if (child_channel) { - grpc_child_channel_handle_op(child_channel, op); - grpc_child_channel_destroy(child_channel, 1); - } else { - gpr_slice_unref(op->data.goaway.message); - } - break; - case GRPC_CHANNEL_DISCONNECT: - /* sending disconnect: clear out the active child on the way through */ - gpr_mu_lock(&chand->mu); - child_channel = chand->active_child; - chand->active_child = NULL; - gpr_mu_unlock(&chand->mu); - if (child_channel) { - grpc_child_channel_destroy(child_channel, 1); - } - /* fake a transport closed to satisfy the refcounting in client */ - rop.type = GRPC_TRANSPORT_CLOSED; - rop.dir = GRPC_CALL_UP; - grpc_channel_next_op(elem, &rop); - break; - case GRPC_TRANSPORT_GOAWAY: - /* receiving goaway: if it's from our active child, drop the active child; - in all cases consume the event here */ - gpr_mu_lock(&chand->mu); - child_channel = grpc_channel_stack_from_top_element(from_elem); - if (child_channel == chand->active_child) { - chand->active_child = NULL; - } else { - child_channel = NULL; - } - gpr_mu_unlock(&chand->mu); - if (child_channel) { - grpc_child_channel_destroy(child_channel, 0); - } - gpr_slice_unref(op->data.goaway.message); - break; - case GRPC_TRANSPORT_CLOSED: - /* receiving disconnect: if it's from our active child, drop the active - child; in all cases consume the event here */ - gpr_mu_lock(&chand->mu); - child_channel = grpc_channel_stack_from_top_element(from_elem); - if (child_channel == chand->active_child) { - chand->active_child = NULL; - } else { - child_channel = NULL; - } - gpr_mu_unlock(&chand->mu); - if (child_channel) { - grpc_child_channel_destroy(child_channel, 0); - } - break; - default: - switch (op->dir) { - case GRPC_CALL_UP: - grpc_channel_next_op(elem, op); - break; - case GRPC_CALL_DOWN: - gpr_log(GPR_ERROR, "unhandled channel op: %d", op->type); - abort(); - break; - } - break; - } -} -#endif - static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {} /* Constructor for call_data */ @@ -591,90 +509,6 @@ const grpc_channel_filter grpc_client_channel_filter = { init_channel_elem, destroy_channel_elem, "client-channel", }; -#if 0 -grpc_transport_setup_result grpc_client_channel_transport_setup_complete( - grpc_channel_stack *channel_stack, grpc_transport *transport, - grpc_channel_filter const **channel_filters, size_t num_channel_filters, - grpc_mdctx *mdctx) { - /* we just got a new transport: lets create a child channel stack for it */ - grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); - channel_data *chand = elem->channel_data; - size_t num_child_filters = 2 + num_channel_filters; - grpc_channel_filter const **child_filters; - grpc_transport_setup_result result; - grpc_child_channel *old_active = NULL; - call_data **waiting_children; - size_t waiting_child_count; - size_t i; - grpc_transport_stream_op *call_ops; - - /* build the child filter stack */ - child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters); - /* we always need a link back filter to get back to the connected channel */ - child_filters[0] = &grpc_child_channel_top_filter; - for (i = 0; i < num_channel_filters; i++) { - child_filters[i + 1] = channel_filters[i]; - } - /* and we always need a connected channel to talk to the transport */ - child_filters[num_child_filters - 1] = &grpc_connected_channel_filter; - - GPR_ASSERT(elem->filter == &grpc_client_channel_filter); - - /* BEGIN LOCKING CHANNEL */ - gpr_mu_lock(&chand->mu); - chand->transport_setup_initiated = 0; - - if (chand->active_child) { - old_active = chand->active_child; - } - chand->active_child = grpc_child_channel_create( - elem, child_filters, num_child_filters, chand->args, mdctx); - result = - grpc_connected_channel_bind_transport(chand->active_child, transport); - - /* capture the waiting children - we'll activate them outside the lock - to avoid re-entrancy problems */ - waiting_children = chand->waiting_children; - waiting_child_count = chand->waiting_child_count; - /* bumping up inflight_requests here avoids taking a lock per rpc below */ - - chand->waiting_children = NULL; - chand->waiting_child_count = 0; - chand->waiting_child_capacity = 0; - - call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count); - - for (i = 0; i < waiting_child_count; i++) { - call_ops[i] = waiting_children[i]->waiting_op; - if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) { - waiting_children[i] = NULL; - grpc_transport_stream_op_finish_with_failure(&call_ops[i]); - } - } - - /* END LOCKING CHANNEL */ - gpr_mu_unlock(&chand->mu); - - /* activate any pending operations - this is safe to do as we guarantee one - and only one write operation per request at the surface api - if we lose - that guarantee we need to do some curly locking here */ - for (i = 0; i < waiting_child_count; i++) { - if (waiting_children[i]) { - complete_activate(waiting_children[i]->elem, &call_ops[i]); - } - } - gpr_free(waiting_children); - gpr_free(call_ops); - gpr_free(child_filters); - - if (old_active) { - grpc_child_channel_destroy(old_active, 1); - } - - return result; -} -#endif - void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, grpc_resolver *resolver) { /* post construction initialization: set the transport setup pointer */ diff --git a/src/core/channel/connectivity_state.c b/src/core/channel/connectivity_state.c new file mode 100644 index 0000000000..566a2c3344 --- /dev/null +++ b/src/core/channel/connectivity_state.c @@ -0,0 +1,92 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/connectivity_state.h" +#include <grpc/support/alloc.h> + +void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state) { + tracker->current_state = init_state; + tracker->watchers = NULL; +} + +void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { + grpc_connectivity_state_watcher *w; + while ((w = tracker->watchers)) { + tracker->watchers = w->next; + + if (GRPC_CHANNEL_FATAL_FAILURE != *w->current) { + *w->current = GRPC_CHANNEL_FATAL_FAILURE; + grpc_iomgr_add_callback(w->notify); + } else { + grpc_iomgr_add_delayed_callback(w->notify, 0); + } + gpr_free(w); + } +} + +grpc_connectivity_state grpc_connectivity_state_check(grpc_connectivity_state_tracker *tracker) { + return tracker->current_state; +} + +int grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_iomgr_closure *notify) { + if (tracker->current_state != *current) { + *current = tracker->current_state; + grpc_iomgr_add_callback(notify); + } else { + grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); + w->current = current; + w->notify = notify; + w->next = tracker->watchers; + tracker->watchers = w; + } + return tracker->current_state == GRPC_CHANNEL_IDLE; +} + +void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state) { + grpc_connectivity_state_watcher *new = NULL; + grpc_connectivity_state_watcher *w; + tracker->current_state = state; + while ((w = tracker->watchers)) { + tracker->watchers = w->next; + + if (state != *w->current) { + *w->current = state; + grpc_iomgr_add_callback(w->notify); + gpr_free(w); + } else { + w->next = new; + new = w; + } + } + tracker->watchers = new; +} diff --git a/src/core/channel/connectivity_state.h b/src/core/channel/connectivity_state.h new file mode 100644 index 0000000000..ebc1acc559 --- /dev/null +++ b/src/core/channel/connectivity_state.h @@ -0,0 +1,66 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CHANNEL_CONNECTIVITY_STATE_H +#define GRPC_INTERNAL_CORE_CHANNEL_CONNECTIVITY_STATE_H + +#include <grpc/grpc.h> +#include "src/core/iomgr/iomgr.h" + +typedef struct grpc_connectivity_state_watcher { + /** we keep watchers in a linked list */ + struct grpc_connectivity_state_watcher *next; + /** closure to notify on change */ + grpc_iomgr_closure *notify; + /** the current state as believed by the watcher */ + grpc_connectivity_state *current; +} grpc_connectivity_state_watcher; + +typedef struct { + /** current connectivity state */ + grpc_connectivity_state current_state; + /** all our watchers */ + grpc_connectivity_state_watcher *watchers; +} grpc_connectivity_state_tracker; + +void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state); +void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker); + +void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state); + +grpc_connectivity_state grpc_connectivity_state_check(grpc_connectivity_state_tracker *tracker); + +/** Return 1 if the channel should start connecting, 0 otherwise */ +int grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_iomgr_closure *notify); + +#endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTIVITY_STATE_H */ diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index f3a6d21eb5..4fda07a63f 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -36,6 +36,7 @@ #include <string.h> #include <grpc/support/alloc.h> +#include "src/core/channel/connectivity_state.h" typedef struct pending_pick { struct pending_pick *next; @@ -69,6 +70,9 @@ typedef struct { grpc_connectivity_state checking_connectivity; /** list of picks that are waiting on connectivity */ pending_pick *pending_picks; + + /** our connectivity state tracker */ + grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; void pf_ref(grpc_lb_policy *pol) { @@ -184,8 +188,46 @@ loop: } } +static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { + pick_first_lb_policy *p = (pick_first_lb_policy*)pol; + size_t i; + size_t n; + grpc_subchannel **subchannels; + + gpr_mu_lock(&p->mu); + n = p->num_subchannels; + subchannels = gpr_malloc(n * sizeof(*subchannels)); + for (i = 0; i < n; i++) { + subchannels[i] = p->subchannels[i]; + grpc_subchannel_ref(subchannels[i]); + } + gpr_mu_unlock(&p->mu); + + for (i = 0; i < n; i++) { + grpc_subchannel_process_transport_op(subchannels[i], op); + grpc_subchannel_unref(subchannels[i]); + } + gpr_free(subchannels); +} + +static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) { + pick_first_lb_policy *p = (pick_first_lb_policy*)pol; + grpc_connectivity_state st; + gpr_mu_lock(&p->mu); + st = grpc_connectivity_state_check(&p->state_tracker); + gpr_mu_unlock(&p->mu); + return st; +} + +static void pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, grpc_iomgr_closure *notify) { + pick_first_lb_policy *p = (pick_first_lb_policy*)pol; + gpr_mu_lock(&p->mu); + grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, notify); + gpr_mu_unlock(&p->mu); +} + static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_ref, pf_unref, pf_shutdown, pf_pick}; + pf_ref, pf_unref, pf_shutdown, pf_pick, pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, size_t num_subchannels) { diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 42929e933b..42be9152cb 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -58,6 +58,16 @@ struct grpc_lb_policy_vtable { void (*pick)(grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_subchannel **target, grpc_iomgr_closure *on_complete); + + /** broadcast a transport op to all subchannels */ + void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op); + + /** check the current connectivity of the lb_policy */ + grpc_connectivity_state (*check_connectivity)(grpc_lb_policy *policy); + + /** call notify when the connectivity state of a channel changes from *state. + Updates *state with the new state of the policy */ + void (*notify_on_state_change)(grpc_lb_policy *policy, grpc_connectivity_state *state, grpc_iomgr_closure *closure); }; void grpc_lb_policy_ref(grpc_lb_policy *policy); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index b489ce04a5..5cbb5d9971 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -39,6 +39,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" +#include "src/core/channel/connectivity_state.h" typedef struct { gpr_refcount refs; @@ -52,12 +53,6 @@ typedef struct waiting_for_connect { grpc_subchannel_call **target; } waiting_for_connect; -typedef struct connectivity_state_watcher { - struct connectivity_state_watcher *next; - grpc_iomgr_closure *notify; - grpc_connectivity_state *current; -} connectivity_state_watcher; - struct grpc_subchannel { gpr_refcount refs; grpc_connector *connector; @@ -92,8 +87,8 @@ struct grpc_subchannel { int connecting; /** things waiting for a connection */ waiting_for_connect *waiting; - /** things watching the connectivity state */ - connectivity_state_watcher *watchers; + /** connectivity state tracking */ + grpc_connectivity_state_tracker state_tracker; }; struct grpc_subchannel_call { @@ -123,6 +118,7 @@ void grpc_subchannel_unref(grpc_subchannel *c) { gpr_free(c->addr); grpc_mdctx_unref(c->mdctx); grpc_pollset_set_destroy(&c->pollset_set); + grpc_connectivity_state_destroy(&c->state_tracker); gpr_free(c); } } @@ -156,6 +152,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_mdctx_ref(c->mdctx); grpc_pollset_set_init(&c->pollset_set); grpc_iomgr_closure_init(&c->connected, subchannel_connected, c); + grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE); gpr_mu_init(&c->mu); return c; } @@ -210,7 +207,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { grpc_connectivity_state state; gpr_mu_lock(&c->mu); - state = compute_connectivity_locked(c); + state = grpc_connectivity_state_check(&c->state_tracker); gpr_mu_unlock(&c->mu); return state; } @@ -218,35 +215,24 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, grpc_connectivity_state *state, grpc_iomgr_closure *notify) { - grpc_connectivity_state current; int do_connect = 0; - connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); - w->current = state; - w->notify = notify; gpr_mu_lock(&c->mu); - current = compute_connectivity_locked(c); - if (current == GRPC_CHANNEL_IDLE) { - current = GRPC_CHANNEL_CONNECTING; + if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) { + do_connect = 1; c->connecting = 1; - do_connect = 1; grpc_subchannel_ref(c); - connectivity_state_changed_locked(c); - } - if (current != *state) { - gpr_mu_unlock(&c->mu); - *state = current; - grpc_iomgr_add_callback(notify); - gpr_free(w); - } else { - w->next = c->watchers; - c->watchers = w; - gpr_mu_unlock(&c->mu); + grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c)); } + gpr_mu_unlock(&c->mu); if (do_connect) { start_connect(c); } } +void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op) { + abort(); +} + static void publish_transport(grpc_subchannel *c) { size_t channel_stack_size; connection *con; @@ -311,21 +297,7 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { static void connectivity_state_changed_locked(grpc_subchannel *c) { grpc_connectivity_state current = compute_connectivity_locked(c); - connectivity_state_watcher *new = NULL; - connectivity_state_watcher *w; - while ((w = c->watchers)) { - c->watchers = w->next; - - if (current != *w->current) { - *w->current = current; - grpc_iomgr_add_callback(w->notify); - gpr_free(w); - } else { - w->next = new; - new = w; - } - } - c->watchers = new; + grpc_connectivity_state_set(&c->state_tracker, current); } /* diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 8836f9b09c..60b95d3d8f 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -46,6 +46,15 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; void grpc_subchannel_ref(grpc_subchannel *channel); void grpc_subchannel_unref(grpc_subchannel *channel); +/** construct a call (possibly asynchronously) */ +void grpc_subchannel_create_call(grpc_subchannel *subchannel, + grpc_transport_stream_op *initial_op, + grpc_subchannel_call **target, + grpc_iomgr_closure *notify); + +/** process a transport level op */ +void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, grpc_transport_op *op); + void grpc_subchannel_call_ref(grpc_subchannel_call *call); void grpc_subchannel_call_unref(grpc_subchannel_call *call); @@ -62,12 +71,6 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel, void grpc_subchannel_add_interested_party(grpc_subchannel *channel, grpc_pollset *pollset); void grpc_subchannel_del_interested_party(grpc_subchannel *channel, grpc_pollset *pollset); -/** construct a call (possibly asynchronously) */ -void grpc_subchannel_create_call(grpc_subchannel *subchannel, - grpc_transport_stream_op *initial_op, - grpc_subchannel_call **target, - grpc_iomgr_closure *notify); - /** continue processing a transport op */ void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call, grpc_transport_stream_op *op); |