aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/client_channel.c
diff options
context:
space:
mode:
authorGravatar ctiller <ctiller@google.com>2014-12-10 15:28:27 -0800
committerGravatar Jan Tattermusch <jtattermusch@google.com>2014-12-11 15:11:44 -0800
commitf962f5264b381febb37bc88c0955a031619bb8d9 (patch)
tree513e514a8f39209c925aa80323c7b41c6a2151b2 /src/core/channel/client_channel.c
parenta8fd44adf8855518ee1bc0cad1ee0a533323d9de (diff)
Tell call/channel op handlers who is invoking them.
This change adds a parameter to all op handlers specifying the invoking filter. It will be used to allow client_channel to distinguish which child channel is disconnecting or going away. Change on 2014/12/10 by ctiller <ctiller@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=81823231
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r--src/core/channel/client_channel.c41
1 files changed, 24 insertions, 17 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 0ceffba62a..9ec19df8a9 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -51,12 +51,13 @@ typedef struct { grpc_channel_element *back; } lb_channel_data;
typedef struct { grpc_call_element *back; } lb_call_data;
-static void lb_call_op(grpc_call_element *elem, grpc_call_op *op) {
+static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
+ grpc_call_op *op) {
lb_call_data *calld = elem->call_data;
switch (op->dir) {
case GRPC_CALL_UP:
- calld->back->filter->call_op(calld->back, op);
+ calld->back->filter->call_op(calld->back, elem, op);
break;
case GRPC_CALL_DOWN:
grpc_call_next_op(elem, op);
@@ -65,12 +66,14 @@ static void lb_call_op(grpc_call_element *elem, grpc_call_op *op) {
}
/* Currently we assume all channel operations should just be pushed up. */
-static void lb_channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
+static void lb_channel_op(grpc_channel_element *elem,
+ grpc_channel_element *from_elem,
+ grpc_channel_op *op) {
lb_channel_data *chand = elem->channel_data;
switch (op->dir) {
case GRPC_CALL_UP:
- chand->back->filter->channel_op(chand->back, op);
+ chand->back->filter->channel_op(chand->back, elem, op);
break;
case GRPC_CALL_DOWN:
grpc_channel_next_op(elem, op);
@@ -201,8 +204,9 @@ static int prepare_activate(call_data *calld, child_entry *on_child) {
static void do_nothing(void *ignored, grpc_op_error error) {}
-static void complete_activate(call_data *calld, child_entry *on_child,
+static void complete_activate(grpc_call_element *elem, child_entry *on_child,
grpc_call_op *op) {
+ call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_call_stack_element(calld->s.active.child_stack, 0);
@@ -219,15 +223,17 @@ static void complete_activate(call_data *calld, child_entry *on_child,
dop.data.deadline = calld->deadline;
dop.done_cb = do_nothing;
dop.user_data = NULL;
- child_elem->filter->call_op(child_elem, &dop);
+ child_elem->filter->call_op(child_elem, elem, &dop);
}
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
- child_elem->filter->call_op(child_elem, op);
+ child_elem->filter->call_op(child_elem, elem, op);
}
-static void start_rpc(call_data *calld, channel_data *chand, grpc_call_op *op) {
+static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu);
if (calld->state == CALL_CANCELLED) {
gpr_mu_unlock(&chand->mu);
@@ -241,7 +247,7 @@ static void start_rpc(call_data *calld, channel_data *chand, grpc_call_op *op) {
if (prepare_activate(calld, chand->active_child)) {
gpr_mu_unlock(&chand->mu);
/* activate the request (pass it down) outside the lock */
- complete_activate(calld, chand->active_child, op);
+ complete_activate(elem, chand->active_child, op);
} else {
gpr_mu_unlock(&chand->mu);
}
@@ -299,7 +305,7 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
case CALL_ACTIVE:
child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
gpr_mu_unlock(&chand->mu);
- child_elem->filter->call_op(child_elem, op);
+ child_elem->filter->call_op(child_elem, elem, op);
return; /* early out */
case CALL_WAITING:
remove_waiting_child(chand, calld);
@@ -333,9 +339,9 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
abort();
}
-static void call_op(grpc_call_element *elem, grpc_call_op *op) {
+static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
+ grpc_call_op *op) {
call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
grpc_call_element *child_elem;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@@ -350,7 +356,7 @@ static void call_op(grpc_call_element *elem, grpc_call_op *op) {
break;
case GRPC_SEND_START:
/* filter out the start event to find which child to send on */
- start_rpc(calld, chand, op);
+ start_rpc(elem, op);
break;
case GRPC_CANCEL_OP:
cancel_rpc(elem, op);
@@ -363,7 +369,7 @@ static void call_op(grpc_call_element *elem, grpc_call_op *op) {
case GRPC_CALL_DOWN:
child_elem = grpc_call_stack_element(calld->s.active.child_stack, 0);
GPR_ASSERT(calld->state == CALL_ACTIVE);
- child_elem->filter->call_op(child_elem, op);
+ child_elem->filter->call_op(child_elem, elem, op);
break;
}
break;
@@ -395,7 +401,7 @@ static void broadcast_channel_op_down(grpc_channel_element *elem,
if (op->type == GRPC_CHANNEL_GOAWAY) {
gpr_slice_ref(op->data.goaway.message);
}
- child_elem->filter->channel_op(child_elem, op);
+ child_elem->filter->channel_op(child_elem, elem, op);
}
if (op->type == GRPC_CHANNEL_GOAWAY) {
gpr_slice_unref(op->data.goaway.message);
@@ -411,7 +417,8 @@ static void broadcast_channel_op_down(grpc_channel_element *elem,
gpr_free(children);
}
-static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
+static void channel_op(grpc_channel_element *elem,
+ grpc_channel_element *from_elem, grpc_channel_op *op) {
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
switch (op->type) {
@@ -627,7 +634,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
that guarantee we need to do some curly locking here */
for (i = 0; i < waiting_child_count; i++) {
if (waiting_children[i]) {
- complete_activate(waiting_children[i], child_ent, &call_ops[i]);
+ complete_activate(waiting_children[i]->elem, child_ent, &call_ops[i]);
}
}
gpr_free(waiting_children);