aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/subchannel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r--src/core/client_config/subchannel.c133
1 files changed, 99 insertions, 34 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 0401dd3868..e911a46faf 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -22,7 +22,7 @@
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMA`S (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
@@ -41,8 +41,10 @@
#include "src/core/channel/client_channel.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/iomgr/timer.h"
-#include "src/core/transport/connectivity_state.h"
+#include "src/core/profiling/timers.h"
#include "src/core/surface/channel.h"
+#include "src/core/transport/connectivity_state.h"
+#include "src/core/transport/connectivity_state.h"
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
@@ -69,7 +71,7 @@ typedef struct waiting_for_connect {
struct waiting_for_connect *next;
grpc_closure *notify;
grpc_pollset *pollset;
- grpc_subchannel_call **target;
+ gpr_atm *target;
grpc_subchannel *subchannel;
grpc_closure continuation;
} waiting_for_connect;
@@ -137,14 +139,16 @@ struct grpc_subchannel {
struct grpc_subchannel_call {
connection *connection;
- gpr_refcount refs;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
+#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
+ (((grpc_subchannel_call *)(callstack)) - 1)
static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
- connection *con);
+ connection *con,
+ grpc_pollset *pollset);
static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c,
const char *reason);
@@ -163,7 +167,7 @@ static grpc_subchannel *connection_unref_locked(
connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
-#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define SUBCHANNEL_REF_LOCKED(p, r) \
subchannel_ref_locked((p), __FILE__, __LINE__, (r))
#define SUBCHANNEL_UNREF_LOCKED(p, r) \
@@ -173,6 +177,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
#define CONNECTION_UNREF_LOCKED(cl, p, r) \
connection_unref_locked((cl), (p), __FILE__, __LINE__, (r))
#define REF_PASS_ARGS , file, line, reason
+#define REF_PASS_REASON , reason
#define REF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
(name), (p), (p)->refs, (p)->refs + 1, reason)
@@ -185,6 +190,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p))
#define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p))
#define REF_PASS_ARGS
+#define REF_PASS_REASON
#define REF_LOG(name, p) \
do { \
} while (0)
@@ -312,9 +318,9 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
return c;
}
-void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- int iomgr_success) {
+static void cancel_waiting_calls(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *subchannel,
+ int iomgr_success) {
waiting_for_connect *w4c;
gpr_mu_lock(&subchannel->mu);
w4c = subchannel->waiting;
@@ -335,6 +341,37 @@ void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx,
}
}
+void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *subchannel,
+ gpr_atm *target) {
+ waiting_for_connect *w4c;
+ int unref_count = 0;
+ gpr_mu_lock(&subchannel->mu);
+ w4c = subchannel->waiting;
+ subchannel->waiting = NULL;
+ while (w4c != NULL) {
+ waiting_for_connect *next = w4c->next;
+ if (w4c->target == target) {
+ grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
+ w4c->pollset);
+ grpc_exec_ctx_enqueue(exec_ctx, w4c->notify, 0);
+
+ unref_count++;
+ gpr_free(w4c);
+ } else {
+ w4c->next = subchannel->waiting;
+ subchannel->waiting = w4c;
+ }
+
+ w4c = next;
+ }
+ gpr_mu_unlock(&subchannel->mu);
+
+ while (unref_count-- > 0) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "waiting_for_connect");
+ }
+}
+
static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_connect_in_args args;
@@ -358,29 +395,35 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
- grpc_subchannel_call_create_status call_creation_status;
+ int call_creation_status;
waiting_for_connect *w4c = arg;
grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
call_creation_status = grpc_subchannel_create_call(
exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
- GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY);
+ GPR_ASSERT(call_creation_status == 1);
w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
gpr_free(w4c);
}
-grpc_subchannel_call_create_status grpc_subchannel_create_call(
- grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset,
- grpc_subchannel_call **target, grpc_closure *notify) {
+int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
+ grpc_pollset *pollset, gpr_atm *target,
+ grpc_closure *notify) {
connection *con;
+ grpc_subchannel_call *call;
+ GPR_TIMER_BEGIN("grpc_subchannel_create_call", 0);
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
con = c->active;
CONNECTION_REF_LOCKED(con, "call");
gpr_mu_unlock(&c->mu);
- *target = create_call(exec_ctx, con);
- return GRPC_SUBCHANNEL_CALL_CREATE_READY;
+ call = create_call(exec_ctx, con, pollset);
+ if (!gpr_atm_rel_cas(target, 0, (gpr_atm)(gpr_uintptr)call)) {
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "failed to set");
+ }
+ GPR_TIMER_END("grpc_subchannel_create_call", 0);
+ return 1;
} else {
waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
w4c->next = c->waiting;
@@ -405,7 +448,8 @@ grpc_subchannel_call_create_status grpc_subchannel_create_call(
} else {
gpr_mu_unlock(&c->mu);
}
- return GRPC_SUBCHANNEL_CALL_CREATE_PENDING;
+ GPR_TIMER_END("grpc_subchannel_create_call", 0);
+ return 0;
}
}
@@ -687,7 +731,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
update_reconnect_parameters(c);
continue_connect(exec_ctx, c);
} else {
- grpc_subchannel_cancel_waiting_call(exec_ctx, c, iomgr_success);
+ cancel_waiting_calls(exec_ctx, c, iomgr_success);
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
}
@@ -747,26 +791,40 @@ static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
* grpc_subchannel_call implementation
*/
+static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
+ int success) {
+ grpc_subchannel_call *c = call;
+ gpr_mu *mu = &c->connection->subchannel->mu;
+ grpc_subchannel *destroy;
+ GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
+ grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
+ gpr_mu_lock(mu);
+ destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call");
+ gpr_mu_unlock(mu);
+ gpr_free(c);
+ if (destroy != NULL) {
+ subchannel_destroy(exec_ctx, destroy);
+ }
+ GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
+}
+
void grpc_subchannel_call_ref(grpc_subchannel_call *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- gpr_ref(&c->refs);
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c), reason);
+#else
+ grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c));
+#endif
}
void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- if (gpr_unref(&c->refs)) {
- gpr_mu *mu = &c->connection->subchannel->mu;
- grpc_subchannel *destroy;
- grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
- gpr_mu_lock(mu);
- destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call");
- gpr_mu_unlock(mu);
- gpr_free(c);
- if (destroy != NULL) {
- subchannel_destroy(exec_ctx, destroy);
- }
- }
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), reason);
+#else
+ grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
+#endif
}
char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
@@ -785,14 +843,16 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
}
static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
- connection *con) {
+ connection *con,
+ grpc_pollset *pollset) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call =
gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
call->connection = con;
- gpr_ref_init(&call->refs, 1);
- grpc_call_stack_init(exec_ctx, chanstk, NULL, NULL, callstk);
+ grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
+ NULL, NULL, callstk);
+ grpc_call_stack_set_pollset(exec_ctx, callstk, pollset);
return call;
}
@@ -803,3 +863,8 @@ grpc_mdctx *grpc_subchannel_get_mdctx(grpc_subchannel *subchannel) {
grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel) {
return subchannel->master;
}
+
+grpc_call_stack *grpc_subchannel_call_get_call_stack(
+ grpc_subchannel_call *subchannel_call) {
+ return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
+}