diff options
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r-- | src/core/client_config/subchannel.c | 133 |
1 files changed, 99 insertions, 34 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 0401dd3868..e911a46faf 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -22,7 +22,7 @@ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMA`S (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT @@ -41,8 +41,10 @@ #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/iomgr/timer.h" -#include "src/core/transport/connectivity_state.h" +#include "src/core/profiling/timers.h" #include "src/core/surface/channel.h" +#include "src/core/transport/connectivity_state.h" +#include "src/core/transport/connectivity_state.h" #define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20 #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 @@ -69,7 +71,7 @@ typedef struct waiting_for_connect { struct waiting_for_connect *next; grpc_closure *notify; grpc_pollset *pollset; - grpc_subchannel_call **target; + gpr_atm *target; grpc_subchannel *subchannel; grpc_closure continuation; } waiting_for_connect; @@ -137,14 +139,16 @@ struct grpc_subchannel { struct grpc_subchannel_call { connection *connection; - gpr_refcount refs; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) +#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ + (((grpc_subchannel_call *)(callstack)) - 1) static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx, - connection *con); + connection *con, + grpc_pollset *pollset); static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, const char *reason); @@ -163,7 +167,7 @@ static grpc_subchannel *connection_unref_locked( connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c); -#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +#ifdef GRPC_STREAM_REFCOUNT_DEBUG #define SUBCHANNEL_REF_LOCKED(p, r) \ subchannel_ref_locked((p), __FILE__, __LINE__, (r)) #define SUBCHANNEL_UNREF_LOCKED(p, r) \ @@ -173,6 +177,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c); #define CONNECTION_UNREF_LOCKED(cl, p, r) \ connection_unref_locked((cl), (p), __FILE__, __LINE__, (r)) #define REF_PASS_ARGS , file, line, reason +#define REF_PASS_REASON , reason #define REF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ (name), (p), (p)->refs, (p)->refs + 1, reason) @@ -185,6 +190,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c); #define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p)) #define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p)) #define REF_PASS_ARGS +#define REF_PASS_REASON #define REF_LOG(name, p) \ do { \ } while (0) @@ -312,9 +318,9 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, return c; } -void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - int iomgr_success) { +static void cancel_waiting_calls(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + int iomgr_success) { waiting_for_connect *w4c; gpr_mu_lock(&subchannel->mu); w4c = subchannel->waiting; @@ -335,6 +341,37 @@ void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx, } } +void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + gpr_atm *target) { + waiting_for_connect *w4c; + int unref_count = 0; + gpr_mu_lock(&subchannel->mu); + w4c = subchannel->waiting; + subchannel->waiting = NULL; + while (w4c != NULL) { + waiting_for_connect *next = w4c->next; + if (w4c->target == target) { + grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, + w4c->pollset); + grpc_exec_ctx_enqueue(exec_ctx, w4c->notify, 0); + + unref_count++; + gpr_free(w4c); + } else { + w4c->next = subchannel->waiting; + subchannel->waiting = w4c; + } + + w4c = next; + } + gpr_mu_unlock(&subchannel->mu); + + while (unref_count-- > 0) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "waiting_for_connect"); + } +} + static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; @@ -358,29 +395,35 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { - grpc_subchannel_call_create_status call_creation_status; + int call_creation_status; waiting_for_connect *w4c = arg; grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset); call_creation_status = grpc_subchannel_create_call( exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); - GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY); + GPR_ASSERT(call_creation_status == 1); w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success); GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); gpr_free(w4c); } -grpc_subchannel_call_create_status grpc_subchannel_create_call( - grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset, - grpc_subchannel_call **target, grpc_closure *notify) { +int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, + grpc_pollset *pollset, gpr_atm *target, + grpc_closure *notify) { connection *con; + grpc_subchannel_call *call; + GPR_TIMER_BEGIN("grpc_subchannel_create_call", 0); gpr_mu_lock(&c->mu); if (c->active != NULL) { con = c->active; CONNECTION_REF_LOCKED(con, "call"); gpr_mu_unlock(&c->mu); - *target = create_call(exec_ctx, con); - return GRPC_SUBCHANNEL_CALL_CREATE_READY; + call = create_call(exec_ctx, con, pollset); + if (!gpr_atm_rel_cas(target, 0, (gpr_atm)(gpr_uintptr)call)) { + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "failed to set"); + } + GPR_TIMER_END("grpc_subchannel_create_call", 0); + return 1; } else { waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); w4c->next = c->waiting; @@ -405,7 +448,8 @@ grpc_subchannel_call_create_status grpc_subchannel_create_call( } else { gpr_mu_unlock(&c->mu); } - return GRPC_SUBCHANNEL_CALL_CREATE_PENDING; + GPR_TIMER_END("grpc_subchannel_create_call", 0); + return 0; } } @@ -687,7 +731,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { update_reconnect_parameters(c); continue_connect(exec_ctx, c); } else { - grpc_subchannel_cancel_waiting_call(exec_ctx, c, iomgr_success); + cancel_waiting_calls(exec_ctx, c, iomgr_success); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); } @@ -747,26 +791,40 @@ static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx, * grpc_subchannel_call implementation */ +static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, + int success) { + grpc_subchannel_call *c = call; + gpr_mu *mu = &c->connection->subchannel->mu; + grpc_subchannel *destroy; + GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); + grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); + gpr_mu_lock(mu); + destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call"); + gpr_mu_unlock(mu); + gpr_free(c); + if (destroy != NULL) { + subchannel_destroy(exec_ctx, destroy); + } + GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); +} + void grpc_subchannel_call_ref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_ref(&c->refs); +#ifdef GRPC_STREAM_REFCOUNT_DEBUG + grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c), reason); +#else + grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c)); +#endif } void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - if (gpr_unref(&c->refs)) { - gpr_mu *mu = &c->connection->subchannel->mu; - grpc_subchannel *destroy; - grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); - gpr_mu_lock(mu); - destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call"); - gpr_mu_unlock(mu); - gpr_free(c); - if (destroy != NULL) { - subchannel_destroy(exec_ctx, destroy); - } - } +#ifdef GRPC_STREAM_REFCOUNT_DEBUG + grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), reason); +#else + grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); +#endif } char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, @@ -785,14 +843,16 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, } static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx, - connection *con) { + connection *con, + grpc_pollset *pollset) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); call->connection = con; - gpr_ref_init(&call->refs, 1); - grpc_call_stack_init(exec_ctx, chanstk, NULL, NULL, callstk); + grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call, + NULL, NULL, callstk); + grpc_call_stack_set_pollset(exec_ctx, callstk, pollset); return call; } @@ -803,3 +863,8 @@ grpc_mdctx *grpc_subchannel_get_mdctx(grpc_subchannel *subchannel) { grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel) { return subchannel->master; } + +grpc_call_stack *grpc_subchannel_call_get_call_stack( + grpc_subchannel_call *subchannel_call) { + return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); +} |