diff options
Diffstat (limited to 'src/core/client_config')
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 37 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/round_robin.c | 40 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 17 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 19 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 148 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 34 |
6 files changed, 218 insertions, 77 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index e5bf0680ff..93312abb00 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -130,6 +130,30 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } } +static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_subchannel **target) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + pending_pick *pp; + gpr_mu_lock(&p->mu); + pp = p->pending_picks; + p->pending_picks = NULL; + while (pp != NULL) { + pending_pick *next = pp->next; + if (pp->target == target) { + grpc_subchannel_del_interested_party( + exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); + *target = NULL; + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); + gpr_free(pp); + } else { + pp->next = p->pending_picks; + p->pending_picks = pp; + } + pp = next; + } + gpr_mu_unlock(&p->mu); +} + static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { p->started_picking = 1; p->checking_subchannel = 0; @@ -149,16 +173,16 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } -void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete) { +int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, + grpc_metadata_batch *initial_metadata, grpc_subchannel **target, + grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); if (p->selected) { gpr_mu_unlock(&p->mu); *target = p->selected; - grpc_exec_ctx_enqueue(exec_ctx, on_complete, 1); + return 1; } else { if (!p->started_picking) { start_picking(exec_ctx, p); @@ -172,6 +196,7 @@ void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp->on_complete = on_complete; p->pending_picks = pp; gpr_mu_unlock(&p->mu); + return 0; } } @@ -365,8 +390,8 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, pf_shutdown, pf_pick, pf_exit_idle, pf_broadcast, - pf_check_connectivity, pf_notify_on_state_change}; + pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle, + pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index d0b60a0df2..1ffe32fff2 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -264,6 +264,33 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } +static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_subchannel **target) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + pending_pick *pp; + size_t i; + gpr_mu_lock(&p->mu); + pp = p->pending_picks; + p->pending_picks = NULL; + while (pp != NULL) { + pending_pick *next = pp->next; + if (pp->target == target) { + for (i = 0; i < p->num_subchannels; i++) { + grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], + pp->pollset); + } + *target = NULL; + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); + gpr_free(pp); + } else { + pp->next = p->pending_picks; + p->pending_picks = pp; + } + pp = next; + } + gpr_mu_unlock(&p->mu); +} + static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { size_t i; p->started_picking = 1; @@ -286,9 +313,9 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } -void rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete) { +int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, + grpc_metadata_batch *initial_metadata, grpc_subchannel **target, + grpc_closure *on_complete) { size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; @@ -303,7 +330,7 @@ void rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } /* only advance the last picked pointer if the selection was used */ advance_last_picked_locked(p); - on_complete->cb(exec_ctx, on_complete->cb_arg, 1); + return 1; } else { if (!p->started_picking) { start_picking(exec_ctx, p); @@ -319,6 +346,7 @@ void rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp->on_complete = on_complete; p->pending_picks = pp; gpr_mu_unlock(&p->mu); + return 0; } } @@ -487,8 +515,8 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, } static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, rr_shutdown, rr_pick, rr_exit_idle, rr_broadcast, - rr_check_connectivity, rr_notify_on_state_change}; + rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle, + rr_broadcast, rr_check_connectivity, rr_notify_on_state_change}; static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index c955186f7f..36a2454309 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -68,12 +68,17 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { policy->vtable->shutdown(exec_ctx, policy); } -void grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete) { - policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata, target, - on_complete); +int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_pollset *pollset, + grpc_metadata_batch *initial_metadata, + grpc_subchannel **target, grpc_closure *on_complete) { + return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata, + target, on_complete); +} + +void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_subchannel **target) { + policy->vtable->cancel_pick(exec_ctx, policy, target); } void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 0eefe64991..a696c3ce64 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -56,9 +56,11 @@ struct grpc_lb_policy_vtable { void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** implement grpc_lb_policy_pick */ - void (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete); + int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, + grpc_subchannel **target, grpc_closure *on_complete); + void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_subchannel **target); /** try to enter a READY connectivity state */ void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); @@ -106,10 +108,13 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); target for this rpc, and 'return' it by calling \a on_complete after setting \a target. Picking can be asynchronous. Any IO should be done under \a pollset. */ -void grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete); +int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_pollset *pollset, + grpc_metadata_batch *initial_metadata, + grpc_subchannel **target, grpc_closure *on_complete); + +void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_subchannel **target); void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_transport_op *op); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index f13bc0736e..ff211bc2e6 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -42,8 +42,10 @@ #include "src/core/channel/connected_channel.h" #include "src/core/client_config/initial_connect_string.h" #include "src/core/iomgr/timer.h" -#include "src/core/transport/connectivity_state.h" +#include "src/core/profiling/timers.h" #include "src/core/surface/channel.h" +#include "src/core/transport/connectivity_state.h" +#include "src/core/transport/connectivity_state.h" #define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20 #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 @@ -70,7 +72,7 @@ typedef struct waiting_for_connect { struct waiting_for_connect *next; grpc_closure *notify; grpc_pollset *pollset; - grpc_subchannel_call **target; + gpr_atm *target; grpc_subchannel *subchannel; grpc_closure continuation; } waiting_for_connect; @@ -140,14 +142,16 @@ struct grpc_subchannel { struct grpc_subchannel_call { connection *connection; - gpr_refcount refs; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) +#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ + (((grpc_subchannel_call *)(callstack)) - 1) static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx, - connection *con); + connection *con, + grpc_pollset *pollset); static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, const char *reason); @@ -166,7 +170,7 @@ static grpc_subchannel *connection_unref_locked( connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c); -#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +#ifdef GRPC_STREAM_REFCOUNT_DEBUG #define SUBCHANNEL_REF_LOCKED(p, r) \ subchannel_ref_locked((p), __FILE__, __LINE__, (r)) #define SUBCHANNEL_UNREF_LOCKED(p, r) \ @@ -176,6 +180,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c); #define CONNECTION_UNREF_LOCKED(cl, p, r) \ connection_unref_locked((cl), (p), __FILE__, __LINE__, (r)) #define REF_PASS_ARGS , file, line, reason +#define REF_PASS_REASON , reason #define REF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ (name), (p), (p)->refs, (p)->refs + 1, reason) @@ -188,6 +193,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c); #define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p)) #define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p)) #define REF_PASS_ARGS +#define REF_PASS_REASON #define REF_LOG(name, p) \ do { \ } while (0) @@ -318,9 +324,9 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, return c; } -void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - int iomgr_success) { +static void cancel_waiting_calls(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + int iomgr_success) { waiting_for_connect *w4c; gpr_mu_lock(&subchannel->mu); w4c = subchannel->waiting; @@ -341,6 +347,37 @@ void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx, } } +void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + gpr_atm *target) { + waiting_for_connect *w4c; + int unref_count = 0; + gpr_mu_lock(&subchannel->mu); + w4c = subchannel->waiting; + subchannel->waiting = NULL; + while (w4c != NULL) { + waiting_for_connect *next = w4c->next; + if (w4c->target == target) { + grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, + w4c->pollset); + grpc_exec_ctx_enqueue(exec_ctx, w4c->notify, 0); + + unref_count++; + gpr_free(w4c); + } else { + w4c->next = subchannel->waiting; + subchannel->waiting = w4c; + } + + w4c = next; + } + gpr_mu_unlock(&subchannel->mu); + + while (unref_count-- > 0) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "waiting_for_connect"); + } +} + static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; @@ -365,29 +402,35 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { - grpc_subchannel_call_create_status call_creation_status; + int call_creation_finished_ok; waiting_for_connect *w4c = arg; grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset); - call_creation_status = grpc_subchannel_create_call( + call_creation_finished_ok = grpc_subchannel_create_call( exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); - GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY); + GPR_ASSERT(call_creation_finished_ok == 1); w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success); GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); gpr_free(w4c); } -grpc_subchannel_call_create_status grpc_subchannel_create_call( - grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset, - grpc_subchannel_call **target, grpc_closure *notify) { +int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, + grpc_pollset *pollset, gpr_atm *target, + grpc_closure *notify) { connection *con; + grpc_subchannel_call *call; + GPR_TIMER_BEGIN("grpc_subchannel_create_call", 0); gpr_mu_lock(&c->mu); if (c->active != NULL) { con = c->active; CONNECTION_REF_LOCKED(con, "call"); gpr_mu_unlock(&c->mu); - *target = create_call(exec_ctx, con); - return GRPC_SUBCHANNEL_CALL_CREATE_READY; + call = create_call(exec_ctx, con, pollset); + if (!gpr_atm_rel_cas(target, 0, (gpr_atm)(gpr_uintptr)call)) { + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "failed to set"); + } + GPR_TIMER_END("grpc_subchannel_create_call", 0); + return 1; } else { waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); w4c->next = c->waiting; @@ -412,7 +455,8 @@ grpc_subchannel_call_create_status grpc_subchannel_create_call( } else { gpr_mu_unlock(&c->mu); } - return GRPC_SUBCHANNEL_CALL_CREATE_PENDING; + GPR_TIMER_END("grpc_subchannel_create_call", 0); + return 0; } } @@ -660,10 +704,25 @@ static double generate_uniform_random_number(grpc_subchannel *c) { /* Update backoff_delta and next_attempt in subchannel */ static void update_reconnect_parameters(grpc_subchannel *c) { + size_t i; gpr_int32 backoff_delta_millis, jitter; gpr_int32 max_backoff_millis = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; double jitter_range; + + if (c->args) { + for (i = 0; i < c->args->num_args; i++) { + if (0 == strcmp(c->args->args[i].key, + "grpc.testing.fixed_reconnect_backoff")) { + GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); + c->next_attempt = gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(c->args->args[i].value.integer, GPR_TIMESPAN)); + return; + } + } + } + backoff_delta_millis = (gpr_int32)(gpr_time_to_millis(c->backoff_delta) * GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER); @@ -694,7 +753,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { update_reconnect_parameters(c); continue_connect(exec_ctx, c); } else { - grpc_subchannel_cancel_waiting_call(exec_ctx, c, iomgr_success); + cancel_waiting_calls(exec_ctx, c, iomgr_success); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); } @@ -754,26 +813,40 @@ static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx, * grpc_subchannel_call implementation */ +static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, + int success) { + grpc_subchannel_call *c = call; + gpr_mu *mu = &c->connection->subchannel->mu; + grpc_subchannel *destroy; + GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); + grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); + gpr_mu_lock(mu); + destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call"); + gpr_mu_unlock(mu); + gpr_free(c); + if (destroy != NULL) { + subchannel_destroy(exec_ctx, destroy); + } + GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); +} + void grpc_subchannel_call_ref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_ref(&c->refs); +#ifdef GRPC_STREAM_REFCOUNT_DEBUG + grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c), reason); +#else + grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c)); +#endif } void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - if (gpr_unref(&c->refs)) { - gpr_mu *mu = &c->connection->subchannel->mu; - grpc_subchannel *destroy; - grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); - gpr_mu_lock(mu); - destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call"); - gpr_mu_unlock(mu); - gpr_free(c); - if (destroy != NULL) { - subchannel_destroy(exec_ctx, destroy); - } - } +#ifdef GRPC_STREAM_REFCOUNT_DEBUG + grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), reason); +#else + grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); +#endif } char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, @@ -792,14 +865,16 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, } static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx, - connection *con) { + connection *con, + grpc_pollset *pollset) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); call->connection = con; - gpr_ref_init(&call->refs, 1); - grpc_call_stack_init(exec_ctx, chanstk, NULL, NULL, callstk); + grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call, + NULL, NULL, callstk); + grpc_call_stack_set_pollset(exec_ctx, callstk, pollset); return call; } @@ -810,3 +885,8 @@ grpc_mdctx *grpc_subchannel_get_mdctx(grpc_subchannel *subchannel) { grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel) { return subchannel->master; } + +grpc_call_stack *grpc_subchannel_call_get_call_stack( + grpc_subchannel_call *subchannel_call) { + return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); +} diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index ec1cc7cc69..1fefa1888a 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -44,7 +44,7 @@ typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; -#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +#ifdef GRPC_STREAM_REFCOUNT_DEBUG #define GRPC_SUBCHANNEL_REF(p, r) \ grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) \ @@ -75,27 +75,22 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -typedef enum { - GRPC_SUBCHANNEL_CALL_CREATE_READY, - GRPC_SUBCHANNEL_CALL_CREATE_PENDING -} grpc_subchannel_call_create_status; - /** construct a subchannel call (possibly asynchronously). * - * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will - * return immediately and \a target will point to a connected \a subchannel_call - * instance. Note that \a notify will \em not be invoked in this case. - * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the - * subchannel call will be created asynchronously, invoking the \a notify - * callback upon completion. */ -grpc_subchannel_call_create_status grpc_subchannel_create_call( - grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset, - grpc_subchannel_call **target, grpc_closure *notify); + * If the returned status is 1, the call will return immediately and \a target + * will point to a connected \a subchannel_call instance. Note that \a notify + * will \em not be invoked in this case. + * Otherwise, if the returned status is 0, the subchannel call will be created + * asynchronously, invoking the \a notify callback upon completion. */ +int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + grpc_pollset *pollset, gpr_atm *target, + grpc_closure *notify); /** cancel \a call in the waiting state. */ -void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - int iomgr_success); +void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + gpr_atm *target); /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, @@ -138,6 +133,9 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call); +grpc_call_stack *grpc_subchannel_call_get_call_stack( + grpc_subchannel_call *subchannel_call); + struct grpc_subchannel_args { /** Channel filters for this channel - wrapped factories will likely want to mutate this */ |