diff options
author | Mark D. Roth <roth@google.com> | 2016-11-14 12:08:13 -0800 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2016-11-14 12:08:13 -0800 |
commit | b16c1e32fd774ab38dc6cbf671c29f676905f889 (patch) | |
tree | d452714a6762e7afe3a694860c3c2575e78f695f /src/core/lib/channel | |
parent | 4b5cdb751ebda76b7cd78577f80b58a3bcf0105a (diff) |
Move deadline handling to handshake manager.
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r-- | src/core/lib/channel/handshaker.c | 145 | ||||
-rw-r--r-- | src/core/lib/channel/handshaker.h | 1 |
2 files changed, 89 insertions, 57 deletions
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index bb2833046f..681baaa7aa 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -38,6 +38,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/timer.h" // // grpc_handshaker @@ -60,11 +61,10 @@ static void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, static void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, - gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, grpc_handshaker_args* args) { - handshaker->vtable->do_handshake(exec_ctx, handshaker, deadline, acceptor, + handshaker->vtable->do_handshake(exec_ctx, handshaker, acceptor, on_handshake_done, args); } @@ -72,31 +72,29 @@ static void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, // grpc_handshake_manager // -// State used while chaining handshakers. -struct grpc_handshaker_state { - // The index of the handshaker to invoke next and the closure to invoke it. +struct grpc_handshake_manager { + gpr_mu mu; + gpr_refcount refs; + // An array of handshakers added via grpc_handshake_manager_add(). + size_t count; + grpc_handshaker** handshakers; + // The index of the handshaker to invoke next and closure to invoke it. size_t index; grpc_closure call_next_handshaker; - // The deadline for all handshakers. - gpr_timespec deadline; // The acceptor to call the handshakers with. grpc_tcp_server_acceptor* acceptor; + // Deadline timer across all handshakers. + grpc_timer deadline_timer; // The final callback and user_data to invoke after the last handshaker. grpc_closure on_handshake_done; void* user_data; }; -struct grpc_handshake_manager { - // An array of handshakers added via grpc_handshake_manager_add(). - size_t count; - grpc_handshaker** handshakers; - // State used while chaining handshakers. - struct grpc_handshaker_state* state; -}; - grpc_handshake_manager* grpc_handshake_manager_create() { grpc_handshake_manager* mgr = gpr_malloc(sizeof(grpc_handshake_manager)); memset(mgr, 0, sizeof(*mgr)); + gpr_mu_init(&mgr->mu); + gpr_ref_init(&mgr->refs, 1); return mgr; } @@ -104,6 +102,7 @@ static bool is_power_of_2(size_t n) { return (n & (n - 1)) == 0; } void grpc_handshake_manager_add(grpc_handshake_manager* mgr, grpc_handshaker* handshaker) { + gpr_mu_lock(&mgr->mu); // To avoid allocating memory for each handshaker we add, we double // the number of elements every time we need more. size_t realloc_count = 0; @@ -117,57 +116,86 @@ void grpc_handshake_manager_add(grpc_handshake_manager* mgr, gpr_realloc(mgr->handshakers, realloc_count * sizeof(grpc_handshaker*)); } mgr->handshakers[mgr->count++] = handshaker; + gpr_mu_unlock(&mgr->mu); +} + +static void grpc_handshake_manager_unref(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr) { + if (gpr_unref(&mgr->refs)) { + for (size_t i = 0; i < mgr->count; ++i) { + grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]); + } + gpr_free(mgr->handshakers); + gpr_mu_destroy(&mgr->mu); + gpr_free(mgr); + } } void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr) { - for (size_t i = 0; i < mgr->count; ++i) { - grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]); - } - gpr_free(mgr->handshakers); - gpr_free(mgr); + grpc_handshake_manager_unref(exec_ctx, mgr); } void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr) { + gpr_mu_lock(&mgr->mu); for (size_t i = 0; i < mgr->count; ++i) { grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[i]); } - if (mgr->state != NULL) { - gpr_free(mgr->state); - mgr->state = NULL; - } + gpr_mu_unlock(&mgr->mu); } -// A function used as the handshaker-done callback when chaining -// handshakers together. static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { - grpc_handshaker_args* args = arg; - grpc_handshake_manager* mgr = args->user_data; - GPR_ASSERT(mgr->state != NULL); - GPR_ASSERT(mgr->state->index <= mgr->count); + grpc_error* error); + +// Helper function to call either the next handshaker or the +// on_handshake_done callback. +static void call_next_handshaker_locked( + grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, + grpc_handshaker_args* args, grpc_error* error) { + GPR_ASSERT(mgr->index <= mgr->count); // If we got an error, skip all remaining handshakers and invoke the // caller-supplied callback immediately. - // Otherwise, if this is the last handshaker, then call the final + // Otherwise, if this is the last handshaker, then call the on_handshake_done // callback instead of chaining back to this function again. - if (error != GRPC_ERROR_NONE || mgr->state->index == mgr->count) { - args->user_data = mgr->state->user_data; - grpc_exec_ctx_sched(exec_ctx, &mgr->state->on_handshake_done, + if (error != GRPC_ERROR_NONE || mgr->index == mgr->count) { + // Cancel deadline timer, since we're invoking the on_handshake_done + // callback now. + grpc_timer_cancel(exec_ctx, &mgr->deadline_timer); + args->user_data = mgr->user_data; + grpc_exec_ctx_sched(exec_ctx, &mgr->on_handshake_done, GRPC_ERROR_REF(error), NULL); + // Since we're invoking the final callback, we won't be coming back + // to this function, so we can release our reference to the + // handshake manager. + grpc_handshake_manager_unref(exec_ctx, mgr); return; } // Call the next handshaker. grpc_handshaker_do_handshake( - exec_ctx, mgr->handshakers[mgr->state->index], mgr->state->deadline, - mgr->state->acceptor, &mgr->state->call_next_handshaker, args); - // If this is the last handshaker, clean up state. - if (mgr->state->index == mgr->count) { - gpr_free(mgr->state); - mgr->state = NULL; - } else { - ++mgr->state->index; + exec_ctx, mgr->handshakers[mgr->index], mgr->acceptor, + &mgr->call_next_handshaker, args); + ++mgr->index; +} + +// A function used as the handshaker-done callback when chaining +// handshakers together. +static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_handshaker_args* args = arg; + grpc_handshake_manager* mgr = args->user_data; + gpr_mu_lock(&mgr->mu); + call_next_handshaker_locked(exec_ctx, mgr, args, error); + gpr_mu_unlock(&mgr->mu); +} + +// Callback invoked when deadline is exceeded. +static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_handshake_manager* mgr = arg; + if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. + grpc_handshake_manager_shutdown(exec_ctx, mgr); } + grpc_handshake_manager_unref(exec_ctx, mgr); } void grpc_handshake_manager_do_handshake( @@ -176,26 +204,31 @@ void grpc_handshake_manager_do_handshake( gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done, void* user_data) { // Construct handshaker args. These will be passed through all - // handshakers and eventually be freed by the final callback. + // handshakers and eventually be freed by the on_handshake_done callback. grpc_handshaker_args* args = gpr_malloc(sizeof(*args)); args->endpoint = endpoint; args->args = grpc_channel_args_copy(channel_args); args->read_buffer = gpr_malloc(sizeof(*args->read_buffer)); grpc_slice_buffer_init(args->read_buffer); - // Construct state. - GPR_ASSERT(mgr->state == NULL); - mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state)); - memset(mgr->state, 0, sizeof(*mgr->state)); - grpc_closure_init(&mgr->state->call_next_handshaker, call_next_handshaker, - args); - mgr->state->deadline = deadline; - mgr->state->acceptor = acceptor; - grpc_closure_init(&mgr->state->on_handshake_done, on_handshake_done, args); + // Initialize state needed for calling handshakers. + gpr_mu_lock(&mgr->mu); + GPR_ASSERT(mgr->index == 0); + mgr->acceptor = acceptor; + grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, args); + grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, args); // While chaining between handshakers, we use args->user_data to // store a pointer to the handshake manager. This will be // changed to point to the caller-supplied user_data before calling - // the final callback. + // the on_handshake_done callback. args->user_data = mgr; - mgr->state->user_data = user_data; - call_next_handshaker(exec_ctx, args, GRPC_ERROR_NONE); + mgr->user_data = user_data; + // Start deadline timer, which owns a ref. + gpr_ref(&mgr->refs); + grpc_timer_init(exec_ctx, &mgr->deadline_timer, + gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + on_timeout, mgr, gpr_now(GPR_CLOCK_MONOTONIC)); + // Start first handshaker, which also owns a ref. + gpr_ref(&mgr->refs); + call_next_handshaker_locked(exec_ctx, mgr, args, GRPC_ERROR_NONE); + gpr_mu_unlock(&mgr->mu); } diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index eb869e05dc..2b633dba98 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -80,7 +80,6 @@ typedef struct { /// When finished, invokes \a on_handshake_done. /// \a acceptor will be NULL for client-side handshakers. void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, - gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, grpc_handshaker_args* args); |