aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/client_setup.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel/client_setup.c')
-rw-r--r--src/core/channel/client_setup.c95
1 files changed, 64 insertions, 31 deletions
diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c
index 6d892d6c92..5be8fa66e9 100644
--- a/src/core/channel/client_setup.c
+++ b/src/core/channel/client_setup.c
@@ -56,6 +56,9 @@ struct grpc_client_setup {
gpr_cv cv;
grpc_client_setup_request *active_request;
int refs;
+ /** The set of pollsets that are currently interested in this
+ connection being established */
+ grpc_pollset_set interested_parties;
};
struct grpc_client_setup_request {
@@ -68,14 +71,22 @@ gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r) {
return r->deadline;
}
+grpc_pollset_set *grpc_client_setup_get_interested_parties(
+ grpc_client_setup_request *r) {
+ return &r->setup->interested_parties;
+}
+
static void destroy_setup(grpc_client_setup *s) {
gpr_mu_destroy(&s->mu);
gpr_cv_destroy(&s->cv);
s->done(s->user_data);
grpc_channel_args_destroy(s->args);
+ grpc_pollset_set_destroy(&s->interested_parties);
gpr_free(s);
}
+static void destroy_request(grpc_client_setup_request *r) { gpr_free(r); }
+
/* initiate handshaking */
static void setup_initiate(grpc_transport_setup *sp) {
grpc_client_setup *s = (grpc_client_setup *)sp;
@@ -83,8 +94,7 @@ static void setup_initiate(grpc_transport_setup *sp) {
int in_alarm = 0;
r->setup = s;
- /* TODO(klempner): Actually set a deadline */
- r->deadline = gpr_inf_future;
+ r->deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->refs > 0);
@@ -104,10 +114,30 @@ static void setup_initiate(grpc_transport_setup *sp) {
if (!in_alarm) {
s->initiate(s->user_data, r);
} else {
- gpr_free(r);
+ destroy_request(r);
}
}
+/** implementation of add_interested_party for setup vtable */
+static void setup_add_interested_party(grpc_transport_setup *sp,
+ grpc_pollset *pollset) {
+ grpc_client_setup *s = (grpc_client_setup *)sp;
+
+ gpr_mu_lock(&s->mu);
+ grpc_pollset_set_add_pollset(&s->interested_parties, pollset);
+ gpr_mu_unlock(&s->mu);
+}
+
+/** implementation of del_interested_party for setup vtable */
+static void setup_del_interested_party(grpc_transport_setup *sp,
+ grpc_pollset *pollset) {
+ grpc_client_setup *s = (grpc_client_setup *)sp;
+
+ gpr_mu_lock(&s->mu);
+ grpc_pollset_set_del_pollset(&s->interested_parties, pollset);
+ gpr_mu_unlock(&s->mu);
+}
+
/* cancel handshaking: cancel all requests, and shutdown (the caller promises
not to initiate again) */
static void setup_cancel(grpc_transport_setup *sp) {
@@ -137,7 +167,8 @@ static void setup_cancel(grpc_transport_setup *sp) {
}
}
-int grpc_client_setup_cb_begin(grpc_client_setup_request *r) {
+int grpc_client_setup_cb_begin(grpc_client_setup_request *r,
+ const char *reason) {
gpr_mu_lock(&r->setup->mu);
if (r->setup->cancelled) {
gpr_mu_unlock(&r->setup->mu);
@@ -148,7 +179,8 @@ int grpc_client_setup_cb_begin(grpc_client_setup_request *r) {
return 1;
}
-void grpc_client_setup_cb_end(grpc_client_setup_request *r) {
+void grpc_client_setup_cb_end(grpc_client_setup_request *r,
+ const char *reason) {
gpr_mu_lock(&r->setup->mu);
r->setup->in_cb--;
if (r->setup->cancelled) gpr_cv_signal(&r->setup->cv);
@@ -156,8 +188,9 @@ void grpc_client_setup_cb_end(grpc_client_setup_request *r) {
}
/* vtable for transport setup */
-static const grpc_transport_setup_vtable setup_vtable = {setup_initiate,
- setup_cancel};
+static const grpc_transport_setup_vtable setup_vtable = {
+ setup_initiate, setup_add_interested_party, setup_del_interested_party,
+ setup_cancel};
void grpc_client_setup_create_and_attach(
grpc_channel_stack *newly_minted_channel, const grpc_channel_args *args,
@@ -180,42 +213,44 @@ void grpc_client_setup_create_and_attach(
s->in_alarm = 0;
s->in_cb = 0;
s->cancelled = 0;
+ grpc_pollset_set_init(&s->interested_parties);
grpc_client_channel_set_transport_setup(newly_minted_channel, &s->base);
}
-int grpc_client_setup_request_should_continue(grpc_client_setup_request *r) {
+int grpc_client_setup_request_should_continue(grpc_client_setup_request *r,
+ const char *reason) {
int result;
if (gpr_time_cmp(gpr_now(), r->deadline) > 0) {
- return 0;
+ result = 0;
+ } else {
+ gpr_mu_lock(&r->setup->mu);
+ result = r->setup->active_request == r;
+ gpr_mu_unlock(&r->setup->mu);
}
- gpr_mu_lock(&r->setup->mu);
- result = r->setup->active_request == r;
- gpr_mu_unlock(&r->setup->mu);
return result;
}
-static void backoff_alarm_done(void *arg /* grpc_client_setup */, int success) {
- grpc_client_setup *s = arg;
- grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request));
- r->setup = s;
- /* TODO(klempner): Set this to something useful */
- r->deadline = gpr_inf_future;
+static void backoff_alarm_done(void *arg /* grpc_client_setup_request */,
+ int success) {
+ grpc_client_setup_request *r = arg;
+ grpc_client_setup *s = r->setup;
/* Handle status cancelled? */
gpr_mu_lock(&s->mu);
- s->active_request = r;
s->in_alarm = 0;
- if (!success) {
+ if (s->active_request != NULL || !success) {
if (0 == --s->refs) {
gpr_mu_unlock(&s->mu);
destroy_setup(s);
- gpr_free(r);
+ destroy_request(r);
return;
} else {
gpr_mu_unlock(&s->mu);
+ destroy_request(r);
return;
}
}
+ s->active_request = r;
gpr_mu_unlock(&s->mu);
s->initiate(s->user_data, r);
}
@@ -231,16 +266,12 @@ void grpc_client_setup_request_finish(grpc_client_setup_request *r,
} else {
retry = 0;
}
+
if (!retry && 0 == --s->refs) {
gpr_mu_unlock(&s->mu);
destroy_setup(s);
- gpr_free(r);
- return;
- }
-
- gpr_free(r);
-
- if (retry) {
+ destroy_request(r);
+ } else if (retry) {
/* TODO(klempner): Replace these values with further consideration. 2x is
probably too aggressive of a backoff. */
gpr_timespec max_backoff = gpr_time_from_minutes(2);
@@ -248,15 +279,17 @@ void grpc_client_setup_request_finish(grpc_client_setup_request *r,
gpr_timespec deadline = gpr_time_add(s->current_backoff_interval, now);
GPR_ASSERT(!s->in_alarm);
s->in_alarm = 1;
- grpc_alarm_init(&s->backoff_alarm, deadline, backoff_alarm_done, s, now);
+ grpc_alarm_init(&s->backoff_alarm, deadline, backoff_alarm_done, r, now);
s->current_backoff_interval =
gpr_time_add(s->current_backoff_interval, s->current_backoff_interval);
if (gpr_time_cmp(s->current_backoff_interval, max_backoff) > 0) {
s->current_backoff_interval = max_backoff;
}
+ gpr_mu_unlock(&s->mu);
+ } else {
+ gpr_mu_unlock(&s->mu);
+ destroy_request(r);
}
-
- gpr_mu_unlock(&s->mu);
}
const grpc_channel_args *grpc_client_setup_get_channel_args(