Diffstat (limited to 'src/core/channel')
-rw-r--r-- | src/core/channel/client_channel.c | 174
1 files changed, 76 insertions, 98 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 0ad108ad6b..b9a489e0cc 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -83,8 +83,6 @@ struct call_data {
   /* owning element */
   grpc_call_element *elem;
 
-  gpr_uint8 got_first_op;
-
   call_state state;
   gpr_timespec deadline;
   union {
@@ -129,55 +127,6 @@ static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
   child_elem->filter->start_transport_op(child_elem, op);
 }
 
-static void start_rpc(grpc_call_element *elem, grpc_transport_op *op) {
-  call_data *calld = elem->call_data;
-  channel_data *chand = elem->channel_data;
-  gpr_mu_lock(&chand->mu);
-  if (calld->state == CALL_CANCELLED) {
-    gpr_mu_unlock(&chand->mu);
-    grpc_transport_op_finish_with_failure(op);
-    return;
-  }
-  GPR_ASSERT(calld->state == CALL_CREATED);
-  calld->state = CALL_WAITING;
-  if (chand->active_child) {
-    /* channel is connected - use the connected stack */
-    if (prepare_activate(elem, chand->active_child)) {
-      gpr_mu_unlock(&chand->mu);
-      /* activate the request (pass it down) outside the lock */
-      complete_activate(elem, op);
-    } else {
-      gpr_mu_unlock(&chand->mu);
-    }
-  } else {
-    /* check to see if we should initiate a connection (if we're not already),
-       but don't do so until outside the lock to avoid re-entrancy problems if
-       the callback is immediate */
-    int initiate_transport_setup = 0;
-    if (!chand->transport_setup_initiated) {
-      chand->transport_setup_initiated = 1;
-      initiate_transport_setup = 1;
-    }
-    /* add this call to the waiting set to be resumed once we have a child
-       channel stack, growing the waiting set if needed */
-    if (chand->waiting_child_count == chand->waiting_child_capacity) {
-      chand->waiting_child_capacity =
-          GPR_MAX(chand->waiting_child_capacity * 2, 8);
-      chand->waiting_children =
-          gpr_realloc(chand->waiting_children,
-                      chand->waiting_child_capacity * sizeof(call_data *));
-    }
-    calld->s.waiting_op = *op;
-    chand->waiting_children[chand->waiting_child_count++] = calld;
-    gpr_mu_unlock(&chand->mu);
-
-    /* finally initiate transport setup if needed */
-    if (initiate_transport_setup) {
-      grpc_transport_setup_initiate(chand->transport_setup);
-    }
-  }
-}
-
 static void remove_waiting_child(channel_data *chand, call_data *calld) {
   size_t new_count;
   size_t i;
@@ -217,11 +166,14 @@ static void handle_op_after_cancellation(grpc_call_element *elem, grpc_transport
   }
 }
 
-static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) {
+static void cc_start_transport_op(grpc_call_element *elem,
+                                  grpc_transport_op *op) {
   call_data *calld = elem->call_data;
   channel_data *chand = elem->channel_data;
   grpc_call_element *child_elem;
   grpc_transport_op waiting_op;
+  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
 
   gpr_mu_lock(&chand->mu);
   switch (calld->state) {
@@ -229,55 +181,82 @@ static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) {
       child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
       gpr_mu_unlock(&chand->mu);
       child_elem->filter->start_transport_op(child_elem, op);
-      return; /* early out */
-    case CALL_WAITING:
-      waiting_op = calld->s.waiting_op;
-      remove_waiting_child(chand, calld);
-      calld->state = CALL_CANCELLED;
-      gpr_mu_unlock(&chand->mu);
-      handle_op_after_cancellation(elem, &waiting_op);
-      handle_op_after_cancellation(elem, op);
-      return; /* early out */
+      break;
     case CALL_CREATED:
-      calld->state = CALL_CANCELLED;
-      gpr_mu_unlock(&chand->mu);
-      handle_op_after_cancellation(elem, op);
-      return; /* early out */
+      if (op->cancel_with_status != GRPC_STATUS_OK) {
+        calld->state = CALL_CANCELLED;
+        gpr_mu_unlock(&chand->mu);
+        handle_op_after_cancellation(elem, op);
+      } else {
+        calld->state = CALL_WAITING;
+        if (chand->active_child) {
+          /* channel is connected - use the connected stack */
+          if (prepare_activate(elem, chand->active_child)) {
+            gpr_mu_unlock(&chand->mu);
+            /* activate the request (pass it down) outside the lock */
+            complete_activate(elem, op);
+          } else {
+            gpr_mu_unlock(&chand->mu);
+          }
+        } else {
+          /* check to see if we should initiate a connection (if we're not already),
+             but don't do so until outside the lock to avoid re-entrancy problems if
+             the callback is immediate */
+          int initiate_transport_setup = 0;
+          if (!chand->transport_setup_initiated) {
+            chand->transport_setup_initiated = 1;
+            initiate_transport_setup = 1;
+          }
+          /* add this call to the waiting set to be resumed once we have a child
+             channel stack, growing the waiting set if needed */
+          if (chand->waiting_child_count == chand->waiting_child_capacity) {
+            chand->waiting_child_capacity =
+                GPR_MAX(chand->waiting_child_capacity * 2, 8);
+            chand->waiting_children =
+                gpr_realloc(chand->waiting_children,
+                            chand->waiting_child_capacity * sizeof(call_data *));
+          }
+          calld->s.waiting_op = *op;
+          chand->waiting_children[chand->waiting_child_count++] = calld;
+          gpr_mu_unlock(&chand->mu);
+
+          /* finally initiate transport setup if needed */
+          if (initiate_transport_setup) {
+            grpc_transport_setup_initiate(chand->transport_setup);
+          }
+        }
+      }
+      break;
+    case CALL_WAITING:
+      if (op->cancel_with_status != GRPC_STATUS_OK) {
+        waiting_op = calld->s.waiting_op;
+        remove_waiting_child(chand, calld);
+        calld->state = CALL_CANCELLED;
+        gpr_mu_unlock(&chand->mu);
+        handle_op_after_cancellation(elem, &waiting_op);
+        handle_op_after_cancellation(elem, op);
+      } else {
+        GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != (op->send_ops == NULL));
+        GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != (op->recv_ops == NULL));
+        if (op->send_ops) {
+          calld->s.waiting_op.send_ops = op->send_ops;
+          calld->s.waiting_op.is_last_send = op->is_last_send;
+          calld->s.waiting_op.on_done_send = op->on_done_send;
+          calld->s.waiting_op.send_user_data = op->send_user_data;
+        }
+        if (op->recv_ops) {
+          calld->s.waiting_op.recv_ops = op->recv_ops;
+          calld->s.waiting_op.recv_state = op->recv_state;
+          calld->s.waiting_op.on_done_recv = op->on_done_recv;
+          calld->s.waiting_op.recv_user_data = op->recv_user_data;
+        }
+        gpr_mu_unlock(&chand->mu);
+      }
+      break;
     case CALL_CANCELLED:
       gpr_mu_unlock(&chand->mu);
       handle_op_after_cancellation(elem, op);
-      return; /* early out */
-  }
-  gpr_log(GPR_ERROR, "should never reach here");
-  abort();
-}
-
-static void cc_start_transport_op(grpc_call_element *elem,
-                                  grpc_transport_op *op) {
-  call_data *calld = elem->call_data;
-  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
-  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
-  if (op->cancel_with_status != GRPC_STATUS_OK) {
-    GPR_ASSERT(op->send_ops == NULL);
-    GPR_ASSERT(op->recv_ops == NULL);
-
-    cancel_rpc(elem, op);
-    return;
-  }
-
-  if (calld->state == CALL_CANCELLED) {
-    handle_op_after_cancellation(elem, op);
-    return;
-  }
-
-  if (!calld->got_first_op) {
-    calld->got_first_op = 1;
-    start_rpc(elem, op);
-  } else {
-    grpc_call_element *child_elem =
-        grpc_child_call_get_top_element(calld->s.active.child_call);
-    child_elem->filter->start_transport_op(child_elem, op);
+      break;
   }
 }
 
@@ -375,7 +354,6 @@ static void init_call_elem(grpc_call_element *elem,
   calld->elem = elem;
   calld->state = CALL_CREATED;
   calld->deadline = gpr_inf_future;
-  calld->got_first_op = 0;
 }
 
 /* Destructor for call_data */
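For reference, and not part of the commit above: the new CALL_WAITING branch merges a non-cancel op that arrives for an already-queued call into the stored waiting_op, and the two GPR_ASSERTs require that, for each of the send and recv sides, exactly one of the queued op and the incoming op carries it. A minimal standalone sketch of that merge rule follows; fake_transport_op and merge_into_waiting_op are hypothetical stand-ins, not the real grpc_transport_op API.

/* Illustrative sketch only -- simplified stand-ins, not the real
 * grpc_transport_op or cc_start_transport_op. */
#include <assert.h>
#include <stddef.h>

typedef struct {
  void *send_ops; /* non-NULL when the op carries a send batch */
  void *recv_ops; /* non-NULL when the op carries a recv request */
  void (*on_done_send)(void *user_data, int success);
  void *send_user_data;
  void (*on_done_recv)(void *user_data, int success);
  void *recv_user_data;
} fake_transport_op;

/* Merge a newly arriving op into the op already queued for a waiting call.
 * The asserts mirror the ones added in cc_start_transport_op: for each side
 * (send, recv), exactly one of the queued op and the new op may carry it,
 * so each side of the waiting op gets filled in at most once. */
void merge_into_waiting_op(fake_transport_op *waiting,
                           const fake_transport_op *op) {
  assert((waiting->send_ops == NULL) != (op->send_ops == NULL));
  assert((waiting->recv_ops == NULL) != (op->recv_ops == NULL));
  if (op->send_ops != NULL) {
    waiting->send_ops = op->send_ops;
    waiting->on_done_send = op->on_done_send;
    waiting->send_user_data = op->send_user_data;
  }
  if (op->recv_ops != NULL) {
    waiting->recv_ops = op->recv_ops;
    waiting->on_done_recv = op->on_done_recv;
    waiting->recv_user_data = op->recv_user_data;
  }
}

The XOR-style check, (a == NULL) != (b == NULL), is what lets the client channel buffer at most one pending send batch and one pending recv request per waiting call without double-queuing either side.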