diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/http/httpcli.c | 4 | ||||
-rw-r--r-- | src/core/lib/http/httpcli_security_connector.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 23 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr.c | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_generic.c | 53 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_manager.c | 186 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_uv.c | 6 | ||||
-rw-r--r-- | src/core/lib/security/transport/security_connector.c | 19 | ||||
-rw-r--r-- | src/core/lib/security/transport/security_handshaker.c | 220 |
10 files changed, 312 insertions, 216 deletions
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 0ac2c2ad52..7012ffe568 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -105,7 +105,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, grpc_error *error) { grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent, req->context->pollset_set); - grpc_closure_sched(exec_ctx, req->on_done, GRPC_ERROR_REF(error)); + grpc_closure_sched(exec_ctx, req->on_done, error); grpc_http_parser_destroy(&req->parser); if (req->addresses != NULL) { grpc_resolved_addresses_destroy(req->addresses); @@ -244,7 +244,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { internal_request *req = arg; if (error != GRPC_ERROR_NONE) { - finish(exec_ctx, req, error); + finish(exec_ctx, req, GRPC_ERROR_REF(error)); return; } req->next_address = 0; diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index 76946434f0..ea7c1122c1 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -44,6 +44,7 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" #include "src/core/tsi/ssl_transport_security.h" +#include "src/core/tsi/transport_security_adapter.h" typedef struct { grpc_channel_security_connector base; @@ -78,7 +79,8 @@ static void httpcli_ssl_add_handshakers(grpc_exec_ctx *exec_ctx, } grpc_handshake_manager_add( handshake_mgr, - grpc_security_handshaker_create(exec_ctx, handshaker, &sc->base)); + grpc_security_handshaker_create( + exec_ctx, tsi_create_adapter_handshaker(handshaker), &sc->base)); } static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 38eace12c7..b77a68eead 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -59,6 +59,11 @@ struct grpc_combiner { grpc_closure_scheduler scheduler; grpc_closure_scheduler finally_scheduler; gpr_mpscq queue; + // either: + // a pointer to the initiating exec ctx if that is the only exec_ctx that has + // ever queued to this combiner, or NULL. If this is non-null, it's not + // dereferencable (since the initiating exec_ctx may have gone out of scope) + gpr_atm initiating_exec_ctx_or_null; // state is: // lower bit - zero if orphaned (STATE_UNORPHANED) // other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT) @@ -168,15 +173,25 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl, GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR, lock, cl, last)); - GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed - assert(cl->cb); - cl->error_data.error = error; - gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); if (last == 1) { + gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, + (gpr_atm)exec_ctx); // first element on this list: add it to the list of combiner locks // executing within this exec_ctx push_last_on_exec_ctx(exec_ctx, lock); + } else { + // there may be a race with setting here: if that happens, we may delay + // offload for one or two actions, and that's fine + gpr_atm initiator = + gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null); + if (initiator != 0 && initiator != (gpr_atm)exec_ctx) { + gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0); + } } + GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed + assert(cl->cb); + cl->error_data.error = error; + gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); GPR_TIMER_END("combiner.execute", 0); } diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 0623acc597..391449136e 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -110,7 +110,8 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) { + if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) == + GRPC_TIMERS_FIRED) { gpr_mu_unlock(&g_mu); grpc_exec_ctx_flush(exec_ctx); grpc_iomgr_platform_flush(); diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index e0338f93c7..d6758f7c3b 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -89,6 +89,12 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); /* iomgr internal api for dealing with timers */ +typedef enum { + GRPC_TIMERS_NOT_CHECKED, + GRPC_TIMERS_CHECKED_AND_EMPTY, + GRPC_TIMERS_FIRED, +} grpc_timer_check_result; + /* Check for timers to be run, and run them. Return true if timer callbacks were executed. If next is non-null, TRY to update *next with the next running timer @@ -96,8 +102,8 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); *next is never guaranteed to be updated on any given execution; however, with high probability at least one thread in the system will see an update at any time slice. */ -bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next); +grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, + gpr_timespec now, gpr_timespec *next); void grpc_timer_list_init(gpr_timespec now); void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx); diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index b28340b71c..288782c060 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -101,8 +101,10 @@ static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { return a + b; } -static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, - gpr_atm *next, grpc_error *error); +static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, + gpr_atm now, + gpr_atm *next, + grpc_error *error); static gpr_timespec dbl_to_ts(double d) { gpr_timespec ts; @@ -421,9 +423,11 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, return n; } -static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, - gpr_atm *next, grpc_error *error) { - size_t n = 0; +static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, + gpr_atm now, + gpr_atm *next, + grpc_error *error) { + grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED; gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer); gpr_tls_set(&g_last_seen_min_timer, min_timer); @@ -434,6 +438,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) { gpr_mu_lock(&g_shared_mutables.mu); + result = GRPC_TIMERS_CHECKED_AND_EMPTY; if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR, @@ -448,14 +453,17 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, /* For efficiency, we pop as many available timers as we can from the shard. This may violate perfect timer deadline ordering, but that shouldn't be a big deal because we don't make ordering guarantees. */ - n += - pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error); + if (pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, + error) > 0) { + result = GRPC_TIMERS_FIRED; + } if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, " .. popped --> %" PRIdPTR - ", shard[%d]->min_deadline %" PRIdPTR - " --> %" PRIdPTR ", now=%" PRIdPTR, - n, (int)(g_shard_queue[0] - g_shards), + gpr_log(GPR_DEBUG, + " .. result --> %d" + ", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR + ", now=%" PRIdPTR, + result, (int)(g_shard_queue[0] - g_shards), g_shard_queue[0]->min_deadline, new_min_deadline, now); } @@ -476,26 +484,15 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now, g_shard_queue[0]->min_deadline); gpr_mu_unlock(&g_shared_mutables.mu); gpr_spinlock_unlock(&g_shared_mutables.checker_mu); - } else if (next != NULL) { - /* TODO(ctiller): this forces calling code to do an short poll, and - then retry the timer check (because this time through the timer list was - contended). - - We could reduce the cost here dramatically by keeping a count of how - many currently active pollers got through the uncontended case above - successfully, and waking up other pollers IFF that count drops to zero. - - Once that count is in place, this entire else branch could disappear. */ - *next = GPR_MIN(*next, now + 1); } GRPC_ERROR_UNREF(error); - return (int)n; + return result; } -bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next) { +grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, + gpr_timespec now, gpr_timespec *next) { // prelude GPR_ASSERT(now.clock_type == g_clock_type); gpr_atm now_atm = timespec_to_atm_round_down(now); @@ -513,7 +510,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, "TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR, now_atm, min_timer); } - return 0; + return GRPC_TIMERS_CHECKED_AND_EMPTY; } grpc_error *shutdown_error = @@ -538,7 +535,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_free(next_str); } // actual code - bool r; + grpc_timer_check_result r; gpr_atm next_atm; if (next == NULL) { r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error); @@ -560,7 +557,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, next_str); gpr_free(next_str); } - return r > 0; + return r; } #endif /* GRPC_TIMER_USE_GENERIC */ diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index 24085093e7..5fb3102f38 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -107,86 +107,119 @@ void grpc_timer_manager_tick() { grpc_exec_ctx_finish(&exec_ctx); } -static void timer_thread(void *unused) { - // this threads exec_ctx: we try to run things through to completion here - // since it's easy to spin up new threads - grpc_exec_ctx exec_ctx = - GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); +static void run_some_timers(grpc_exec_ctx *exec_ctx) { + // if there's something to execute... + gpr_mu_lock(&g_mu); + // remove a waiter from the pool, and start another thread if necessary + --g_waiter_count; + if (g_waiter_count == 0 && g_threaded) { + start_timer_thread_and_unlock(); + } else { + // if there's no thread waiting with a timeout, kick an existing + // waiter + // so that the next deadline is not missed + if (!g_has_timed_waiter) { + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "kick untimed waiter"); + } + gpr_cv_signal(&g_cv_wait); + } + gpr_mu_unlock(&g_mu); + } + // without our lock, flush the exec_ctx + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&g_mu); + // garbage collect any threads hanging out that are dead + gc_completed_threads(); + // get ready to wait again + ++g_waiter_count; + gpr_mu_unlock(&g_mu); +} + +// wait until 'next' (or forever if there is already a timed waiter in the pool) +// returns true if the thread should continue executing (false if it should +// shutdown) +static bool wait_until(gpr_timespec next) { + const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); + gpr_mu_lock(&g_mu); + // if we're not threaded anymore, leave + if (!g_threaded) { + gpr_mu_unlock(&g_mu); + return false; + } + // if there's no timed waiter, we should become one: that waiter waits + // only until the next timer should expire + // all other timers wait forever + uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; + if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) { + g_has_timed_waiter = true; + // we use a generation counter to track the timed waiter so we can + // cancel an existing one quickly (and when it actually times out it'll + // figure stuff out instead of incurring a wakeup) + my_timed_waiter_generation = ++g_timed_waiter_generation; + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "sleep for a while"); + } + } else { + next = inf_future; + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "sleep until kicked"); + } + } + gpr_cv_wait(&g_cv_wait, &g_mu, next); + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", + my_timed_waiter_generation == g_timed_waiter_generation, g_kicked); + } + // if this was the timed waiter, then we need to check timers, and flag + // that there's now no timed waiter... we'll look for a replacement if + // there's work to do after checking timers (code above) + if (my_timed_waiter_generation == g_timed_waiter_generation) { + g_has_timed_waiter = false; + } + // if this was a kick from the timer system, consume it (and don't stop + // this thread yet) + if (g_kicked) { + grpc_timer_consume_kick(); + g_kicked = false; + } + gpr_mu_unlock(&g_mu); + return true; +} + +static void timer_main_loop(grpc_exec_ctx *exec_ctx) { const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); for (;;) { gpr_timespec next = inf_future; gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); // check timer state, updates next to the next time to run a check - if (grpc_timer_check(&exec_ctx, now, &next)) { - // if there's something to execute... - gpr_mu_lock(&g_mu); - // remove a waiter from the pool, and start another thread if necessary - --g_waiter_count; - if (g_waiter_count == 0 && g_threaded) { - start_timer_thread_and_unlock(); - } else { - // if there's no thread waiting with a timeout, kick an existing waiter - // so that the next deadline is not missed - if (!g_has_timed_waiter) { - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "kick untimed waiter"); - } - gpr_cv_signal(&g_cv_wait); - } - gpr_mu_unlock(&g_mu); - } - // without our lock, flush the exec_ctx - grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(&g_mu); - // garbage collect any threads hanging out that are dead - gc_completed_threads(); - // get ready to wait again - ++g_waiter_count; - gpr_mu_unlock(&g_mu); - } else { - gpr_mu_lock(&g_mu); - // if we're not threaded anymore, leave - if (!g_threaded) break; - // if there's no timed waiter, we should become one: that waiter waits - // only until the next timer should expire - // all other timers wait forever - uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; - if (!g_has_timed_waiter) { - g_has_timed_waiter = true; - // we use a generation counter to track the timed waiter so we can - // cancel an existing one quickly (and when it actually times out it'll - // figure stuff out instead of incurring a wakeup) - my_timed_waiter_generation = ++g_timed_waiter_generation; - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "sleep for a while"); - } - } else { + switch (grpc_timer_check(exec_ctx, now, &next)) { + case GRPC_TIMERS_FIRED: + run_some_timers(exec_ctx); + break; + case GRPC_TIMERS_NOT_CHECKED: + /* This case only happens under contention, meaning more than one timer + manager thread checked timers concurrently. + + If that happens, we're guaranteed that some other thread has just + checked timers, and this will avalanche into some other thread seeing + empty timers and doing a timed sleep. + + Consequently, we can just sleep forever here and be happy at some + saved wakeup cycles. */ next = inf_future; - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "sleep until kicked"); + /* fall through */ + case GRPC_TIMERS_CHECKED_AND_EMPTY: + if (!wait_until(next)) { + return; } - } - gpr_cv_wait(&g_cv_wait, &g_mu, next); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", - my_timed_waiter_generation == g_timed_waiter_generation, - g_kicked); - } - // if this was the timed waiter, then we need to check timers, and flag - // that there's now no timed waiter... we'll look for a replacement if - // there's work to do after checking timers (code above) - if (my_timed_waiter_generation == g_timed_waiter_generation) { - g_has_timed_waiter = false; - } - // if this was a kick from the timer system, consume it (and don't stop - // this thread yet) - if (g_kicked) { - grpc_timer_consume_kick(); - g_kicked = false; - } - gpr_mu_unlock(&g_mu); + break; } } +} + +static void timer_thread_cleanup(void) { + gpr_mu_lock(&g_mu); // terminate the thread: drop the waiter count, thread count, and let whomever // stopped the threading stuff know that we're done --g_waiter_count; @@ -199,12 +232,21 @@ static void timer_thread(void *unused) { ct->next = g_completed_threads; g_completed_threads = ct; gpr_mu_unlock(&g_mu); - grpc_exec_ctx_finish(&exec_ctx); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "End timer thread"); } } +static void timer_thread(void *unused) { + // this threads exec_ctx: we try to run things through to completion here + // since it's easy to spin up new threads + grpc_exec_ctx exec_ctx = + GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); + timer_main_loop(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); + timer_thread_cleanup(); +} + static void start_threads(void) { gpr_mu_lock(&g_mu); if (!g_threaded) { diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index 2952e44b58..967e84eb14 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -96,9 +96,9 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { } } -bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next) { - return false; +grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, + gpr_timespec now, gpr_timespec *next) { + return GRPC_TIMERS_NOT_CHECKED; } void grpc_timer_list_init(gpr_timespec now) {} diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index 30431a4e4a..416a3bdb35 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -56,6 +56,7 @@ #include "src/core/lib/support/string.h" #include "src/core/tsi/fake_transport_security.h" #include "src/core/tsi/ssl_transport_security.h" +#include "src/core/tsi/transport_security_adapter.h" /* -- Constants. -- */ @@ -390,7 +391,8 @@ static void fake_channel_add_handshakers( grpc_handshake_manager_add( handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_create_fake_handshaker(true /* is_client */), + exec_ctx, tsi_create_adapter_handshaker( + tsi_create_fake_handshaker(true /* is_client */)), &sc->base)); } @@ -400,7 +402,8 @@ static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx, grpc_handshake_manager_add( handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_create_fake_handshaker(false /* is_client */), + exec_ctx, tsi_create_adapter_handshaker( + tsi_create_fake_handshaker(false /* is_client */)), &sc->base)); } @@ -495,8 +498,10 @@ static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx, } // Create handshakers. - grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_hs, &sc->base)); + grpc_handshake_manager_add( + handshake_mgr, + grpc_security_handshaker_create( + exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base)); } static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx, @@ -515,8 +520,10 @@ static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx, } // Create handshakers. - grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_hs, &sc->base)); + grpc_handshake_manager_add( + handshake_mgr, + grpc_security_handshaker_create( + exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base)); } static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) { diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 509b4b556d..3bc113e20f 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -71,12 +71,12 @@ typedef struct { unsigned char *handshake_buffer; size_t handshake_buffer_size; - grpc_slice_buffer left_overs; grpc_slice_buffer outgoing; grpc_closure on_handshake_data_sent_to_peer; grpc_closure on_handshake_data_received_from_peer; grpc_closure on_peer_checked; grpc_auth_context *auth_context; + tsi_handshaker_result *handshaker_result; } security_handshaker; static void security_handshaker_unref(grpc_exec_ctx *exec_ctx, @@ -84,6 +84,7 @@ static void security_handshaker_unref(grpc_exec_ctx *exec_ctx, if (gpr_unref(&h->refs)) { gpr_mu_destroy(&h->mu); tsi_handshaker_destroy(h->handshaker); + tsi_handshaker_result_destroy(h->handshaker_result); if (h->endpoint_to_destroy != NULL) { grpc_endpoint_destroy(exec_ctx, h->endpoint_to_destroy); } @@ -92,7 +93,6 @@ static void security_handshaker_unref(grpc_exec_ctx *exec_ctx, gpr_free(h->read_buffer_to_destroy); } gpr_free(h->handshake_buffer); - grpc_slice_buffer_destroy_internal(exec_ctx, &h->left_overs); grpc_slice_buffer_destroy_internal(exec_ctx, &h->outgoing); GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake"); GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, h->connector, "handshake"); @@ -150,10 +150,10 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error)); goto done; } - // Get frame protector. + // Create frame protector. tsi_frame_protector *protector; - tsi_result result = - tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector); + tsi_result result = tsi_handshaker_result_create_frame_protector( + h->handshaker_result, NULL, &protector); if (result != TSI_OK) { error = grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"), @@ -161,14 +161,25 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, security_handshake_failed_locked(exec_ctx, h, error); goto done; } - // Success. + // Get unused bytes. + unsigned char *unused_bytes = NULL; + size_t unused_bytes_size = 0; + result = tsi_handshaker_result_get_unused_bytes( + h->handshaker_result, &unused_bytes, &unused_bytes_size); // Create secure endpoint. - h->args->endpoint = grpc_secure_endpoint_create( - protector, h->args->endpoint, h->left_overs.slices, h->left_overs.count); - h->left_overs.count = 0; - h->left_overs.length = 0; - // Clear out the read buffer before it gets passed to the transport, - // since any excess bytes were already copied to h->left_overs. + if (unused_bytes_size > 0) { + grpc_slice slice = + grpc_slice_from_copied_buffer((char *)unused_bytes, unused_bytes_size); + h->args->endpoint = + grpc_secure_endpoint_create(protector, h->args->endpoint, &slice, 1); + grpc_slice_unref_internal(exec_ctx, slice); + } else { + h->args->endpoint = + grpc_secure_endpoint_create(protector, h->args->endpoint, NULL, 0); + } + tsi_handshaker_result_destroy(h->handshaker_result); + h->handshaker_result = NULL; + // Clear out the read buffer before it gets passed to the transport. grpc_slice_buffer_reset_and_unref_internal(exec_ctx, h->args->read_buffer); // Add auth context to channel args. grpc_arg auth_context_arg = grpc_auth_context_to_arg(h->auth_context); @@ -189,7 +200,8 @@ done: static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx, security_handshaker *h) { tsi_peer peer; - tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer); + tsi_result result = + tsi_handshaker_result_extract_peer(h->handshaker_result, &peer); if (result != TSI_OK) { return grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Peer extraction failed"), result); @@ -199,34 +211,87 @@ static grpc_error *check_peer_locked(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static grpc_error *send_handshake_bytes_to_peer_locked(grpc_exec_ctx *exec_ctx, - security_handshaker *h) { - // Get data to send. - tsi_result result = TSI_OK; - size_t offset = 0; - do { - size_t to_send_size = h->handshake_buffer_size - offset; - result = tsi_handshaker_get_bytes_to_send_to_peer( - h->handshaker, h->handshake_buffer + offset, &to_send_size); - offset += to_send_size; - if (result == TSI_INCOMPLETE_DATA) { - h->handshake_buffer_size *= 2; - h->handshake_buffer = - gpr_realloc(h->handshake_buffer, h->handshake_buffer_size); - } - } while (result == TSI_INCOMPLETE_DATA); +static grpc_error *on_handshake_next_done_locked( + grpc_exec_ctx *exec_ctx, security_handshaker *h, tsi_result result, + const unsigned char *bytes_to_send, size_t bytes_to_send_size, + tsi_handshaker_result *handshaker_result) { + grpc_error *error = GRPC_ERROR_NONE; + // Read more if we need to. + if (result == TSI_INCOMPLETE_DATA) { + GPR_ASSERT(bytes_to_send_size == 0); + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, + &h->on_handshake_data_received_from_peer); + return error; + } if (result != TSI_OK) { return grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result); } - // Send data. - grpc_slice to_send = - grpc_slice_from_copied_buffer((const char *)h->handshake_buffer, offset); - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &h->outgoing); - grpc_slice_buffer_add(&h->outgoing, to_send); - grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing, - &h->on_handshake_data_sent_to_peer); - return GRPC_ERROR_NONE; + // Update handshaker result. + if (handshaker_result != NULL) { + GPR_ASSERT(h->handshaker_result == NULL); + h->handshaker_result = handshaker_result; + } + if (bytes_to_send_size > 0) { + // Send data to peer, if needed. + grpc_slice to_send = grpc_slice_from_copied_buffer( + (const char *)bytes_to_send, bytes_to_send_size); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &h->outgoing); + grpc_slice_buffer_add(&h->outgoing, to_send); + grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing, + &h->on_handshake_data_sent_to_peer); + } else if (handshaker_result == NULL) { + // There is nothing to send, but need to read from peer. + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, + &h->on_handshake_data_received_from_peer); + } else { + // Handshake has finished, check peer and so on. + error = check_peer_locked(exec_ctx, h); + } + return error; +} + +static void on_handshake_next_done_grpc_wrapper( + tsi_result result, void *user_data, const unsigned char *bytes_to_send, + size_t bytes_to_send_size, tsi_handshaker_result *handshaker_result) { + security_handshaker *h = user_data; + // This callback will be invoked by TSI in a non-grpc thread, so it's + // safe to create our own exec_ctx here. + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_mu_lock(&h->mu); + grpc_error *error = + on_handshake_next_done_locked(&exec_ctx, h, result, bytes_to_send, + bytes_to_send_size, handshaker_result); + if (error != GRPC_ERROR_NONE) { + security_handshake_failed_locked(&exec_ctx, h, error); + gpr_mu_unlock(&h->mu); + security_handshaker_unref(&exec_ctx, h); + } else { + gpr_mu_unlock(&h->mu); + } + grpc_exec_ctx_finish(&exec_ctx); +} + +static grpc_error *do_handshaker_next_locked( + grpc_exec_ctx *exec_ctx, security_handshaker *h, + const unsigned char *bytes_received, size_t bytes_received_size) { + // Invoke TSI handshaker. + unsigned char *bytes_to_send = NULL; + size_t bytes_to_send_size = 0; + tsi_handshaker_result *handshaker_result = NULL; + tsi_result result = tsi_handshaker_next( + h->handshaker, bytes_received, bytes_received_size, &bytes_to_send, + &bytes_to_send_size, &handshaker_result, + &on_handshake_next_done_grpc_wrapper, h); + if (result == TSI_ASYNC) { + // Handshaker operating asynchronously. Nothing else to do here; + // callback will be invoked in a TSI thread. + return GRPC_ERROR_NONE; + } + // Handshaker returned synchronously. Invoke callback directly in + // this thread with our existing exec_ctx. + return on_handshake_next_done_locked(exec_ctx, h, result, bytes_to_send, + bytes_to_send_size, handshaker_result); } static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, @@ -241,72 +306,34 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, security_handshaker_unref(exec_ctx, h); return; } - // Process received data. - tsi_result result = TSI_OK; - size_t consumed_slice_size = 0; + // Copy all slices received. size_t i; + size_t bytes_received_size = 0; for (i = 0; i < h->args->read_buffer->count; i++) { - consumed_slice_size = GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]); - result = tsi_handshaker_process_bytes_from_peer( - h->handshaker, GRPC_SLICE_START_PTR(h->args->read_buffer->slices[i]), - &consumed_slice_size); - if (!tsi_handshaker_is_in_progress(h->handshaker)) break; + bytes_received_size += GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]); } - if (tsi_handshaker_is_in_progress(h->handshaker)) { - /* We may need more data. */ - if (result == TSI_INCOMPLETE_DATA) { - grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, - &h->on_handshake_data_received_from_peer); - goto done; - } else { - error = send_handshake_bytes_to_peer_locked(exec_ctx, h); - if (error != GRPC_ERROR_NONE) { - security_handshake_failed_locked(exec_ctx, h, error); - gpr_mu_unlock(&h->mu); - security_handshaker_unref(exec_ctx, h); - return; - } - goto done; - } + if (bytes_received_size > h->handshake_buffer_size) { + h->handshake_buffer = gpr_realloc(h->handshake_buffer, bytes_received_size); + h->handshake_buffer_size = bytes_received_size; } - if (result != TSI_OK) { - security_handshake_failed_locked( - exec_ctx, h, - grpc_set_tsi_error_result( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake failed"), result)); - gpr_mu_unlock(&h->mu); - security_handshaker_unref(exec_ctx, h); - return; - } - /* Handshake is done and successful this point. */ - bool has_left_overs_in_current_slice = - (consumed_slice_size < - GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i])); - size_t num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + - h->args->read_buffer->count - i - 1; - if (num_left_overs > 0) { - /* Put the leftovers in our buffer (ownership transfered). */ - if (has_left_overs_in_current_slice) { - grpc_slice tail = grpc_slice_split_tail(&h->args->read_buffer->slices[i], - consumed_slice_size); - grpc_slice_buffer_add(&h->left_overs, tail); - /* split_tail above increments refcount. */ - grpc_slice_unref_internal(exec_ctx, tail); - } - grpc_slice_buffer_addn( - &h->left_overs, &h->args->read_buffer->slices[i + 1], - num_left_overs - (size_t)has_left_overs_in_current_slice); + size_t offset = 0; + for (i = 0; i < h->args->read_buffer->count; i++) { + size_t slice_size = GPR_SLICE_LENGTH(h->args->read_buffer->slices[i]); + memcpy(h->handshake_buffer + offset, + GRPC_SLICE_START_PTR(h->args->read_buffer->slices[i]), slice_size); + offset += slice_size; } - // Check peer. - error = check_peer_locked(exec_ctx, h); + // Call TSI handshaker. + error = do_handshaker_next_locked(exec_ctx, h, h->handshake_buffer, + bytes_received_size); + if (error != GRPC_ERROR_NONE) { security_handshake_failed_locked(exec_ctx, h, error); gpr_mu_unlock(&h->mu); security_handshaker_unref(exec_ctx, h); - return; + } else { + gpr_mu_unlock(&h->mu); } -done: - gpr_mu_unlock(&h->mu); } static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg, @@ -321,8 +348,8 @@ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg, security_handshaker_unref(exec_ctx, h); return; } - /* We may be done. */ - if (tsi_handshaker_is_in_progress(h->handshaker)) { + // We may be done. + if (h->handshaker_result == NULL) { grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, &h->on_handshake_data_received_from_peer); } else { @@ -371,7 +398,7 @@ static void security_handshaker_do_handshake(grpc_exec_ctx *exec_ctx, h->args = args; h->on_handshake_done = on_handshake_done; gpr_ref(&h->refs); - grpc_error *error = send_handshake_bytes_to_peer_locked(exec_ctx, h); + grpc_error *error = do_handshaker_next_locked(exec_ctx, h, NULL, 0); if (error != GRPC_ERROR_NONE) { security_handshake_failed_locked(exec_ctx, h, error); gpr_mu_unlock(&h->mu); @@ -404,7 +431,6 @@ static grpc_handshaker *security_handshaker_create( grpc_schedule_on_exec_ctx); grpc_closure_init(&h->on_peer_checked, on_peer_checked, h, grpc_schedule_on_exec_ctx); - grpc_slice_buffer_init(&h->left_overs); grpc_slice_buffer_init(&h->outgoing); return &h->base; } |