diff options
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r-- | src/core/channel/client_channel.c | 329 |
1 files changed, 170 insertions, 159 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 9630f6898d..965d4e53dc 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -34,6 +34,7 @@ #include "src/core/channel/client_channel.h" #include <stdio.h> +#include <string.h> #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" @@ -75,6 +76,7 @@ typedef enum { CALL_CREATED, CALL_WAITING_FOR_CONFIG, CALL_WAITING_FOR_PICK, + CALL_WAITING_FOR_CALL, CALL_ACTIVE, CALL_CANCELLED } call_state; @@ -87,17 +89,13 @@ struct call_data { call_state state; gpr_timespec deadline; - union { - struct { - /* our child call stack */ - grpc_subchannel_call *subchannel_call; - } active; - grpc_transport_stream_op waiting_op; - struct { - grpc_linked_mdelem status; - grpc_linked_mdelem details; - } cancelled; - } s; + grpc_subchannel *picked_channel; + grpc_iomgr_closure async_setup_task; + grpc_transport_stream_op waiting_op; + /* our child call stack */ + grpc_subchannel_call *subchannel_call; + grpc_linked_mdelem status; + grpc_linked_mdelem details; }; #if 0 @@ -110,9 +108,9 @@ static int prepare_activate(grpc_call_element *elem, /* no more access to calld->s.waiting allowed */ GPR_ASSERT(calld->state == CALL_WAITING); - if (calld->s.waiting_op.bind_pollset) { + if (calld->waiting_op.bind_pollset) { grpc_transport_setup_del_interested_party(chand->transport_setup, - calld->s.waiting_op.bind_pollset); + calld->waiting_op.bind_pollset); } calld->state = CALL_ACTIVE; @@ -143,7 +141,7 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) { if (chand->waiting_children[i] == calld) { grpc_transport_setup_del_interested_party( - chand->transport_setup, calld->s.waiting_op.bind_pollset); + chand->transport_setup, calld->waiting_op.bind_pollset); continue; } chand->waiting_children[new_count++] = chand->waiting_children[i]; @@ -166,15 +164,15 @@ static void handle_op_after_cancellation(grpc_call_element *elem, char status[GPR_LTOA_MIN_BUFSIZE]; grpc_metadata_batch mdb; gpr_ltoa(GRPC_STATUS_CANCELLED, status); - calld->s.cancelled.status.md = + calld->status.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status); - calld->s.cancelled.details.md = + calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled"); - calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL; - calld->s.cancelled.status.next = &calld->s.cancelled.details; - calld->s.cancelled.details.prev = &calld->s.cancelled.status; - mdb.list.head = &calld->s.cancelled.status; - mdb.list.tail = &calld->s.cancelled.details; + calld->status.prev = calld->details.next = NULL; + calld->status.next = &calld->details; + calld->details.prev = &calld->status; + mdb.list.head = &calld->status; + mdb.list.tail = &calld->details; mdb.garbage.head = mdb.garbage.tail = NULL; mdb.deadline = gpr_inf_future; grpc_sopb_add_metadata(op->recv_ops, mdb); @@ -186,16 +184,111 @@ static void handle_op_after_cancellation(grpc_call_element *elem, } } -static void add_to_lb_policy_wait_queue_locked_state_config(channel_data *chand, call_data *calld) { - abort(); +typedef struct { + grpc_iomgr_closure closure; + grpc_call_element *elem; +} waiting_call; + +static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation); + +static void continue_with_pick(void *arg, int iomgr_success) { + waiting_call *wc = arg; + call_data *calld = wc->elem->call_data; + perform_transport_stream_op(wc->elem, &calld->waiting_op, 1); + gpr_free(wc); +} + +static void add_to_lb_policy_wait_queue_locked_state_config(grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + waiting_call *wc = gpr_malloc(sizeof(*wc)); + grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc); + wc->elem = elem; + wc->closure.next = chand->waiting_for_config_closures; + chand->waiting_for_config_closures = &wc->closure; +} + +static int is_empty(void *p, int len) { + char *ptr = p; + int i; + for (i = 0; i < len; i++) { + if (ptr[i] != 0) return 0; + } + return 1; +} + +static void started_call(void *arg, int iomgr_success) { + call_data *calld = arg; + grpc_transport_stream_op op; + int have_waiting; + + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) { + memset(&op, 0, sizeof(op)); + op.cancel_with_status = GRPC_STATUS_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + grpc_subchannel_call_process_op(calld->subchannel_call, &op); + } else if (calld->state == CALL_WAITING_FOR_CALL) { + have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); + if (calld->subchannel_call != NULL) { + calld->state = CALL_ACTIVE; + gpr_mu_unlock(&calld->mu_state); + if (have_waiting) { + grpc_subchannel_call_process_op(calld->subchannel_call, &calld->waiting_op); + } + } else { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + if (have_waiting) { + handle_op_after_cancellation(calld->elem, &calld->waiting_op); + } + } + } else { + GPR_ASSERT(calld->state == CALL_CANCELLED); + } +} + +static void picked_target(void *arg, int iomgr_success) { + call_data *calld = arg; + channel_data *chand = calld->elem->channel_data; + grpc_transport_stream_op op; + + if (calld->picked_channel == NULL) { + /* treat this like a cancellation */ + calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE; + perform_transport_stream_op(calld->elem, &calld->waiting_op, 1); + } else { + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_CANCELLED) { + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(calld->elem, &calld->waiting_op); + } else { + GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK); + calld->state = CALL_WAITING_FOR_CALL; + op = calld->waiting_op; + memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); + gpr_mu_unlock(&calld->mu_state); + grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld); + grpc_subchannel_create_call(calld->picked_channel, chand->mdctx, &op, &calld->subchannel_call, &calld->async_setup_task); + } + } } static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) { - abort(); + grpc_metadata_batch *initial_metadata; + grpc_transport_stream_op *op = &calld->waiting_op; + + GPR_ASSERT(op->bind_pollset); + GPR_ASSERT(op->send_ops); + GPR_ASSERT(op->send_ops->nops >= 1); + GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA); + initial_metadata = &op->send_ops->ops[0].data.metadata; + + grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld); + grpc_lb_policy_pick(lb_policy, op->bind_pollset, + initial_metadata, &calld->picked_channel, &calld->async_setup_task); } -static void cc_start_transport_stream_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; @@ -206,7 +299,8 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, gpr_mu_lock(&calld->mu_state); switch (calld->state) { case CALL_ACTIVE: - subchannel_call = calld->s.active.subchannel_call; + GPR_ASSERT(!continuation); + subchannel_call = calld->subchannel_call; gpr_mu_unlock(&calld->mu_state); grpc_subchannel_call_process_op(subchannel_call, op); break; @@ -214,13 +308,44 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, gpr_mu_unlock(&calld->mu_state); handle_op_after_cancellation(elem, op); break; + case CALL_WAITING_FOR_CONFIG: + case CALL_WAITING_FOR_PICK: + case CALL_WAITING_FOR_CALL: + if (!continuation) { + if (op->cancel_with_status != GRPC_STATUS_OK) { + calld->state = CALL_CANCELLED; + gpr_mu_unlock(&calld->mu_state); + handle_op_after_cancellation(elem, op); + } else { + GPR_ASSERT((calld->waiting_op.send_ops == NULL) != + (op->send_ops == NULL)); + GPR_ASSERT((calld->waiting_op.recv_ops == NULL) != + (op->recv_ops == NULL)); + if (op->send_ops != NULL) { + calld->waiting_op.send_ops = op->send_ops; + calld->waiting_op.is_last_send = op->is_last_send; + calld->waiting_op.on_done_send = op->on_done_send; + } + if (op->recv_ops != NULL) { + calld->waiting_op.recv_ops = op->recv_ops; + calld->waiting_op.recv_state = op->recv_state; + calld->waiting_op.on_done_recv = op->on_done_recv; + } + gpr_mu_unlock(&calld->mu_state); + if (op->on_consumed != NULL) { + op->on_consumed->cb(op->on_consumed->cb_arg, 0); + } + } + break; + } + /* fall through */ case CALL_CREATED: if (op->cancel_with_status != GRPC_STATUS_OK) { calld->state = CALL_CANCELLED; gpr_mu_unlock(&calld->mu_state); handle_op_after_cancellation(elem, op); } else { - calld->s.waiting_op = *op; + calld->waiting_op = *op; gpr_mu_lock(&chand->mu_config); lb_policy = chand->lb_policy; @@ -235,141 +360,22 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, grpc_lb_policy_unref(lb_policy); } else { calld->state = CALL_WAITING_FOR_CONFIG; - add_to_lb_policy_wait_queue_locked_state_config(chand, calld); + add_to_lb_policy_wait_queue_locked_state_config(elem); gpr_mu_unlock(&chand->mu_config); gpr_mu_unlock(&calld->mu_state); } } break; - case CALL_WAITING_FOR_CONFIG: - case CALL_WAITING_FOR_PICK: - if (op->cancel_with_status != GRPC_STATUS_OK) { - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op); - } else { - GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != - (op->send_ops == NULL)); - GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != - (op->recv_ops == NULL)); - if (op->send_ops != NULL) { - calld->s.waiting_op.send_ops = op->send_ops; - calld->s.waiting_op.is_last_send = op->is_last_send; - calld->s.waiting_op.on_done_send = op->on_done_send; - } - if (op->recv_ops != NULL) { - calld->s.waiting_op.recv_ops = op->recv_ops; - calld->s.waiting_op.recv_state = op->recv_state; - calld->s.waiting_op.on_done_recv = op->on_done_recv; - } - gpr_mu_unlock(&calld->mu_state); - if (op->on_consumed != NULL) { - op->on_consumed->cb(op->on_consumed->cb_arg, 0); - } - } - break; } +} - - - -#if 0 - gpr_mu_lock(&chand->mu); - switch (calld->state) { - case CALL_ACTIVE: - child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); - gpr_mu_unlock(&chand->mu); - child_elem->filter->start_transport_op(child_elem, op); - break; - case CALL_CREATED: - if (op->cancel_with_status != GRPC_STATUS_OK) { - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&chand->mu); - handle_op_after_cancellation(elem, op); - } else { - calld->state = CALL_WAITING; - calld->s.waiting_op.bind_pollset = NULL; - if (chand->active_child) { - /* channel is connected - use the connected stack */ - if (prepare_activate(elem, chand->active_child)) { - gpr_mu_unlock(&chand->mu); - /* activate the request (pass it down) outside the lock */ - complete_activate(elem, op); - } else { - gpr_mu_unlock(&chand->mu); - } - } else { - /* check to see if we should initiate a connection (if we're not - already), - but don't do so until outside the lock to avoid re-entrancy - problems if - the callback is immediate */ - int initiate_transport_setup = 0; - if (!chand->transport_setup_initiated) { - chand->transport_setup_initiated = 1; - initiate_transport_setup = 1; - } - /* add this call to the waiting set to be resumed once we have a child - channel stack, growing the waiting set if needed */ - if (chand->waiting_child_count == chand->waiting_child_capacity) { - chand->waiting_child_capacity = - GPR_MAX(chand->waiting_child_capacity * 2, 8); - chand->waiting_children = gpr_realloc( - chand->waiting_children, - chand->waiting_child_capacity * sizeof(call_data *)); - } - calld->s.waiting_op = *op; - chand->waiting_children[chand->waiting_child_count++] = calld; - grpc_transport_setup_add_interested_party(chand->transport_setup, - op->bind_pollset); - gpr_mu_unlock(&chand->mu); - - /* finally initiate transport setup if needed */ - if (initiate_transport_setup) { - grpc_transport_setup_initiate(chand->transport_setup); - } - } - } - break; - case CALL_WAITING: - if (op->cancel_with_status != GRPC_STATUS_OK) { - waiting_op = calld->s.waiting_op; - remove_waiting_child(chand, calld); - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&chand->mu); - handle_op_after_cancellation(elem, &waiting_op); - handle_op_after_cancellation(elem, op); - } else { - GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != - (op->send_ops == NULL)); - GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != - (op->recv_ops == NULL)); - if (op->send_ops) { - calld->s.waiting_op.send_ops = op->send_ops; - calld->s.waiting_op.is_last_send = op->is_last_send; - calld->s.waiting_op.on_done_send = op->on_done_send; - } - if (op->recv_ops) { - calld->s.waiting_op.recv_ops = op->recv_ops; - calld->s.waiting_op.recv_state = op->recv_state; - calld->s.waiting_op.on_done_recv = op->on_done_recv; - } - gpr_mu_unlock(&chand->mu); - if (op->on_consumed) { - op->on_consumed->cb(op->on_consumed->cb_arg, 0); - } - } - break; - case CALL_CANCELLED: - gpr_mu_unlock(&chand->mu); - handle_op_after_cancellation(elem, op); - break; - } -#endif +static void cc_start_transport_stream_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { + perform_transport_stream_op(elem, op, 0); } static void update_state_locked(channel_data *chand) { - + gpr_log(GPR_ERROR, "update_state_locked not implemented"); } static void cc_on_config_changed(void *arg, int iomgr_success) { @@ -382,9 +388,10 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { if (chand->incoming_configuration) { lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); grpc_lb_policy_ref(lb_policy); + + grpc_client_config_unref(chand->incoming_configuration); } - grpc_client_config_unref(chand->incoming_configuration); chand->incoming_configuration = NULL; gpr_mu_lock(&chand->mu_config); @@ -402,7 +409,9 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { wakeup_closures = next; } - grpc_lb_policy_unref(old_lb_policy); + if (old_lb_policy) { + grpc_lb_policy_unref(old_lb_policy); + } if (iomgr_success) { grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); @@ -511,6 +520,7 @@ static void init_call_elem(grpc_call_element *elem, GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GPR_ASSERT(server_transport_data == NULL); + gpr_mu_init(&calld->mu_state); calld->elem = elem; calld->state = CALL_CREATED; calld->deadline = gpr_inf_future; @@ -527,7 +537,7 @@ static void destroy_call_elem(grpc_call_element *elem) { gpr_mu_lock(&calld->mu_state); switch (calld->state) { case CALL_ACTIVE: - subchannel_call = calld->s.active.subchannel_call; + subchannel_call = calld->subchannel_call; gpr_mu_unlock(&calld->mu_state); grpc_subchannel_call_unref(subchannel_call); break; @@ -537,6 +547,7 @@ static void destroy_call_elem(grpc_call_element *elem) { break; case CALL_WAITING_FOR_PICK: case CALL_WAITING_FOR_CONFIG: + case CALL_WAITING_FOR_CALL: gpr_log(GPR_ERROR, "should never reach here"); abort(); break; @@ -550,12 +561,12 @@ static void init_channel_elem(grpc_channel_element *elem, int is_last) { channel_data *chand = elem->channel_data; - GPR_ASSERT(!is_first); + memset(chand, 0, sizeof(*chand)); + GPR_ASSERT(is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); gpr_mu_init(&chand->mu_config); - chand->resolver = NULL; chand->mdctx = metadata_context; grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); } @@ -633,7 +644,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count); for (i = 0; i < waiting_child_count; i++) { - call_ops[i] = waiting_children[i]->s.waiting_op; + call_ops[i] = waiting_children[i]->waiting_op; if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) { waiting_children[i] = NULL; grpc_transport_stream_op_finish_with_failure(&call_ops[i]); |