diff options
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r-- | src/core/channel/client_channel.c | 258 |
1 files changed, 128 insertions, 130 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index bc481e59ca..78f8d06d89 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -58,6 +58,7 @@ typedef struct { /* the sending child (may be null) */ grpc_child_channel *active_child; + grpc_mdctx *mdctx; /* calls waiting for a channel to be ready */ call_data **waiting_children; @@ -82,8 +83,6 @@ struct call_data { /* owning element */ grpc_call_element *elem; - gpr_uint8 got_first_send; - call_state state; gpr_timespec deadline; union { @@ -91,7 +90,11 @@ struct call_data { /* our child call stack */ grpc_child_call *child_call; } active; - grpc_call_op waiting_op; + grpc_transport_op waiting_op; + struct { + grpc_linked_mdelem status; + grpc_linked_mdelem details; + } cancelled; } s; }; @@ -105,14 +108,14 @@ static int prepare_activate(grpc_call_element *elem, calld->state = CALL_ACTIVE; /* create a child call */ - calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem); + /* TODO(ctiller): pass the waiting op down here */ + calld->s.active.child_call = + grpc_child_channel_create_call(on_child, elem, NULL); return 1; } -static void do_nothing(void *ignored, grpc_op_error error) {} - -static void complete_activate(grpc_call_element *elem, grpc_call_op *op) { +static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; grpc_call_element *child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); @@ -121,57 +124,7 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) { /* continue the start call down the stack, this nees to happen after metadata are flushed*/ - child_elem->filter->call_op(child_elem, elem, op); -} - -static void start_rpc(grpc_call_element *elem, grpc_call_op *op) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->mu); - if (calld->state == CALL_CANCELLED) { - gpr_mu_unlock(&chand->mu); - grpc_metadata_batch_destroy(&op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_ERROR); - return; - } - GPR_ASSERT(calld->state == CALL_CREATED); - calld->state = CALL_WAITING; - if (chand->active_child) { - /* channel is connected - use the connected stack */ - if (prepare_activate(elem, chand->active_child)) { - gpr_mu_unlock(&chand->mu); - /* activate the request (pass it down) outside the lock */ - complete_activate(elem, op); - } else { - gpr_mu_unlock(&chand->mu); - } - } else { - /* check to see if we should initiate a connection (if we're not already), - but don't do so until outside the lock to avoid re-entrancy problems if - the callback is immediate */ - int initiate_transport_setup = 0; - if (!chand->transport_setup_initiated) { - chand->transport_setup_initiated = 1; - initiate_transport_setup = 1; - } - /* add this call to the waiting set to be resumed once we have a child - channel stack, growing the waiting set if needed */ - if (chand->waiting_child_count == chand->waiting_child_capacity) { - chand->waiting_child_capacity = - GPR_MAX(chand->waiting_child_capacity * 2, 8); - chand->waiting_children = - gpr_realloc(chand->waiting_children, - chand->waiting_child_capacity * sizeof(call_data *)); - } - calld->s.waiting_op = *op; - chand->waiting_children[chand->waiting_child_count++] = calld; - gpr_mu_unlock(&chand->mu); - - /* finally initiate transport setup if needed */ - if (initiate_transport_setup) { - grpc_transport_setup_initiate(chand->transport_setup); - } - } + child_elem->filter->start_transport_op(child_elem, op); } static void remove_waiting_child(channel_data *chand, call_data *calld) { @@ -186,85 +139,128 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { chand->waiting_child_count = new_count; } -static void send_up_cancelled_ops(grpc_call_element *elem) { - grpc_call_op finish_op; - /* send up a synthesized status */ - grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled"); - /* send up a finish */ - finish_op.type = GRPC_RECV_FINISH; - finish_op.dir = GRPC_CALL_UP; - finish_op.flags = 0; - finish_op.done_cb = do_nothing; - finish_op.user_data = NULL; - grpc_call_next_op(elem, &finish_op); +static void handle_op_after_cancellation(grpc_call_element *elem, + grpc_transport_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + if (op->send_ops) { + op->on_done_send(op->send_user_data, 0); + } + if (op->recv_ops) { + char status[GPR_LTOA_MIN_BUFSIZE]; + grpc_metadata_batch mdb; + gpr_ltoa(GRPC_STATUS_CANCELLED, status); + calld->s.cancelled.status.md = + grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status); + calld->s.cancelled.details.md = + grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled"); + calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL; + calld->s.cancelled.status.next = &calld->s.cancelled.details; + calld->s.cancelled.details.prev = &calld->s.cancelled.status; + mdb.list.head = &calld->s.cancelled.status; + mdb.list.tail = &calld->s.cancelled.details; + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = gpr_inf_future; + grpc_sopb_add_metadata(op->recv_ops, mdb); + *op->recv_state = GRPC_STREAM_CLOSED; + op->on_done_recv(op->recv_user_data, 1); + } } -static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { +static void cc_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_call_element *child_elem; + grpc_transport_op waiting_op; + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); gpr_mu_lock(&chand->mu); switch (calld->state) { case CALL_ACTIVE: child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); gpr_mu_unlock(&chand->mu); - child_elem->filter->call_op(child_elem, elem, op); - return; /* early out */ - case CALL_WAITING: - grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata); - remove_waiting_child(chand, calld); - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&chand->mu); - send_up_cancelled_ops(elem); - calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR); - return; /* early out */ + child_elem->filter->start_transport_op(child_elem, op); + break; case CALL_CREATED: - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&chand->mu); - send_up_cancelled_ops(elem); - return; /* early out */ - case CALL_CANCELLED: - gpr_mu_unlock(&chand->mu); - return; /* early out */ - } - gpr_log(GPR_ERROR, "should never reach here"); - abort(); -} - -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { - call_data *calld = elem->call_data; - GPR_ASSERT(elem->filter == &grpc_client_channel_filter); - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - switch (op->type) { - case GRPC_SEND_METADATA: - if (!calld->got_first_send) { - /* filter out the start event to find which child to send on */ - calld->got_first_send = 1; - start_rpc(elem, op); + if (op->cancel_with_status != GRPC_STATUS_OK) { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&chand->mu); + handle_op_after_cancellation(elem, op); } else { - grpc_call_next_op(elem, op); + calld->state = CALL_WAITING; + if (chand->active_child) { + /* channel is connected - use the connected stack */ + if (prepare_activate(elem, chand->active_child)) { + gpr_mu_unlock(&chand->mu); + /* activate the request (pass it down) outside the lock */ + complete_activate(elem, op); + } else { + gpr_mu_unlock(&chand->mu); + } + } else { + /* check to see if we should initiate a connection (if we're not + already), + but don't do so until outside the lock to avoid re-entrancy + problems if + the callback is immediate */ + int initiate_transport_setup = 0; + if (!chand->transport_setup_initiated) { + chand->transport_setup_initiated = 1; + initiate_transport_setup = 1; + } + /* add this call to the waiting set to be resumed once we have a child + channel stack, growing the waiting set if needed */ + if (chand->waiting_child_count == chand->waiting_child_capacity) { + chand->waiting_child_capacity = + GPR_MAX(chand->waiting_child_capacity * 2, 8); + chand->waiting_children = gpr_realloc( + chand->waiting_children, + chand->waiting_child_capacity * sizeof(call_data *)); + } + calld->s.waiting_op = *op; + chand->waiting_children[chand->waiting_child_count++] = calld; + gpr_mu_unlock(&chand->mu); + + /* finally initiate transport setup if needed */ + if (initiate_transport_setup) { + grpc_transport_setup_initiate(chand->transport_setup); + } + } } break; - case GRPC_CANCEL_OP: - cancel_rpc(elem, op); - break; - case GRPC_SEND_MESSAGE: - case GRPC_SEND_FINISH: - case GRPC_REQUEST_DATA: - if (calld->state == CALL_ACTIVE) { - grpc_call_element *child_elem = - grpc_child_call_get_top_element(calld->s.active.child_call); - child_elem->filter->call_op(child_elem, elem, op); + case CALL_WAITING: + if (op->cancel_with_status != GRPC_STATUS_OK) { + waiting_op = calld->s.waiting_op; + remove_waiting_child(chand, calld); + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&chand->mu); + handle_op_after_cancellation(elem, &waiting_op); + handle_op_after_cancellation(elem, op); } else { - op->done_cb(op->user_data, GRPC_OP_ERROR); + GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != + (op->send_ops == NULL)); + GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != + (op->recv_ops == NULL)); + if (op->send_ops) { + calld->s.waiting_op.send_ops = op->send_ops; + calld->s.waiting_op.is_last_send = op->is_last_send; + calld->s.waiting_op.on_done_send = op->on_done_send; + calld->s.waiting_op.send_user_data = op->send_user_data; + } + if (op->recv_ops) { + calld->s.waiting_op.recv_ops = op->recv_ops; + calld->s.waiting_op.recv_state = op->recv_state; + calld->s.waiting_op.on_done_recv = op->on_done_recv; + calld->s.waiting_op.recv_user_data = op->recv_user_data; + } + gpr_mu_unlock(&chand->mu); } break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_UP); - grpc_call_next_op(elem, op); + case CALL_CANCELLED: + gpr_mu_unlock(&chand->mu); + handle_op_after_cancellation(elem, op); break; } } @@ -351,15 +347,18 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { + const void *server_transport_data, + grpc_transport_op *initial_op) { call_data *calld = elem->call_data; + /* TODO(ctiller): is there something useful we can do here? */ + GPR_ASSERT(initial_op == NULL); + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GPR_ASSERT(server_transport_data == NULL); calld->elem = elem; calld->state = CALL_CREATED; calld->deadline = gpr_inf_future; - calld->got_first_send = 0; } /* Destructor for call_data */ @@ -372,9 +371,7 @@ static void destroy_call_elem(grpc_call_element *elem) { if (calld->state == CALL_ACTIVE) { grpc_child_call_destroy(calld->s.active.child_call); } - if (calld->state == CALL_WAITING) { - grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata); - } + GPR_ASSERT(calld->state != CALL_WAITING); } /* Constructor for channel_data */ @@ -396,6 +393,7 @@ static void init_channel_elem(grpc_channel_element *elem, chand->transport_setup = NULL; chand->transport_setup_initiated = 0; chand->args = grpc_channel_args_copy(args); + chand->mdctx = metadata_context; } /* Destructor for channel_data */ @@ -417,9 +415,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_client_channel_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, - "client-channel", + cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "client-channel", }; grpc_transport_setup_result grpc_client_channel_transport_setup_complete( @@ -436,7 +434,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( call_data **waiting_children; size_t waiting_child_count; size_t i; - grpc_call_op *call_ops; + grpc_transport_op *call_ops; /* build the child filter stack */ child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters); @@ -472,13 +470,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( chand->waiting_child_count = 0; chand->waiting_child_capacity = 0; - call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count); + call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count); for (i = 0; i < waiting_child_count; i++) { call_ops[i] = waiting_children[i]->s.waiting_op; if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) { waiting_children[i] = NULL; - call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR); + grpc_transport_op_finish_with_failure(&call_ops[i]); } } |