diff options
Diffstat (limited to 'src/core/ext/client_config')
-rw-r--r-- | src/core/ext/client_config/client_channel.c | 24 | ||||
-rw-r--r-- | src/core/ext/client_config/lb_policy.c | 10 | ||||
-rw-r--r-- | src/core/ext/client_config/lb_policy.h | 60 | ||||
-rw-r--r-- | src/core/ext/client_config/lb_policy_factory.h | 13 | ||||
-rw-r--r-- | src/core/ext/client_config/subchannel.c | 18 |
5 files changed, 81 insertions, 44 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 76c2f38a5d..3ed66a3313 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -399,13 +399,15 @@ typedef struct client_channel_call_data { grpc_connected_subchannel *connected_subchannel; grpc_polling_entity *pollent; - grpc_transport_stream_op *waiting_ops; + grpc_transport_stream_op **waiting_ops; size_t waiting_ops_count; size_t waiting_ops_capacity; grpc_closure next_step; grpc_call_stack *owning_call; + + grpc_linked_mdelem lb_token_mdelem; } call_data; static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) { @@ -416,7 +418,7 @@ static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) { gpr_realloc(calld->waiting_ops, calld->waiting_ops_capacity * sizeof(*calld->waiting_ops)); } - calld->waiting_ops[calld->waiting_ops_count++] = *op; + calld->waiting_ops[calld->waiting_ops_count++] = op; GPR_TIMER_END("add_waiting_locked", 0); } @@ -425,14 +427,14 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, size_t i; for (i = 0; i < calld->waiting_ops_count; i++) { grpc_transport_stream_op_finish_with_failure( - exec_ctx, &calld->waiting_ops[i], GRPC_ERROR_REF(error)); + exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error)); } calld->waiting_ops_count = 0; GRPC_ERROR_UNREF(error); } typedef struct { - grpc_transport_stream_op *ops; + grpc_transport_stream_op **ops; size_t nops; grpc_subchannel_call *call; } retry_ops_args; @@ -441,7 +443,7 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { retry_ops_args *a = args; size_t i; for (i = 0; i < a->nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]); + grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]); } GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); gpr_free(a->ops); @@ -449,6 +451,10 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { } static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { + if (calld->waiting_ops_count == 0) { + return; + } + retry_ops_args *a = gpr_malloc(sizeof(*a)); a->ops = calld->waiting_ops; a->nops = calld->waiting_ops_count; @@ -582,9 +588,11 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, int r; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); gpr_mu_unlock(&chand->mu); - r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent, - initial_metadata, initial_metadata_flags, - connected_subchannel, on_ready); + const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata, + initial_metadata_flags, + &calld->lb_token_mdelem}; + r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel, + NULL, on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); GPR_TIMER_END("pick_subchannel", 0); return r; diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c index 3e498f5f56..46391272a6 100644 --- a/src/core/ext/client_config/lb_policy.c +++ b/src/core/ext/client_config/lb_policy.c @@ -100,13 +100,11 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, } int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **target, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { - return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata, - initial_metadata_flags, target, on_complete); + return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data, + on_complete); } void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index eb07ad3360..7fb3e08cb3 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -53,23 +53,38 @@ struct grpc_lb_policy { grpc_pollset_set *interested_parties; }; +/** Extra arguments for an LB pick */ +typedef struct grpc_lb_policy_pick_args { + /** Parties interested in the pick's progress */ + grpc_polling_entity *pollent; + /** Initial metadata associated with the picking call. */ + grpc_metadata_batch *initial_metadata; + /** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */ + uint32_t initial_metadata_flags; + /** Storage for LB token in \a initial_metadata, or NULL if not used */ + grpc_linked_mdelem *lb_token_mdelem_storage; +} grpc_lb_policy_pick_args; + struct grpc_lb_policy_vtable { void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - /** implement grpc_lb_policy_pick */ + /** \see grpc_lb_policy_pick */ int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **target, grpc_closure *on_complete); + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, + grpc_closure *on_complete); + + /** \see grpc_lb_policy_cancel_pick */ void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connected_subchannel **target, grpc_error *error); + + /** \see grpc_lb_policy_cancel_picks */ void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, grpc_error *error); + /** \see grpc_lb_policy_ping_one */ void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_closure *closure); @@ -83,8 +98,7 @@ struct grpc_lb_policy_vtable { /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the policy. Calling with a NULL \a - state cancels the subscription. - */ + state cancels the subscription. */ void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connectivity_state *state, @@ -124,27 +138,35 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable); -/** Given initial metadata in \a initial_metadata, find an appropriate - target for this rpc, and 'return' it by calling \a on_complete after setting - \a target. - Picking can be asynchronous. Any IO should be done under \a pollent. */ +/** Find an appropriate target for this call, based on \a pick_args. + Picking can be synchronous or asynchronous. In the synchronous case, when a + pick is readily available, it'll be returned in \a target and a non-zero + value will be returned. + In the asynchronous case, zero is returned and \a on_complete will be called + once \a target and \a user_data have been set. Any IO should be done under + \a pick_args->pollent. The opaque \a user_data output argument corresponds + to information that may need be propagated from the LB policy. It may be + NULL. Errors are signaled by receiving a NULL \a *target. */ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **target, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete); +/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) + against one of the connected subchannels managed by \a policy. */ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_closure *closure); +/** Cancel picks for \a target. + The \a on_complete callback of the pending picks will be invoked with \a + *target set to NULL. */ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connected_subchannel **target, grpc_error *error); -/** Cancel all pending picks which have: - (initial_metadata_flags & initial_metadata_flags_mask) == - initial_metadata_flags_eq */ +/** Cancel all pending picks for which their \a initial_metadata_flags (as given + in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq + when AND'd with \a initial_metadata_flags_mask */ void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, uint32_t initial_metadata_flags_mask, diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h index da1de3579a..7191ca7d89 100644 --- a/src/core/ext/client_config/lb_policy_factory.h +++ b/src/core/ext/client_config/lb_policy_factory.h @@ -47,8 +47,19 @@ struct grpc_lb_policy_factory { const grpc_lb_policy_factory_vtable *vtable; }; +/** A resolved address alongside any LB related information associated with it. + * \a user_data, if not NULL, contains opaque data meant to be consumed by the + * gRPC LB policy. Note that no all LB policies support \a user_data as input. + * Those who don't will simply ignore it and will correspondingly return NULL in + * their namesake pick() output argument. */ +typedef struct grpc_lb_address { + grpc_resolved_address *resolved_address; + void *user_data; +} grpc_lb_address; + typedef struct grpc_lb_policy_args { - grpc_resolved_addresses *addresses; + grpc_lb_address *addresses; + size_t num_addresses; grpc_client_channel_factory *client_channel_factory; } grpc_lb_policy_args; diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 8f4a2f9e3e..53ba8aeca6 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -504,14 +504,13 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *closure) { - grpc_transport_op op; + grpc_transport_op *op = grpc_make_transport_op(NULL); grpc_channel_element *elem; - memset(&op, 0, sizeof(op)); - op.connectivity_state = state; - op.on_connectivity_state_change = closure; - op.bind_pollset_set = interested_parties; + op->connectivity_state = state; + op->on_connectivity_state_change = closure; + op->bind_pollset_set = interested_parties; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(exec_ctx, elem, &op); + elem->filter->start_transport_op(exec_ctx, elem, op); } void grpc_connected_subchannel_notify_on_state_change( @@ -525,12 +524,11 @@ void grpc_connected_subchannel_notify_on_state_change( void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_closure *closure) { - grpc_transport_op op; + grpc_transport_op *op = grpc_make_transport_op(NULL); grpc_channel_element *elem; - memset(&op, 0, sizeof(op)); - op.send_ping = closure; + op->send_ping = closure; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(exec_ctx, elem, &op); + elem->filter->start_transport_op(exec_ctx, elem, op); } static void publish_transport_locked(grpc_exec_ctx *exec_ctx, |