diff options
Diffstat (limited to 'src/core/channel')
-rw-r--r-- | src/core/channel/client_channel.c | 7 | ||||
-rw-r--r-- | src/core/channel/client_setup.c | 64 | ||||
-rw-r--r-- | src/core/channel/client_setup.h | 2 |
3 files changed, 59 insertions, 14 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 78f8d06d89..9f23ed3d71 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -39,6 +39,7 @@ #include "src/core/channel/child_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/pollset_set.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -131,7 +132,10 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { size_t new_count; size_t i; for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) { - if (chand->waiting_children[i] == calld) continue; + if (chand->waiting_children[i] == calld) { + grpc_transport_setup_del_interested_party(chand->transport_setup, calld->s.waiting_op.bind_pollset); + continue; + } chand->waiting_children[new_count++] = chand->waiting_children[i]; } GPR_ASSERT(new_count == chand->waiting_child_count - 1 || @@ -227,6 +231,7 @@ static void cc_start_transport_op(grpc_call_element *elem, if (initiate_transport_setup) { grpc_transport_setup_initiate(chand->transport_setup); } + grpc_transport_setup_add_interested_party(chand->transport_setup, op->bind_pollset); } } break; diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c index 6d892d6c92..e2bd7ef52e 100644 --- a/src/core/channel/client_setup.c +++ b/src/core/channel/client_setup.c @@ -61,6 +61,7 @@ struct grpc_client_setup { struct grpc_client_setup_request { /* pointer back to the setup object */ grpc_client_setup *setup; + grpc_pollset_set interested_parties; gpr_timespec deadline; }; @@ -68,6 +69,11 @@ gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r) { return r->deadline; } +grpc_pollset_set *grpc_client_setup_get_interested_parties( + grpc_client_setup_request *r) { + return &r->interested_parties; +} + static void destroy_setup(grpc_client_setup *s) { gpr_mu_destroy(&s->mu); gpr_cv_destroy(&s->cv); @@ -76,6 +82,11 @@ static void destroy_setup(grpc_client_setup *s) { gpr_free(s); } +static void destroy_request(grpc_client_setup_request *r) { + grpc_pollset_set_destroy(&r->interested_parties); + gpr_free(r); +} + /* initiate handshaking */ static void setup_initiate(grpc_transport_setup *sp) { grpc_client_setup *s = (grpc_client_setup *)sp; @@ -83,6 +94,7 @@ static void setup_initiate(grpc_transport_setup *sp) { int in_alarm = 0; r->setup = s; + grpc_pollset_set_init(&r->interested_parties); /* TODO(klempner): Actually set a deadline */ r->deadline = gpr_inf_future; @@ -104,8 +116,36 @@ static void setup_initiate(grpc_transport_setup *sp) { if (!in_alarm) { s->initiate(s->user_data, r); } else { - gpr_free(r); + destroy_request(r); + } +} + +static void setup_add_interested_party(grpc_transport_setup *sp, grpc_pollset *pollset) { + grpc_client_setup *s = (grpc_client_setup *)sp; + + gpr_mu_lock(&s->mu); + if (!s->active_request) { + gpr_mu_unlock(&s->mu); + return; + } + + grpc_pollset_set_add_pollset(&s->active_request->interested_parties, pollset); + + gpr_mu_unlock(&s->mu); +} + +static void setup_del_interested_party(grpc_transport_setup *sp, grpc_pollset *pollset) { + grpc_client_setup *s = (grpc_client_setup *)sp; + + gpr_mu_lock(&s->mu); + if (!s->active_request) { + gpr_mu_unlock(&s->mu); + return; } + + grpc_pollset_set_del_pollset(&s->active_request->interested_parties, pollset); + + gpr_mu_unlock(&s->mu); } /* cancel handshaking: cancel all requests, and shutdown (the caller promises @@ -157,6 +197,8 @@ void grpc_client_setup_cb_end(grpc_client_setup_request *r) { /* vtable for transport setup */ static const grpc_transport_setup_vtable setup_vtable = {setup_initiate, + setup_add_interested_party, + setup_del_interested_party, setup_cancel}; void grpc_client_setup_create_and_attach( @@ -196,26 +238,24 @@ int grpc_client_setup_request_should_continue(grpc_client_setup_request *r) { } static void backoff_alarm_done(void *arg /* grpc_client_setup */, int success) { - grpc_client_setup *s = arg; - grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request)); - r->setup = s; - /* TODO(klempner): Set this to something useful */ - r->deadline = gpr_inf_future; + grpc_client_setup_request *r = arg; + grpc_client_setup *s = r->setup; /* Handle status cancelled? */ gpr_mu_lock(&s->mu); - s->active_request = r; s->in_alarm = 0; - if (!success) { + if (s->active_request != NULL || !success) { if (0 == --s->refs) { gpr_mu_unlock(&s->mu); destroy_setup(s); - gpr_free(r); + destroy_request(r); return; } else { gpr_mu_unlock(&s->mu); + destroy_request(r); return; } } + s->active_request = r; gpr_mu_unlock(&s->mu); s->initiate(s->user_data, r); } @@ -234,12 +274,10 @@ void grpc_client_setup_request_finish(grpc_client_setup_request *r, if (!retry && 0 == --s->refs) { gpr_mu_unlock(&s->mu); destroy_setup(s); - gpr_free(r); + destroy_request(r); return; } - gpr_free(r); - if (retry) { /* TODO(klempner): Replace these values with further consideration. 2x is probably too aggressive of a backoff. */ @@ -248,7 +286,7 @@ void grpc_client_setup_request_finish(grpc_client_setup_request *r, gpr_timespec deadline = gpr_time_add(s->current_backoff_interval, now); GPR_ASSERT(!s->in_alarm); s->in_alarm = 1; - grpc_alarm_init(&s->backoff_alarm, deadline, backoff_alarm_done, s, now); + grpc_alarm_init(&s->backoff_alarm, deadline, backoff_alarm_done, r, now); s->current_backoff_interval = gpr_time_add(s->current_backoff_interval, s->current_backoff_interval); if (gpr_time_cmp(s->current_backoff_interval, max_backoff) > 0) { diff --git a/src/core/channel/client_setup.h b/src/core/channel/client_setup.h index 70137e1365..2486da5dbf 100644 --- a/src/core/channel/client_setup.h +++ b/src/core/channel/client_setup.h @@ -67,6 +67,8 @@ void grpc_client_setup_cb_end(grpc_client_setup_request *r); /* Get the deadline for a request passed in to initiate. Implementations should make a best effort to honor this deadline. */ gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r); +grpc_pollset_set *grpc_client_setup_get_interested_parties( + grpc_client_setup_request *r); grpc_mdctx *grpc_client_setup_get_mdctx(grpc_client_setup_request *r); |