From ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 6 Dec 2017 09:05:05 -0800 Subject: Revert "All instances of exec_ctx being passed around in src/core removed" --- src/core/lib/security/transport/secure_endpoint.cc | 120 ++++++++++++--------- 1 file changed, 67 insertions(+), 53 deletions(-) (limited to 'src/core/lib/security/transport/secure_endpoint.cc') diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc index e5c089de9c..4cd317a06d 100644 --- a/src/core/lib/security/transport/secure_endpoint.cc +++ b/src/core/lib/security/transport/secure_endpoint.cc @@ -63,27 +63,28 @@ typedef struct { grpc_core::TraceFlag grpc_trace_secure_endpoint(false, "secure_endpoint"); -static void destroy(secure_endpoint* secure_ep) { +static void destroy(grpc_exec_ctx* exec_ctx, secure_endpoint* secure_ep) { secure_endpoint* ep = secure_ep; - grpc_endpoint_destroy(ep->wrapped_ep); + grpc_endpoint_destroy(exec_ctx, ep->wrapped_ep); tsi_frame_protector_destroy(ep->protector); - tsi_zero_copy_grpc_protector_destroy(ep->zero_copy_protector); - grpc_slice_buffer_destroy_internal(&ep->leftover_bytes); - grpc_slice_unref_internal(ep->read_staging_buffer); - grpc_slice_unref_internal(ep->write_staging_buffer); - grpc_slice_buffer_destroy_internal(&ep->output_buffer); - grpc_slice_buffer_destroy_internal(&ep->source_buffer); + tsi_zero_copy_grpc_protector_destroy(exec_ctx, ep->zero_copy_protector); + grpc_slice_buffer_destroy_internal(exec_ctx, &ep->leftover_bytes); + grpc_slice_unref_internal(exec_ctx, ep->read_staging_buffer); + grpc_slice_unref_internal(exec_ctx, ep->write_staging_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &ep->output_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &ep->source_buffer); gpr_mu_destroy(&ep->protector_mu); gpr_free(ep); } #ifndef NDEBUG -#define SECURE_ENDPOINT_UNREF(ep, reason) \ - secure_endpoint_unref((ep), (reason), __FILE__, __LINE__) +#define SECURE_ENDPOINT_UNREF(exec_ctx, ep, reason) \ + secure_endpoint_unref((exec_ctx), (ep), (reason), __FILE__, __LINE__) #define SECURE_ENDPOINT_REF(ep, reason) \ secure_endpoint_ref((ep), (reason), __FILE__, __LINE__) -static void secure_endpoint_unref(secure_endpoint* ep, const char* reason, - const char* file, int line) { +static void secure_endpoint_unref(grpc_exec_ctx* exec_ctx, secure_endpoint* ep, + const char* reason, const char* file, + int line) { if (grpc_trace_secure_endpoint.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, @@ -91,7 +92,7 @@ static void secure_endpoint_unref(secure_endpoint* ep, const char* reason, val - 1); } if (gpr_unref(&ep->ref)) { - destroy(ep); + destroy(exec_ctx, ep); } } @@ -106,11 +107,13 @@ static void secure_endpoint_ref(secure_endpoint* ep, const char* reason, gpr_ref(&ep->ref); } #else -#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep)) +#define SECURE_ENDPOINT_UNREF(exec_ctx, ep, reason) \ + secure_endpoint_unref((exec_ctx), (ep)) #define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep)) -static void secure_endpoint_unref(secure_endpoint* ep) { +static void secure_endpoint_unref(grpc_exec_ctx* exec_ctx, + secure_endpoint* ep) { if (gpr_unref(&ep->ref)) { - destroy(ep); + destroy(exec_ctx, ep); } } @@ -125,7 +128,8 @@ static void flush_read_staging_buffer(secure_endpoint* ep, uint8_t** cur, *end = GRPC_SLICE_END_PTR(ep->read_staging_buffer); } -static void call_read_cb(secure_endpoint* ep, grpc_error* error) { +static void call_read_cb(grpc_exec_ctx* exec_ctx, secure_endpoint* ep, + grpc_error* error) { if (grpc_trace_secure_endpoint.enabled()) { size_t i; for (i = 0; i < ep->read_buffer->count; i++) { @@ -136,11 +140,12 @@ static void call_read_cb(secure_endpoint* ep, grpc_error* error) { } } ep->read_buffer = nullptr; - GRPC_CLOSURE_SCHED(ep->read_cb, error); - SECURE_ENDPOINT_UNREF(ep, "read"); + GRPC_CLOSURE_SCHED(exec_ctx, ep->read_cb, error); + SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read"); } -static void on_read(void* user_data, grpc_error* error) { +static void on_read(grpc_exec_ctx* exec_ctx, void* user_data, + grpc_error* error) { unsigned i; uint8_t keep_looping = 0; tsi_result result = TSI_OK; @@ -149,16 +154,17 @@ static void on_read(void* user_data, grpc_error* error) { uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer); if (error != GRPC_ERROR_NONE) { - grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer); - call_read_cb(ep, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Secure read failed", &error, 1)); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, ep->read_buffer); + call_read_cb(exec_ctx, ep, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Secure read failed", &error, 1)); return; } if (ep->zero_copy_protector != nullptr) { // Use zero-copy grpc protector to unprotect. result = tsi_zero_copy_grpc_protector_unprotect( - ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer); + exec_ctx, ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer); } else { // Use frame protector to unprotect. /* TODO(yangg) check error, maybe bail out early */ @@ -211,35 +217,37 @@ static void on_read(void* user_data, grpc_error* error) { /* TODO(yangg) experiment with moving this block after read_cb to see if it helps latency */ - grpc_slice_buffer_reset_and_unref_internal(&ep->source_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->source_buffer); if (result != TSI_OK) { - grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, ep->read_buffer); call_read_cb( - ep, grpc_set_tsi_error_result( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unwrap failed"), result)); + exec_ctx, ep, + grpc_set_tsi_error_result( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unwrap failed"), result)); return; } - call_read_cb(ep, GRPC_ERROR_NONE); + call_read_cb(exec_ctx, ep, GRPC_ERROR_NONE); } -static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, - grpc_closure* cb) { +static void endpoint_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* secure_ep, + grpc_slice_buffer* slices, grpc_closure* cb) { secure_endpoint* ep = (secure_endpoint*)secure_ep; ep->read_cb = cb; ep->read_buffer = slices; - grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, ep->read_buffer); SECURE_ENDPOINT_REF(ep, "read"); if (ep->leftover_bytes.count) { grpc_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer); GPR_ASSERT(ep->leftover_bytes.count == 0); - on_read(ep, GRPC_ERROR_NONE); + on_read(exec_ctx, ep, GRPC_ERROR_NONE); return; } - grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read); + grpc_endpoint_read(exec_ctx, ep->wrapped_ep, &ep->source_buffer, + &ep->on_read); } static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur, @@ -250,8 +258,8 @@ static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur, *end = GRPC_SLICE_END_PTR(ep->write_staging_buffer); } -static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, - grpc_closure* cb) { +static void endpoint_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* secure_ep, + grpc_slice_buffer* slices, grpc_closure* cb) { GPR_TIMER_BEGIN("secure_endpoint.endpoint_write", 0); unsigned i; @@ -260,7 +268,7 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, uint8_t* cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer); uint8_t* end = GRPC_SLICE_END_PTR(ep->write_staging_buffer); - grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->output_buffer); if (grpc_trace_secure_endpoint.enabled()) { for (i = 0; i < slices->count; i++) { @@ -273,8 +281,8 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, if (ep->zero_copy_protector != nullptr) { // Use zero-copy grpc protector to protect. - result = tsi_zero_copy_grpc_protector_protect(ep->zero_copy_protector, - slices, &ep->output_buffer); + result = tsi_zero_copy_grpc_protector_protect( + exec_ctx, ep->zero_copy_protector, slices, &ep->output_buffer); } else { // Use frame protector to protect. for (i = 0; i < slices->count; i++) { @@ -332,44 +340,50 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? */ - grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->output_buffer); GRPC_CLOSURE_SCHED( - cb, grpc_set_tsi_error_result( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result)); + exec_ctx, cb, + grpc_set_tsi_error_result( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result)); GPR_TIMER_END("secure_endpoint.endpoint_write", 0); return; } - grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb); + grpc_endpoint_write(exec_ctx, ep->wrapped_ep, &ep->output_buffer, cb); GPR_TIMER_END("secure_endpoint.endpoint_write", 0); } -static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) { +static void endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* secure_ep, + grpc_error* why) { secure_endpoint* ep = (secure_endpoint*)secure_ep; - grpc_endpoint_shutdown(ep->wrapped_ep, why); + grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep, why); } -static void endpoint_destroy(grpc_endpoint* secure_ep) { +static void endpoint_destroy(grpc_exec_ctx* exec_ctx, + grpc_endpoint* secure_ep) { secure_endpoint* ep = (secure_endpoint*)secure_ep; - SECURE_ENDPOINT_UNREF(ep, "destroy"); + SECURE_ENDPOINT_UNREF(exec_ctx, ep, "destroy"); } -static void endpoint_add_to_pollset(grpc_endpoint* secure_ep, +static void endpoint_add_to_pollset(grpc_exec_ctx* exec_ctx, + grpc_endpoint* secure_ep, grpc_pollset* pollset) { secure_endpoint* ep = (secure_endpoint*)secure_ep; - grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset); + grpc_endpoint_add_to_pollset(exec_ctx, ep->wrapped_ep, pollset); } -static void endpoint_add_to_pollset_set(grpc_endpoint* secure_ep, +static void endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_endpoint* secure_ep, grpc_pollset_set* pollset_set) { secure_endpoint* ep = (secure_endpoint*)secure_ep; - grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set); + grpc_endpoint_add_to_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set); } -static void endpoint_delete_from_pollset_set(grpc_endpoint* secure_ep, +static void endpoint_delete_from_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_endpoint* secure_ep, grpc_pollset_set* pollset_set) { secure_endpoint* ep = (secure_endpoint*)secure_ep; - grpc_endpoint_delete_from_pollset_set(ep->wrapped_ep, pollset_set); + grpc_endpoint_delete_from_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set); } static char* endpoint_get_peer(grpc_endpoint* secure_ep) { -- cgit v1.2.3