aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/client_channel.c7
-rw-r--r--src/core/channel/client_setup.c64
-rw-r--r--src/core/channel/client_setup.h2
3 files changed, 59 insertions, 14 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 78f8d06d89..9f23ed3d71 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -39,6 +39,7 @@
#include "src/core/channel/child_channel.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/pollset_set.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -131,7 +132,10 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
size_t new_count;
size_t i;
for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
- if (chand->waiting_children[i] == calld) continue;
+ if (chand->waiting_children[i] == calld) {
+ grpc_transport_setup_del_interested_party(chand->transport_setup, calld->s.waiting_op.bind_pollset);
+ continue;
+ }
chand->waiting_children[new_count++] = chand->waiting_children[i];
}
GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
@@ -227,6 +231,7 @@ static void cc_start_transport_op(grpc_call_element *elem,
if (initiate_transport_setup) {
grpc_transport_setup_initiate(chand->transport_setup);
}
+ grpc_transport_setup_add_interested_party(chand->transport_setup, op->bind_pollset);
}
}
break;
diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c
index 6d892d6c92..e2bd7ef52e 100644
--- a/src/core/channel/client_setup.c
+++ b/src/core/channel/client_setup.c
@@ -61,6 +61,7 @@ struct grpc_client_setup {
struct grpc_client_setup_request {
/* pointer back to the setup object */
grpc_client_setup *setup;
+ grpc_pollset_set interested_parties;
gpr_timespec deadline;
};
@@ -68,6 +69,11 @@ gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r) {
return r->deadline;
}
+grpc_pollset_set *grpc_client_setup_get_interested_parties(
+ grpc_client_setup_request *r) {
+ return &r->interested_parties;
+}
+
static void destroy_setup(grpc_client_setup *s) {
gpr_mu_destroy(&s->mu);
gpr_cv_destroy(&s->cv);
@@ -76,6 +82,11 @@ static void destroy_setup(grpc_client_setup *s) {
gpr_free(s);
}
+static void destroy_request(grpc_client_setup_request *r) {
+ grpc_pollset_set_destroy(&r->interested_parties);
+ gpr_free(r);
+}
+
/* initiate handshaking */
static void setup_initiate(grpc_transport_setup *sp) {
grpc_client_setup *s = (grpc_client_setup *)sp;
@@ -83,6 +94,7 @@ static void setup_initiate(grpc_transport_setup *sp) {
int in_alarm = 0;
r->setup = s;
+ grpc_pollset_set_init(&r->interested_parties);
/* TODO(klempner): Actually set a deadline */
r->deadline = gpr_inf_future;
@@ -104,8 +116,36 @@ static void setup_initiate(grpc_transport_setup *sp) {
if (!in_alarm) {
s->initiate(s->user_data, r);
} else {
- gpr_free(r);
+ destroy_request(r);
+ }
+}
+
+static void setup_add_interested_party(grpc_transport_setup *sp, grpc_pollset *pollset) {
+ grpc_client_setup *s = (grpc_client_setup *)sp;
+
+ gpr_mu_lock(&s->mu);
+ if (!s->active_request) {
+ gpr_mu_unlock(&s->mu);
+ return;
+ }
+
+ grpc_pollset_set_add_pollset(&s->active_request->interested_parties, pollset);
+
+ gpr_mu_unlock(&s->mu);
+}
+
+static void setup_del_interested_party(grpc_transport_setup *sp, grpc_pollset *pollset) {
+ grpc_client_setup *s = (grpc_client_setup *)sp;
+
+ gpr_mu_lock(&s->mu);
+ if (!s->active_request) {
+ gpr_mu_unlock(&s->mu);
+ return;
}
+
+ grpc_pollset_set_del_pollset(&s->active_request->interested_parties, pollset);
+
+ gpr_mu_unlock(&s->mu);
}
/* cancel handshaking: cancel all requests, and shutdown (the caller promises
@@ -157,6 +197,8 @@ void grpc_client_setup_cb_end(grpc_client_setup_request *r) {
/* vtable for transport setup */
static const grpc_transport_setup_vtable setup_vtable = {setup_initiate,
+ setup_add_interested_party,
+ setup_del_interested_party,
setup_cancel};
void grpc_client_setup_create_and_attach(
@@ -196,26 +238,24 @@ int grpc_client_setup_request_should_continue(grpc_client_setup_request *r) {
}
static void backoff_alarm_done(void *arg /* grpc_client_setup */, int success) {
- grpc_client_setup *s = arg;
- grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request));
- r->setup = s;
- /* TODO(klempner): Set this to something useful */
- r->deadline = gpr_inf_future;
+ grpc_client_setup_request *r = arg;
+ grpc_client_setup *s = r->setup;
/* Handle status cancelled? */
gpr_mu_lock(&s->mu);
- s->active_request = r;
s->in_alarm = 0;
- if (!success) {
+ if (s->active_request != NULL || !success) {
if (0 == --s->refs) {
gpr_mu_unlock(&s->mu);
destroy_setup(s);
- gpr_free(r);
+ destroy_request(r);
return;
} else {
gpr_mu_unlock(&s->mu);
+ destroy_request(r);
return;
}
}
+ s->active_request = r;
gpr_mu_unlock(&s->mu);
s->initiate(s->user_data, r);
}
@@ -234,12 +274,10 @@ void grpc_client_setup_request_finish(grpc_client_setup_request *r,
if (!retry && 0 == --s->refs) {
gpr_mu_unlock(&s->mu);
destroy_setup(s);
- gpr_free(r);
+ destroy_request(r);
return;
}
- gpr_free(r);
-
if (retry) {
/* TODO(klempner): Replace these values with further consideration. 2x is
probably too aggressive of a backoff. */
@@ -248,7 +286,7 @@ void grpc_client_setup_request_finish(grpc_client_setup_request *r,
gpr_timespec deadline = gpr_time_add(s->current_backoff_interval, now);
GPR_ASSERT(!s->in_alarm);
s->in_alarm = 1;
- grpc_alarm_init(&s->backoff_alarm, deadline, backoff_alarm_done, s, now);
+ grpc_alarm_init(&s->backoff_alarm, deadline, backoff_alarm_done, r, now);
s->current_backoff_interval =
gpr_time_add(s->current_backoff_interval, s->current_backoff_interval);
if (gpr_time_cmp(s->current_backoff_interval, max_backoff) > 0) {
diff --git a/src/core/channel/client_setup.h b/src/core/channel/client_setup.h
index 70137e1365..2486da5dbf 100644
--- a/src/core/channel/client_setup.h
+++ b/src/core/channel/client_setup.h
@@ -67,6 +67,8 @@ void grpc_client_setup_cb_end(grpc_client_setup_request *r);
/* Get the deadline for a request passed in to initiate. Implementations should
make a best effort to honor this deadline. */
gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r);
+grpc_pollset_set *grpc_client_setup_get_interested_parties(
+ grpc_client_setup_request *r);
grpc_mdctx *grpc_client_setup_get_mdctx(grpc_client_setup_request *r);