diff options
author | Alexander Polcyn <apolcyn@google.com> | 2017-04-18 13:51:36 -0700 |
---|---|---|
committer | Alexander Polcyn <apolcyn@google.com> | 2017-05-18 12:09:26 -0700 |
commit | c3b1f18a7e7f443329f95ff25485c503ab76cc69 (patch) | |
tree | fb2c76f39c0bbe118cd4ba3f96d2fd7678d995ae /src | |
parent | 7ad166fafcc5e75097ac6a8c5403423500a23a7a (diff) |
get rid of connectivity state watchers right after timeout
Diffstat (limited to 'src')
5 files changed, 171 insertions, 32 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index 62f58fb278..5f3ffd4947 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -67,9 +67,8 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( typedef enum { WAITING, - CALLING_BACK, + READY_TO_CALL_BACK, CALLING_BACK_AND_FINISHED, - CALLED_BACK } callback_phase; typedef struct { @@ -77,11 +76,13 @@ typedef struct { callback_phase phase; grpc_closure on_complete; grpc_closure on_timeout; + grpc_closure watcher_timer_init; grpc_timer alarm; grpc_connectivity_state state; grpc_completion_queue *cq; grpc_cq_completion completion_storage; grpc_channel *channel; + grpc_error *error; void *tag; } state_watcher; @@ -105,11 +106,8 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, gpr_mu_lock(&w->mu); switch (w->phase) { case WAITING: - case CALLED_BACK: + case READY_TO_CALL_BACK: GPR_UNREACHABLE_CODE(return ); - case CALLING_BACK: - w->phase = CALLED_BACK; - break; case CALLING_BACK_AND_FINISHED: delete = 1; break; @@ -123,10 +121,14 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, bool due_to_completion, grpc_error *error) { - int delete = 0; - if (due_to_completion) { grpc_timer_cancel(exec_ctx, &w->alarm); + } else { + grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(w->channel)); + grpc_client_channel_watch_connectivity_state(exec_ctx, client_channel_elem, + grpc_cq_pollset(w->cq), NULL, + &w->on_complete, NULL); } gpr_mu_lock(&w->mu); @@ -147,25 +149,27 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, } switch (w->phase) { case WAITING: - w->phase = CALLING_BACK; - grpc_cq_end_op(exec_ctx, w->cq, w->tag, GRPC_ERROR_REF(error), - finished_completion, w, &w->completion_storage); + GRPC_ERROR_REF(error); + w->error = error; + w->phase = READY_TO_CALL_BACK; break; - case CALLING_BACK: + case READY_TO_CALL_BACK: + if (error != GRPC_ERROR_NONE) { + GPR_ASSERT(!due_to_completion); + GRPC_ERROR_UNREF(w->error); + GRPC_ERROR_REF(error); + w->error = error; + } w->phase = CALLING_BACK_AND_FINISHED; + grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->error, finished_completion, w, + &w->completion_storage); break; case CALLING_BACK_AND_FINISHED: GPR_UNREACHABLE_CODE(return ); - case CALLED_BACK: - delete = 1; break; } gpr_mu_unlock(&w->mu); - if (delete) { - delete_state_watcher(exec_ctx, w); - } - GRPC_ERROR_UNREF(error); } @@ -179,6 +183,28 @@ static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error)); } +int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) { + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + return grpc_client_channel_num_external_connectivity_watchers( + client_channel_elem); +} + +typedef struct watcher_timer_init_arg { + state_watcher *w; + gpr_timespec deadline; +} watcher_timer_init_arg; + +static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + watcher_timer_init_arg *wa = (watcher_timer_init_arg *)arg; + + grpc_timer_init(exec_ctx, &wa->w->alarm, + gpr_convert_clock_type(wa->deadline, GPR_CLOCK_MONOTONIC), + &wa->w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_free(wa); +} + void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag) { @@ -208,16 +234,19 @@ void grpc_channel_watch_connectivity_state( w->cq = cq; w->tag = tag; w->channel = channel; + w->error = NULL; - grpc_timer_init(&exec_ctx, &w->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); + watcher_timer_init_arg *wa = gpr_malloc(sizeof(watcher_timer_init_arg)); + wa->w = w; + wa->deadline = deadline; + grpc_closure_init(&w->watcher_timer_init, watcher_timer_init, wa, + grpc_schedule_on_exec_ctx); if (client_channel_elem->filter == &grpc_client_channel_filter) { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); - grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem, - grpc_cq_pollset(cq), &w->state, - &w->on_complete); + grpc_client_channel_watch_connectivity_state( + &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state, + &w->on_complete, &w->watcher_timer_init); } else { abort(); } diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 83e3b8f118..4bd153e8ac 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -174,6 +174,8 @@ static void *method_parameters_create_from_json(const grpc_json *json) { return value; } +struct external_connectivity_watcher; + /************************************************************************* * CHANNEL-WIDE FUNCTIONS */ @@ -209,6 +211,11 @@ typedef struct client_channel_channel_data { /** interested parties (owned) */ grpc_pollset_set *interested_parties; + /* external_connectivity_watcher_list head is guarded by its own mutex, since + * counts need to be grabbed immediately without polling on a cq */ + gpr_mu external_connectivity_watcher_list_mu; + struct external_connectivity_watcher *external_connectivity_watcher_list_head; + /* the following properties are guarded by a mutex since API's require them to be instantaneously available */ gpr_mu info_mu; @@ -632,6 +639,12 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, // Initialize data members. chand->combiner = grpc_combiner_create(NULL); gpr_mu_init(&chand->info_mu); + gpr_mu_init(&chand->external_connectivity_watcher_list_mu); + + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + chand->external_connectivity_watcher_list_head = NULL; + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, on_resolver_result_changed_locked, chand, @@ -718,6 +731,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_pollset_set_destroy(exec_ctx, chand->interested_parties); GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel"); gpr_mu_destroy(&chand->info_mu); + gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu); } /************************************************************************* @@ -1361,14 +1375,79 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( return out; } -typedef struct { +typedef struct external_connectivity_watcher { channel_data *chand; grpc_pollset *pollset; grpc_closure *on_complete; + grpc_closure *watcher_timer_init; grpc_connectivity_state *state; grpc_closure my_closure; + struct external_connectivity_watcher *next; } external_connectivity_watcher; +static external_connectivity_watcher *lookup_external_connectivity_watcher( + channel_data *chand, grpc_closure *on_complete) { + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + external_connectivity_watcher *w = + chand->external_connectivity_watcher_list_head; + while (w != NULL && w->on_complete != on_complete) { + w = w->next; + } + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + return w; +} + +static void external_connectivity_watcher_list_append( + channel_data *chand, external_connectivity_watcher *w) { + GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete)); + + gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu); + GPR_ASSERT(!w->next); + w->next = chand->external_connectivity_watcher_list_head; + chand->external_connectivity_watcher_list_head = w; + gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu); +} + +static void external_connectivity_watcher_list_remove( + channel_data *chand, external_connectivity_watcher *too_remove) { + GPR_ASSERT( + lookup_external_connectivity_watcher(chand, too_remove->on_complete)); + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + if (too_remove == chand->external_connectivity_watcher_list_head) { + chand->external_connectivity_watcher_list_head = too_remove->next; + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + return; + } + external_connectivity_watcher *w = + chand->external_connectivity_watcher_list_head; + while (w != NULL) { + if (w->next == too_remove) { + w->next = w->next->next; + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + return; + } + w = w->next; + } + GPR_UNREACHABLE_CODE(return ); +} + +int grpc_client_channel_num_external_connectivity_watchers( + grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + int count = 0; + + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + external_connectivity_watcher *w = + chand->external_connectivity_watcher_list_head; + while (w != NULL) { + count++; + w = w->next; + } + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + + return count; +} + static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { external_connectivity_watcher *w = arg; @@ -1377,6 +1456,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, w->pollset); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); + external_connectivity_watcher_list_remove(w->chand, w); gpr_free(w); grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error)); } @@ -1384,21 +1464,42 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { external_connectivity_watcher *w = arg; - grpc_closure_init(&w->my_closure, on_external_watch_complete, w, - grpc_schedule_on_exec_ctx); - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); + external_connectivity_watcher *found = NULL; + if (w->state != NULL) { + external_connectivity_watcher_list_append(w->chand, w); + grpc_closure_run(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE); + grpc_closure_init(&w->my_closure, on_external_watch_complete, w, + grpc_schedule_on_exec_ctx); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); + } else { + GPR_ASSERT(w->watcher_timer_init == NULL); + found = lookup_external_connectivity_watcher(w->chand, w->on_complete); + if (found) { + GPR_ASSERT(found->on_complete == w->on_complete); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure); + } + grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, + w->pollset); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, + "external_connectivity_watcher"); + gpr_free(w); + } } void grpc_client_channel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *on_complete) { + grpc_connectivity_state *state, grpc_closure *on_complete, + grpc_closure *watcher_timer_init) { channel_data *chand = elem->channel_data; - external_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + external_connectivity_watcher *w = gpr_zalloc(sizeof(*w)); w->chand = chand; w->pollset = pollset; w->on_complete = on_complete; w->state = state; + w->watcher_timer_init = watcher_timer_init; + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 8d2490ea55..356a7ab0c1 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -53,9 +53,13 @@ extern const grpc_channel_filter grpc_client_channel_filter; grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); +int grpc_client_channel_num_external_connectivity_watchers( + grpc_channel_element *elem); + void grpc_client_channel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *on_complete); + grpc_connectivity_state *state, grpc_closure *on_complete, + grpc_closure *watcher_timer_init); /* Debug helper: pull the subchannel call from a call stack element */ grpc_subchannel_call *grpc_client_channel_get_subchannel_call( diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 063f92114c..332907c0af 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -103,6 +103,7 @@ grpc_alarm_create_type grpc_alarm_create_import; grpc_alarm_cancel_type grpc_alarm_cancel_import; grpc_alarm_destroy_type grpc_alarm_destroy_import; grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import; +grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import; grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import; grpc_channel_create_call_type grpc_channel_create_call_import; grpc_channel_ping_type grpc_channel_ping_import; @@ -400,6 +401,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_alarm_cancel_import = (grpc_alarm_cancel_type) GetProcAddress(library, "grpc_alarm_cancel"); grpc_alarm_destroy_import = (grpc_alarm_destroy_type) GetProcAddress(library, "grpc_alarm_destroy"); grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state"); + grpc_channel_num_external_connectivity_watchers_import = (grpc_channel_num_external_connectivity_watchers_type) GetProcAddress(library, "grpc_channel_num_external_connectivity_watchers"); grpc_channel_watch_connectivity_state_import = (grpc_channel_watch_connectivity_state_type) GetProcAddress(library, "grpc_channel_watch_connectivity_state"); grpc_channel_create_call_import = (grpc_channel_create_call_type) GetProcAddress(library, "grpc_channel_create_call"); grpc_channel_ping_import = (grpc_channel_ping_type) GetProcAddress(library, "grpc_channel_ping"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index f5dcd68a8e..e6a3f6d2f4 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -260,6 +260,9 @@ extern grpc_alarm_destroy_type grpc_alarm_destroy_import; typedef grpc_connectivity_state(*grpc_channel_check_connectivity_state_type)(grpc_channel *channel, int try_to_connect); extern grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import; #define grpc_channel_check_connectivity_state grpc_channel_check_connectivity_state_import +typedef int(*grpc_channel_num_external_connectivity_watchers_type)(grpc_channel *channel); +extern grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import; +#define grpc_channel_num_external_connectivity_watchers grpc_channel_num_external_connectivity_watchers_import typedef void(*grpc_channel_watch_connectivity_state_type)(grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag); extern grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import; #define grpc_channel_watch_connectivity_state grpc_channel_watch_connectivity_state_import |