diff options
author | 2017-12-06 09:05:05 -0800 | |
---|---|---|
committer | 2017-12-06 09:05:05 -0800 | |
commit | ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch) | |
tree | 6a657f8c6179d873b34505cdc24bce9462ca68eb /src/core/lib/security/transport | |
parent | a3df36cc2505a89c2f481eea4a66a87b3002844a (diff) |
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'src/core/lib/security/transport')
8 files changed, 372 insertions, 281 deletions
diff --git a/src/core/lib/security/transport/client_auth_filter.cc b/src/core/lib/security/transport/client_auth_filter.cc index cd3c2e3f19..326f4d7773 100644 --- a/src/core/lib/security/transport/client_auth_filter.cc +++ b/src/core/lib/security/transport/client_auth_filter.cc @@ -90,7 +90,8 @@ static void add_error(grpc_error** combined, grpc_error* error) { *combined = grpc_error_add_child(*combined, error); } -static void on_credentials_metadata(void* arg, grpc_error* input_error) { +static void on_credentials_metadata(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* input_error) { grpc_transport_stream_op_batch* batch = (grpc_transport_stream_op_batch*)arg; grpc_call_element* elem = (grpc_call_element*)batch->handler_private.extra_arg; @@ -104,16 +105,16 @@ static void on_credentials_metadata(void* arg, grpc_error* input_error) { batch->payload->send_initial_metadata.send_initial_metadata; for (size_t i = 0; i < calld->md_array.size; ++i) { add_error(&error, grpc_metadata_batch_add_tail( - mdb, &calld->md_links[i], + exec_ctx, mdb, &calld->md_links[i], GRPC_MDELEM_REF(calld->md_array.md[i]))); } } if (error == GRPC_ERROR_NONE) { - grpc_call_next_op(elem, batch); + grpc_call_next_op(exec_ctx, elem, batch); } else { error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED); - grpc_transport_stream_op_batch_finish_with_failure(batch, error, + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error, calld->call_combiner); } } @@ -155,17 +156,20 @@ void grpc_auth_metadata_context_build( gpr_free(host_and_port); } -static void cancel_get_request_metadata(void* arg, grpc_error* error) { +static void cancel_get_request_metadata(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; if (error != GRPC_ERROR_NONE) { grpc_call_credentials_cancel_get_request_metadata( - calld->creds, &calld->md_array, GRPC_ERROR_REF(error)); + exec_ctx, calld->creds, &calld->md_array, GRPC_ERROR_REF(error)); } - GRPC_CALL_STACK_UNREF(calld->owning_call, "cancel_get_request_metadata"); + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "cancel_get_request_metadata"); } -static void send_security_metadata(grpc_call_element* elem, +static void send_security_metadata(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; @@ -179,7 +183,7 @@ static void send_security_metadata(grpc_call_element* elem, if (channel_call_creds == nullptr && !call_creds_has_md) { /* Skip sending metadata altogether. */ - grpc_call_next_op(elem, batch); + grpc_call_next_op(exec_ctx, elem, batch); return; } @@ -188,7 +192,7 @@ static void send_security_metadata(grpc_call_element* elem, ctx->creds, nullptr); if (calld->creds == nullptr) { grpc_transport_stream_op_batch_finish_with_failure( - batch, + exec_ctx, batch, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Incompatible credentials set on channel and call."), @@ -211,29 +215,30 @@ static void send_security_metadata(grpc_call_element* elem, batch, grpc_schedule_on_exec_ctx); grpc_error* error = GRPC_ERROR_NONE; if (grpc_call_credentials_get_request_metadata( - calld->creds, calld->pollent, calld->auth_md_context, + exec_ctx, calld->creds, calld->pollent, calld->auth_md_context, &calld->md_array, &calld->async_result_closure, &error)) { // Synchronous return; invoke on_credentials_metadata() directly. - on_credentials_metadata(batch, error); + on_credentials_metadata(exec_ctx, batch, error); GRPC_ERROR_UNREF(error); } else { // Async return; register cancellation closure with call combiner. GRPC_CALL_STACK_REF(calld->owning_call, "cancel_get_request_metadata"); grpc_call_combiner_set_notify_on_cancel( - calld->call_combiner, + exec_ctx, calld->call_combiner, GRPC_CLOSURE_INIT(&calld->get_request_metadata_cancel_closure, cancel_get_request_metadata, elem, grpc_schedule_on_exec_ctx)); } } -static void on_host_checked(void* arg, grpc_error* error) { +static void on_host_checked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_transport_stream_op_batch* batch = (grpc_transport_stream_op_batch*)arg; grpc_call_element* elem = (grpc_call_element*)batch->handler_private.extra_arg; call_data* calld = (call_data*)elem->call_data; if (error == GRPC_ERROR_NONE) { - send_security_metadata(elem, batch); + send_security_metadata(exec_ctx, elem, batch); } else { char* error_msg; char* host = grpc_slice_to_c_string(calld->host); @@ -241,7 +246,7 @@ static void on_host_checked(void* arg, grpc_error* error) { host); gpr_free(host); grpc_transport_stream_op_batch_finish_with_failure( - batch, + exec_ctx, batch, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED), @@ -250,20 +255,22 @@ static void on_host_checked(void* arg, grpc_error* error) { } } -static void cancel_check_call_host(void* arg, grpc_error* error) { +static void cancel_check_call_host(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; if (error != GRPC_ERROR_NONE) { grpc_channel_security_connector_cancel_check_call_host( - chand->security_connector, &calld->async_result_closure, + exec_ctx, chand->security_connector, &calld->async_result_closure, GRPC_ERROR_REF(error)); } - GRPC_CALL_STACK_UNREF(calld->owning_call, "cancel_check_call_host"); + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "cancel_check_call_host"); } static void auth_start_transport_stream_op_batch( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* batch) { GPR_TIMER_BEGIN("auth_start_transport_stream_op_batch", 0); /* grab pointers to our data from the call element */ @@ -296,13 +303,13 @@ static void auth_start_transport_stream_op_batch( */ if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_AUTHORITY)) { if (calld->have_host) { - grpc_slice_unref_internal(calld->host); + grpc_slice_unref_internal(exec_ctx, calld->host); } calld->host = grpc_slice_ref_internal(GRPC_MDVALUE(md)); calld->have_host = true; } else if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) { if (calld->have_method) { - grpc_slice_unref_internal(calld->method); + grpc_slice_unref_internal(exec_ctx, calld->method); } calld->method = grpc_slice_ref_internal(GRPC_MDVALUE(md)); calld->have_method = true; @@ -315,16 +322,16 @@ static void auth_start_transport_stream_op_batch( char* call_host = grpc_slice_to_c_string(calld->host); grpc_error* error = GRPC_ERROR_NONE; if (grpc_channel_security_connector_check_call_host( - chand->security_connector, call_host, chand->auth_context, - &calld->async_result_closure, &error)) { + exec_ctx, chand->security_connector, call_host, + chand->auth_context, &calld->async_result_closure, &error)) { // Synchronous return; invoke on_host_checked() directly. - on_host_checked(batch, error); + on_host_checked(exec_ctx, batch, error); GRPC_ERROR_UNREF(error); } else { // Async return; register cancellation closure with call combiner. GRPC_CALL_STACK_REF(calld->owning_call, "cancel_check_call_host"); grpc_call_combiner_set_notify_on_cancel( - calld->call_combiner, + exec_ctx, calld->call_combiner, GRPC_CLOSURE_INIT(&calld->check_call_host_cancel_closure, cancel_check_call_host, elem, grpc_schedule_on_exec_ctx)); @@ -336,12 +343,13 @@ static void auth_start_transport_stream_op_batch( } /* pass control down the stack */ - grpc_call_next_op(elem, batch); + grpc_call_next_op(exec_ctx, elem, batch); GPR_TIMER_END("auth_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ -static grpc_error* init_call_elem(grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = (call_data*)elem->call_data; calld->owning_call = args->call_stack; @@ -349,30 +357,32 @@ static grpc_error* init_call_elem(grpc_call_element* elem, return GRPC_ERROR_NONE; } -static void set_pollset_or_pollset_set(grpc_call_element* elem, +static void set_pollset_or_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, grpc_polling_entity* pollent) { call_data* calld = (call_data*)elem->call_data; calld->pollent = pollent; } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element* elem, +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) { call_data* calld = (call_data*)elem->call_data; - grpc_credentials_mdelem_array_destroy(&calld->md_array); - grpc_call_credentials_unref(calld->creds); + grpc_credentials_mdelem_array_destroy(exec_ctx, &calld->md_array); + grpc_call_credentials_unref(exec_ctx, calld->creds); if (calld->have_host) { - grpc_slice_unref_internal(calld->host); + grpc_slice_unref_internal(exec_ctx, calld->host); } if (calld->have_method) { - grpc_slice_unref_internal(calld->method); + grpc_slice_unref_internal(exec_ctx, calld->method); } grpc_auth_metadata_context_reset(&calld->auth_md_context); } /* Constructor for channel_data */ -static grpc_error* init_channel_elem(grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, grpc_channel_element_args* args) { grpc_security_connector* sc = grpc_security_connector_find_in_args(args->channel_args); @@ -405,12 +415,13 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element* elem) { +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) { /* grab pointers to our data from the channel element */ channel_data* chand = (channel_data*)elem->channel_data; grpc_channel_security_connector* sc = chand->security_connector; if (sc != nullptr) { - GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "client_auth_filter"); + GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, &sc->base, "client_auth_filter"); } GRPC_AUTH_CONTEXT_UNREF(chand->auth_context, "client_auth_filter"); } diff --git a/src/core/lib/security/transport/lb_targets_info.cc b/src/core/lib/security/transport/lb_targets_info.cc index 183b1ebf35..c07be35840 100644 --- a/src/core/lib/security/transport/lb_targets_info.cc +++ b/src/core/lib/security/transport/lb_targets_info.cc @@ -28,8 +28,8 @@ static void* targets_info_copy(void* p) { return grpc_slice_hash_table_ref((grpc_slice_hash_table*)p); } -static void targets_info_destroy(void* p) { - grpc_slice_hash_table_unref((grpc_slice_hash_table*)p); +static void targets_info_destroy(grpc_exec_ctx* exec_ctx, void* p) { + grpc_slice_hash_table_unref(exec_ctx, (grpc_slice_hash_table*)p); } static int targets_info_cmp(void* a, void* b) { return grpc_slice_hash_table_cmp((const grpc_slice_hash_table*)a, diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc index e5c089de9c..4cd317a06d 100644 --- a/src/core/lib/security/transport/secure_endpoint.cc +++ b/src/core/lib/security/transport/secure_endpoint.cc @@ -63,27 +63,28 @@ typedef struct { grpc_core::TraceFlag grpc_trace_secure_endpoint(false, "secure_endpoint"); -static void destroy(secure_endpoint* secure_ep) { +static void destroy(grpc_exec_ctx* exec_ctx, secure_endpoint* secure_ep) { secure_endpoint* ep = secure_ep; - grpc_endpoint_destroy(ep->wrapped_ep); + grpc_endpoint_destroy(exec_ctx, ep->wrapped_ep); tsi_frame_protector_destroy(ep->protector); - tsi_zero_copy_grpc_protector_destroy(ep->zero_copy_protector); - grpc_slice_buffer_destroy_internal(&ep->leftover_bytes); - grpc_slice_unref_internal(ep->read_staging_buffer); - grpc_slice_unref_internal(ep->write_staging_buffer); - grpc_slice_buffer_destroy_internal(&ep->output_buffer); - grpc_slice_buffer_destroy_internal(&ep->source_buffer); + tsi_zero_copy_grpc_protector_destroy(exec_ctx, ep->zero_copy_protector); + grpc_slice_buffer_destroy_internal(exec_ctx, &ep->leftover_bytes); + grpc_slice_unref_internal(exec_ctx, ep->read_staging_buffer); + grpc_slice_unref_internal(exec_ctx, ep->write_staging_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &ep->output_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &ep->source_buffer); gpr_mu_destroy(&ep->protector_mu); gpr_free(ep); } #ifndef NDEBUG -#define SECURE_ENDPOINT_UNREF(ep, reason) \ - secure_endpoint_unref((ep), (reason), __FILE__, __LINE__) +#define SECURE_ENDPOINT_UNREF(exec_ctx, ep, reason) \ + secure_endpoint_unref((exec_ctx), (ep), (reason), __FILE__, __LINE__) #define SECURE_ENDPOINT_REF(ep, reason) \ secure_endpoint_ref((ep), (reason), __FILE__, __LINE__) -static void secure_endpoint_unref(secure_endpoint* ep, const char* reason, - const char* file, int line) { +static void secure_endpoint_unref(grpc_exec_ctx* exec_ctx, secure_endpoint* ep, + const char* reason, const char* file, + int line) { if (grpc_trace_secure_endpoint.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, @@ -91,7 +92,7 @@ static void secure_endpoint_unref(secure_endpoint* ep, const char* reason, val - 1); } if (gpr_unref(&ep->ref)) { - destroy(ep); + destroy(exec_ctx, ep); } } @@ -106,11 +107,13 @@ static void secure_endpoint_ref(secure_endpoint* ep, const char* reason, gpr_ref(&ep->ref); } #else -#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep)) +#define SECURE_ENDPOINT_UNREF(exec_ctx, ep, reason) \ + secure_endpoint_unref((exec_ctx), (ep)) #define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep)) -static void secure_endpoint_unref(secure_endpoint* ep) { +static void secure_endpoint_unref(grpc_exec_ctx* exec_ctx, + secure_endpoint* ep) { if (gpr_unref(&ep->ref)) { - destroy(ep); + destroy(exec_ctx, ep); } } @@ -125,7 +128,8 @@ static void flush_read_staging_buffer(secure_endpoint* ep, uint8_t** cur, *end = GRPC_SLICE_END_PTR(ep->read_staging_buffer); } -static void call_read_cb(secure_endpoint* ep, grpc_error* error) { +static void call_read_cb(grpc_exec_ctx* exec_ctx, secure_endpoint* ep, + grpc_error* error) { if (grpc_trace_secure_endpoint.enabled()) { size_t i; for (i = 0; i < ep->read_buffer->count; i++) { @@ -136,11 +140,12 @@ static void call_read_cb(secure_endpoint* ep, grpc_error* error) { } } ep->read_buffer = nullptr; - GRPC_CLOSURE_SCHED(ep->read_cb, error); - SECURE_ENDPOINT_UNREF(ep, "read"); + GRPC_CLOSURE_SCHED(exec_ctx, ep->read_cb, error); + SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read"); } -static void on_read(void* user_data, grpc_error* error) { +static void on_read(grpc_exec_ctx* exec_ctx, void* user_data, + grpc_error* error) { unsigned i; uint8_t keep_looping = 0; tsi_result result = TSI_OK; @@ -149,16 +154,17 @@ static void on_read(void* user_data, grpc_error* error) { uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer); if (error != GRPC_ERROR_NONE) { - grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer); - call_read_cb(ep, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Secure read failed", &error, 1)); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, ep->read_buffer); + call_read_cb(exec_ctx, ep, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Secure read failed", &error, 1)); return; } if (ep->zero_copy_protector != nullptr) { // Use zero-copy grpc protector to unprotect. result = tsi_zero_copy_grpc_protector_unprotect( - ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer); + exec_ctx, ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer); } else { // Use frame protector to unprotect. /* TODO(yangg) check error, maybe bail out early */ @@ -211,35 +217,37 @@ static void on_read(void* user_data, grpc_error* error) { /* TODO(yangg) experiment with moving this block after read_cb to see if it helps latency */ - grpc_slice_buffer_reset_and_unref_internal(&ep->source_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->source_buffer); if (result != TSI_OK) { - grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, ep->read_buffer); call_read_cb( - ep, grpc_set_tsi_error_result( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unwrap failed"), result)); + exec_ctx, ep, + grpc_set_tsi_error_result( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unwrap failed"), result)); return; } - call_read_cb(ep, GRPC_ERROR_NONE); + call_read_cb(exec_ctx, ep, GRPC_ERROR_NONE); } -static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, - grpc_closure* cb) { +static void endpoint_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* secure_ep, + grpc_slice_buffer* slices, grpc_closure* cb) { secure_endpoint* ep = (secure_endpoint*)secure_ep; ep->read_cb = cb; ep->read_buffer = slices; - grpc_slice_buffer_reset_and_unref_internal(ep->read_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, ep->read_buffer); SECURE_ENDPOINT_REF(ep, "read"); if (ep->leftover_bytes.count) { grpc_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer); GPR_ASSERT(ep->leftover_bytes.count == 0); - on_read(ep, GRPC_ERROR_NONE); + on_read(exec_ctx, ep, GRPC_ERROR_NONE); return; } - grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read); + grpc_endpoint_read(exec_ctx, ep->wrapped_ep, &ep->source_buffer, + &ep->on_read); } static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur, @@ -250,8 +258,8 @@ static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur, *end = GRPC_SLICE_END_PTR(ep->write_staging_buffer); } -static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, - grpc_closure* cb) { +static void endpoint_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* secure_ep, + grpc_slice_buffer* slices, grpc_closure* cb) { GPR_TIMER_BEGIN("secure_endpoint.endpoint_write", 0); unsigned i; @@ -260,7 +268,7 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, uint8_t* cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer); uint8_t* end = GRPC_SLICE_END_PTR(ep->write_staging_buffer); - grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->output_buffer); if (grpc_trace_secure_endpoint.enabled()) { for (i = 0; i < slices->count; i++) { @@ -273,8 +281,8 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, if (ep->zero_copy_protector != nullptr) { // Use zero-copy grpc protector to protect. - result = tsi_zero_copy_grpc_protector_protect(ep->zero_copy_protector, - slices, &ep->output_buffer); + result = tsi_zero_copy_grpc_protector_protect( + exec_ctx, ep->zero_copy_protector, slices, &ep->output_buffer); } else { // Use frame protector to protect. for (i = 0; i < slices->count; i++) { @@ -332,44 +340,50 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? */ - grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->output_buffer); GRPC_CLOSURE_SCHED( - cb, grpc_set_tsi_error_result( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result)); + exec_ctx, cb, + grpc_set_tsi_error_result( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result)); GPR_TIMER_END("secure_endpoint.endpoint_write", 0); return; } - grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb); + grpc_endpoint_write(exec_ctx, ep->wrapped_ep, &ep->output_buffer, cb); GPR_TIMER_END("secure_endpoint.endpoint_write", 0); } -static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) { +static void endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* secure_ep, + grpc_error* why) { secure_endpoint* ep = (secure_endpoint*)secure_ep; - grpc_endpoint_shutdown(ep->wrapped_ep, why); + grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep, why); } -static void endpoint_destroy(grpc_endpoint* secure_ep) { +static void endpoint_destroy(grpc_exec_ctx* exec_ctx, + grpc_endpoint* secure_ep) { secure_endpoint* ep = (secure_endpoint*)secure_ep; - SECURE_ENDPOINT_UNREF(ep, "destroy"); + SECURE_ENDPOINT_UNREF(exec_ctx, ep, "destroy"); } -static void endpoint_add_to_pollset(grpc_endpoint* secure_ep, +static void endpoint_add_to_pollset(grpc_exec_ctx* exec_ctx, + grpc_endpoint* secure_ep, grpc_pollset* pollset) { secure_endpoint* ep = (secure_endpoint*)secure_ep; - grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset); + grpc_endpoint_add_to_pollset(exec_ctx, ep->wrapped_ep, pollset); } -static void endpoint_add_to_pollset_set(grpc_endpoint* secure_ep, +static void endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_endpoint* secure_ep, grpc_pollset_set* pollset_set) { secure_endpoint* ep = (secure_endpoint*)secure_ep; - grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set); + grpc_endpoint_add_to_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set); } -static void endpoint_delete_from_pollset_set(grpc_endpoint* secure_ep, +static void endpoint_delete_from_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_endpoint* secure_ep, grpc_pollset_set* pollset_set) { secure_endpoint* ep = (secure_endpoint*)secure_ep; - grpc_endpoint_delete_from_pollset_set(ep->wrapped_ep, pollset_set); + grpc_endpoint_delete_from_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set); } static char* endpoint_get_peer(grpc_endpoint* secure_ep) { diff --git a/src/core/lib/security/transport/security_connector.cc b/src/core/lib/security/transport/security_connector.cc index fd139714da..c56e459aeb 100644 --- a/src/core/lib/security/transport/security_connector.cc +++ b/src/core/lib/security/transport/security_connector.cc @@ -105,32 +105,33 @@ const tsi_peer_property* tsi_peer_get_property_by_name(const tsi_peer* peer, } void grpc_channel_security_connector_add_handshakers( - grpc_channel_security_connector* connector, + grpc_exec_ctx* exec_ctx, grpc_channel_security_connector* connector, grpc_handshake_manager* handshake_mgr) { if (connector != nullptr) { - connector->add_handshakers(connector, handshake_mgr); + connector->add_handshakers(exec_ctx, connector, handshake_mgr); } } void grpc_server_security_connector_add_handshakers( - grpc_server_security_connector* connector, + grpc_exec_ctx* exec_ctx, grpc_server_security_connector* connector, grpc_handshake_manager* handshake_mgr) { if (connector != nullptr) { - connector->add_handshakers(connector, handshake_mgr); + connector->add_handshakers(exec_ctx, connector, handshake_mgr); } } -void grpc_security_connector_check_peer(grpc_security_connector* sc, +void grpc_security_connector_check_peer(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc, tsi_peer peer, grpc_auth_context** auth_context, grpc_closure* on_peer_checked) { if (sc == nullptr) { - GRPC_CLOSURE_SCHED(on_peer_checked, + GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "cannot check peer -- no security connector")); tsi_peer_destruct(&peer); } else { - sc->vtable->check_peer(sc, peer, auth_context, on_peer_checked); + sc->vtable->check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked); } } @@ -168,26 +169,26 @@ int grpc_server_security_connector_cmp(grpc_server_security_connector* sc1, } bool grpc_channel_security_connector_check_call_host( - grpc_channel_security_connector* sc, const char* host, - grpc_auth_context* auth_context, grpc_closure* on_call_host_checked, - grpc_error** error) { + grpc_exec_ctx* exec_ctx, grpc_channel_security_connector* sc, + const char* host, grpc_auth_context* auth_context, + grpc_closure* on_call_host_checked, grpc_error** error) { if (sc == nullptr || sc->check_call_host == nullptr) { *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "cannot check call host -- no security connector"); return true; } - return sc->check_call_host(sc, host, auth_context, on_call_host_checked, - error); + return sc->check_call_host(exec_ctx, sc, host, auth_context, + on_call_host_checked, error); } void grpc_channel_security_connector_cancel_check_call_host( - grpc_channel_security_connector* sc, grpc_closure* on_call_host_checked, - grpc_error* error) { + grpc_exec_ctx* exec_ctx, grpc_channel_security_connector* sc, + grpc_closure* on_call_host_checked, grpc_error* error) { if (sc == nullptr || sc->cancel_check_call_host == nullptr) { GRPC_ERROR_UNREF(error); return; } - sc->cancel_check_call_host(sc, on_call_host_checked, error); + sc->cancel_check_call_host(exec_ctx, sc, on_call_host_checked, error); } #ifndef NDEBUG @@ -204,14 +205,15 @@ grpc_security_connector* grpc_security_connector_ref( #else grpc_security_connector* grpc_security_connector_ref( grpc_security_connector* sc) { - if (sc == nullptr) return nullptr; + if (sc == NULL) return NULL; #endif gpr_ref(&sc->refcount); return sc; } #ifndef NDEBUG -void grpc_security_connector_unref(grpc_security_connector* sc, +void grpc_security_connector_unref(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc, const char* file, int line, const char* reason) { if (sc == nullptr) return; @@ -222,14 +224,15 @@ void grpc_security_connector_unref(grpc_security_connector* sc, val, val - 1, reason); } #else -void grpc_security_connector_unref(grpc_security_connector* sc) { - if (sc == nullptr) return; +void grpc_security_connector_unref(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc) { + if (sc == NULL) return; #endif - if (gpr_unref(&sc->refcount)) sc->vtable->destroy(sc); + if (gpr_unref(&sc->refcount)) sc->vtable->destroy(exec_ctx, sc); } -static void connector_arg_destroy(void* p) { - GRPC_SECURITY_CONNECTOR_UNREF((grpc_security_connector*)p, +static void connector_arg_destroy(grpc_exec_ctx* exec_ctx, void* p) { + GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, (grpc_security_connector*)p, "connector_arg_destroy"); } @@ -306,16 +309,20 @@ typedef struct { bool is_lb_channel; } grpc_fake_channel_security_connector; -static void fake_channel_destroy(grpc_security_connector* sc) { +static void fake_channel_destroy(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc) { grpc_fake_channel_security_connector* c = (grpc_fake_channel_security_connector*)sc; - grpc_call_credentials_unref(c->base.request_metadata_creds); + grpc_call_credentials_unref(exec_ctx, c->base.request_metadata_creds); gpr_free(c->target); gpr_free(c->expected_targets); gpr_free(c); } -static void fake_server_destroy(grpc_security_connector* sc) { gpr_free(sc); } +static void fake_server_destroy(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc) { + gpr_free(sc); +} static bool fake_check_target(const char* target_type, const char* target, const char* set_str) { @@ -379,7 +386,8 @@ done: if (!success) abort(); } -static void fake_check_peer(grpc_security_connector* sc, tsi_peer peer, +static void fake_check_peer(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc, tsi_peer peer, grpc_auth_context** auth_context, grpc_closure* on_peer_checked) { const char* prop_name; @@ -411,23 +419,25 @@ static void fake_check_peer(grpc_security_connector* sc, tsi_peer peer, *auth_context, GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME, GRPC_FAKE_TRANSPORT_SECURITY_TYPE); end: - GRPC_CLOSURE_SCHED(on_peer_checked, error); + GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error); tsi_peer_destruct(&peer); } -static void fake_channel_check_peer(grpc_security_connector* sc, tsi_peer peer, +static void fake_channel_check_peer(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc, tsi_peer peer, grpc_auth_context** auth_context, grpc_closure* on_peer_checked) { - fake_check_peer(sc, peer, auth_context, on_peer_checked); + fake_check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked); grpc_fake_channel_security_connector* c = (grpc_fake_channel_security_connector*)sc; fake_secure_name_check(c->target, c->expected_targets, c->is_lb_channel); } -static void fake_server_check_peer(grpc_security_connector* sc, tsi_peer peer, +static void fake_server_check_peer(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc, tsi_peer peer, grpc_auth_context** auth_context, grpc_closure* on_peer_checked) { - fake_check_peer(sc, peer, auth_context, on_peer_checked); + fake_check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked); } static int fake_channel_cmp(grpc_security_connector* sc1, @@ -456,7 +466,8 @@ static int fake_server_cmp(grpc_security_connector* sc1, (grpc_server_security_connector*)sc2); } -static bool fake_channel_check_call_host(grpc_channel_security_connector* sc, +static bool fake_channel_check_call_host(grpc_exec_ctx* exec_ctx, + grpc_channel_security_connector* sc, const char* host, grpc_auth_context* auth_context, grpc_closure* on_call_host_checked, @@ -465,26 +476,29 @@ static bool fake_channel_check_call_host(grpc_channel_security_connector* sc, } static void fake_channel_cancel_check_call_host( - grpc_channel_security_connector* sc, grpc_closure* on_call_host_checked, - grpc_error* error) { + grpc_exec_ctx* exec_ctx, grpc_channel_security_connector* sc, + grpc_closure* on_call_host_checked, grpc_error* error) { GRPC_ERROR_UNREF(error); } static void fake_channel_add_handshakers( - grpc_channel_security_connector* sc, + grpc_exec_ctx* exec_ctx, grpc_channel_security_connector* sc, grpc_handshake_manager* handshake_mgr) { grpc_handshake_manager_add( handshake_mgr, grpc_security_handshaker_create( - tsi_create_fake_handshaker(true /* is_client */), &sc->base)); + exec_ctx, tsi_create_fake_handshaker(true /* is_client */), + &sc->base)); } -static void fake_server_add_handshakers(grpc_server_security_connector* sc, +static void fake_server_add_handshakers(grpc_exec_ctx* exec_ctx, + grpc_server_security_connector* sc, grpc_handshake_manager* handshake_mgr) { grpc_handshake_manager_add( handshake_mgr, grpc_security_handshaker_create( - tsi_create_fake_handshaker(false /* is_client */), &sc->base)); + exec_ctx, tsi_create_fake_handshaker(false /* is_client */), + &sc->base)); } static grpc_security_connector_vtable fake_channel_vtable = { @@ -551,11 +565,12 @@ static bool server_connector_has_cert_config_fetcher( return server_creds->certificate_config_fetcher.cb != nullptr; } -static void ssl_channel_destroy(grpc_security_connector* sc) { +static void ssl_channel_destroy(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc) { grpc_ssl_channel_security_connector* c = (grpc_ssl_channel_security_connector*)sc; - grpc_channel_credentials_unref(c->base.channel_creds); - grpc_call_credentials_unref(c->base.request_metadata_creds); + grpc_channel_credentials_unref(exec_ctx, c->base.channel_creds); + grpc_call_credentials_unref(exec_ctx, c->base.request_metadata_creds); tsi_ssl_client_handshaker_factory_unref(c->client_handshaker_factory); c->client_handshaker_factory = nullptr; if (c->target_name != nullptr) gpr_free(c->target_name); @@ -563,16 +578,18 @@ static void ssl_channel_destroy(grpc_security_connector* sc) { gpr_free(sc); } -static void ssl_server_destroy(grpc_security_connector* sc) { +static void ssl_server_destroy(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc) { grpc_ssl_server_security_connector* c = (grpc_ssl_server_security_connector*)sc; - grpc_server_credentials_unref(c->base.server_creds); + grpc_server_credentials_unref(exec_ctx, c->base.server_creds); tsi_ssl_server_handshaker_factory_unref(c->server_handshaker_factory); c->server_handshaker_factory = nullptr; gpr_free(sc); } -static void ssl_channel_add_handshakers(grpc_channel_security_connector* sc, +static void ssl_channel_add_handshakers(grpc_exec_ctx* exec_ctx, + grpc_channel_security_connector* sc, grpc_handshake_manager* handshake_mgr) { grpc_ssl_channel_security_connector* c = (grpc_ssl_channel_security_connector*)sc; @@ -590,8 +607,9 @@ static void ssl_channel_add_handshakers(grpc_channel_security_connector* sc, } // Create handshakers. grpc_handshake_manager_add( - handshake_mgr, grpc_security_handshaker_create( - tsi_create_adapter_handshaker(tsi_hs), &sc->base)); + handshake_mgr, + grpc_security_handshaker_create( + exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base)); } static const char** fill_alpn_protocol_strings(size_t* num_alpn_protocols) { @@ -683,7 +701,8 @@ static bool try_fetch_ssl_server_credentials( return status; } -static void ssl_server_add_handshakers(grpc_server_security_connector* sc, +static void ssl_server_add_handshakers(grpc_exec_ctx* exec_ctx, + grpc_server_security_connector* sc, grpc_handshake_manager* handshake_mgr) { grpc_ssl_server_security_connector* c = (grpc_ssl_server_security_connector*)sc; @@ -699,8 +718,9 @@ static void ssl_server_add_handshakers(grpc_server_security_connector* sc, } // Create handshakers. grpc_handshake_manager_add( - handshake_mgr, grpc_security_handshaker_create( - tsi_create_adapter_handshaker(tsi_hs), &sc->base)); + handshake_mgr, + grpc_security_handshaker_create( + exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base)); } static int ssl_host_matches_name(const tsi_peer* peer, const char* peer_name) { @@ -784,7 +804,8 @@ static grpc_error* ssl_check_peer(grpc_security_connector* sc, return GRPC_ERROR_NONE; } -static void ssl_channel_check_peer(grpc_security_connector* sc, tsi_peer peer, +static void ssl_channel_check_peer(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc, tsi_peer peer, grpc_auth_context** auth_context, grpc_closure* on_peer_checked) { grpc_ssl_channel_security_connector* c = @@ -794,16 +815,17 @@ static void ssl_channel_check_peer(grpc_security_connector* sc, tsi_peer peer, ? c->overridden_target_name : c->target_name, &peer, auth_context); - GRPC_CLOSURE_SCHED(on_peer_checked, error); + GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error); tsi_peer_destruct(&peer); } -static void ssl_server_check_peer(grpc_security_connector* sc, tsi_peer peer, +static void ssl_server_check_peer(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc, tsi_peer peer, grpc_auth_context** auth_context, grpc_closure* on_peer_checked) { grpc_error* error = ssl_check_peer(sc, nullptr, &peer, auth_context); tsi_peer_destruct(&peer); - GRPC_CLOSURE_SCHED(on_peer_checked, error); + GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error); } static int ssl_channel_cmp(grpc_security_connector* sc1, @@ -873,7 +895,8 @@ void tsi_shallow_peer_destruct(tsi_peer* peer) { if (peer->properties != nullptr) gpr_free(peer->properties); } -static bool ssl_channel_check_call_host(grpc_channel_security_connector* sc, +static bool ssl_channel_check_call_host(grpc_exec_ctx* exec_ctx, + grpc_channel_security_connector* sc, const char* host, grpc_auth_context* auth_context, grpc_closure* on_call_host_checked, @@ -899,8 +922,8 @@ static bool ssl_channel_check_call_host(grpc_channel_security_connector* sc, } static void ssl_channel_cancel_check_call_host( - grpc_channel_security_connector* sc, grpc_closure* on_call_host_checked, - grpc_error* error) { + grpc_exec_ctx* exec_ctx, grpc_channel_security_connector* sc, + grpc_closure* on_call_host_checked, grpc_error* error) { GRPC_ERROR_UNREF(error); } @@ -967,7 +990,7 @@ const char* grpc_get_default_ssl_roots(void) { } grpc_security_status grpc_ssl_channel_security_connector_create( - grpc_channel_credentials* channel_creds, + grpc_exec_ctx* exec_ctx, grpc_channel_credentials* channel_creds, grpc_call_credentials* request_metadata_creds, const grpc_ssl_config* config, const char* target_name, const char* overridden_target_name, grpc_channel_security_connector** sc) { @@ -1022,7 +1045,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create( if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.", tsi_result_to_string(result)); - ssl_channel_destroy(&c->base.base); + ssl_channel_destroy(exec_ctx, &c->base.base); *sc = nullptr; goto error; } @@ -1050,7 +1073,8 @@ grpc_ssl_server_security_connector_initialize( } grpc_security_status grpc_ssl_server_security_connector_create( - grpc_server_credentials* gsc, grpc_server_security_connector** sc) { + grpc_exec_ctx* exec_ctx, grpc_server_credentials* gsc, + grpc_server_security_connector** sc) { tsi_result result = TSI_OK; grpc_ssl_server_credentials* server_credentials = (grpc_ssl_server_credentials*)gsc; @@ -1090,7 +1114,7 @@ grpc_security_status grpc_ssl_server_security_connector_create( if (retval == GRPC_SECURITY_OK) { *sc = &c->base; } else { - if (c != nullptr) ssl_server_destroy(&c->base.base); + if (c != nullptr) ssl_server_destroy(exec_ctx, &c->base.base); if (sc != nullptr) *sc = nullptr; } return retval; diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h index 495821d247..03daba3a18 100644 --- a/src/core/lib/security/transport/security_connector.h +++ b/src/core/lib/security/transport/security_connector.h @@ -50,9 +50,9 @@ typedef struct grpc_security_connector grpc_security_connector; #define GRPC_ARG_SECURITY_CONNECTOR "grpc.security_connector" typedef struct { - void (*destroy)(grpc_security_connector* sc); - void (*check_peer)(grpc_security_connector* sc, tsi_peer peer, - grpc_auth_context** auth_context, + void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_security_connector* sc); + void (*check_peer)(grpc_exec_ctx* exec_ctx, grpc_security_connector* sc, + tsi_peer peer, grpc_auth_context** auth_context, grpc_closure* on_peer_checked); int (*cmp)(grpc_security_connector* sc, grpc_security_connector* other); } grpc_security_connector_vtable; @@ -67,25 +67,29 @@ struct grpc_security_connector { #ifndef NDEBUG #define GRPC_SECURITY_CONNECTOR_REF(p, r) \ grpc_security_connector_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_SECURITY_CONNECTOR_UNREF(p, r) \ - grpc_security_connector_unref((p), __FILE__, __LINE__, (r)) +#define GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, p, r) \ + grpc_security_connector_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) grpc_security_connector* grpc_security_connector_ref( grpc_security_connector* policy, const char* file, int line, const char* reason); -void grpc_security_connector_unref(grpc_security_connector* policy, +void grpc_security_connector_unref(grpc_exec_ctx* exec_ctx, + grpc_security_connector* policy, const char* file, int line, const char* reason); #else #define GRPC_SECURITY_CONNECTOR_REF(p, r) grpc_security_connector_ref((p)) -#define GRPC_SECURITY_CONNECTOR_UNREF(p, r) grpc_security_connector_unref((p)) +#define GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, p, r) \ + grpc_security_connector_unref((exec_ctx), (p)) grpc_security_connector* grpc_security_connector_ref( grpc_security_connector* policy); -void grpc_security_connector_unref(grpc_security_connector* policy); +void grpc_security_connector_unref(grpc_exec_ctx* exec_ctx, + grpc_security_connector* policy); #endif /* Check the peer. Callee takes ownership of the peer object. When done, sets *auth_context and invokes on_peer_checked. */ -void grpc_security_connector_check_peer(grpc_security_connector* sc, +void grpc_security_connector_check_peer(grpc_exec_ctx* exec_ctx, + grpc_security_connector* sc, tsi_peer peer, grpc_auth_context** auth_context, grpc_closure* on_peer_checked); @@ -115,14 +119,17 @@ struct grpc_channel_security_connector { grpc_security_connector base; grpc_channel_credentials* channel_creds; grpc_call_credentials* request_metadata_creds; - bool (*check_call_host)(grpc_channel_security_connector* sc, const char* host, + bool (*check_call_host)(grpc_exec_ctx* exec_ctx, + grpc_channel_security_connector* sc, const char* host, grpc_auth_context* auth_context, grpc_closure* on_call_host_checked, grpc_error** error); - void (*cancel_check_call_host)(grpc_channel_security_connector* sc, + void (*cancel_check_call_host)(grpc_exec_ctx* exec_ctx, + grpc_channel_security_connector* sc, grpc_closure* on_call_host_checked, grpc_error* error); - void (*add_handshakers)(grpc_channel_security_connector* sc, + void (*add_handshakers)(grpc_exec_ctx* exec_ctx, + grpc_channel_security_connector* sc, grpc_handshake_manager* handshake_mgr); }; @@ -135,20 +142,20 @@ int grpc_channel_security_connector_cmp(grpc_channel_security_connector* sc1, /// be set to indicate the result. Otherwise, \a on_call_host_checked /// will be invoked when complete. bool grpc_channel_security_connector_check_call_host( - grpc_channel_security_connector* sc, const char* host, - grpc_auth_context* auth_context, grpc_closure* on_call_host_checked, - grpc_error** error); + grpc_exec_ctx* exec_ctx, grpc_channel_security_connector* sc, + const char* host, grpc_auth_context* auth_context, + grpc_closure* on_call_host_checked, grpc_error** error); /// Cancels a pending asychronous call to /// grpc_channel_security_connector_check_call_host() with /// \a on_call_host_checked as its callback. void grpc_channel_security_connector_cancel_check_call_host( - grpc_channel_security_connector* sc, grpc_closure* on_call_host_checked, - grpc_error* error); + grpc_exec_ctx* exec_ctx, grpc_channel_security_connector* sc, + grpc_closure* on_call_host_checked, grpc_error* error); /* Registers handshakers with \a handshake_mgr. */ void grpc_channel_security_connector_add_handshakers( - grpc_channel_security_connector* connector, + grpc_exec_ctx* exec_ctx, grpc_channel_security_connector* connector, grpc_handshake_manager* handshake_mgr); /* --- server_security_connector object. --- @@ -161,7 +168,8 @@ typedef struct grpc_server_security_connector grpc_server_security_connector; struct grpc_server_security_connector { grpc_security_connector base; grpc_server_credentials* server_creds; - void (*add_handshakers)(grpc_server_security_connector* sc, + void (*add_handshakers)(grpc_exec_ctx* exec_ctx, + grpc_server_security_connector* sc, grpc_handshake_manager* handshake_mgr); }; @@ -170,7 +178,8 @@ int grpc_server_security_connector_cmp(grpc_server_security_connector* sc1, grpc_server_security_connector* sc2); void grpc_server_security_connector_add_handshakers( - grpc_server_security_connector* sc, grpc_handshake_manager* handshake_mgr); + grpc_exec_ctx* exec_ctx, grpc_server_security_connector* sc, + grpc_handshake_manager* handshake_mgr); /* --- Creation security connectors. --- */ @@ -207,7 +216,7 @@ typedef struct { specific error code otherwise. */ grpc_security_status grpc_ssl_channel_security_connector_create( - grpc_channel_credentials* channel_creds, + grpc_exec_ctx* exec_ctx, grpc_channel_credentials* channel_creds, grpc_call_credentials* request_metadata_creds, const grpc_ssl_config* config, const char* target_name, const char* overridden_target_name, grpc_channel_security_connector** sc); @@ -233,7 +242,7 @@ typedef struct { specific error code otherwise. */ grpc_security_status grpc_ssl_server_security_connector_create( - grpc_server_credentials* server_credentials, + grpc_exec_ctx* exec_ctx, grpc_server_credentials* server_credentials, grpc_server_security_connector** sc); /* Util. */ diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index 7623fbfd5b..7067b70cb6 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -65,7 +65,8 @@ typedef struct { tsi_handshaker_result* handshaker_result; } security_handshaker; -static size_t move_read_buffer_into_handshake_buffer(security_handshaker* h) { +static size_t move_read_buffer_into_handshake_buffer(grpc_exec_ctx* exec_ctx, + security_handshaker* h) { size_t bytes_in_read_buffer = h->args->read_buffer->length; if (h->handshake_buffer_size < bytes_in_read_buffer) { h->handshake_buffer = @@ -78,45 +79,48 @@ static size_t move_read_buffer_into_handshake_buffer(security_handshaker* h) { memcpy(h->handshake_buffer + offset, GRPC_SLICE_START_PTR(next_slice), GRPC_SLICE_LENGTH(next_slice)); offset += GRPC_SLICE_LENGTH(next_slice); - grpc_slice_unref_internal(next_slice); + grpc_slice_unref_internal(exec_ctx, next_slice); } return bytes_in_read_buffer; } -static void security_handshaker_unref(security_handshaker* h) { +static void security_handshaker_unref(grpc_exec_ctx* exec_ctx, + security_handshaker* h) { if (gpr_unref(&h->refs)) { gpr_mu_destroy(&h->mu); tsi_handshaker_destroy(h->handshaker); tsi_handshaker_result_destroy(h->handshaker_result); if (h->endpoint_to_destroy != nullptr) { - grpc_endpoint_destroy(h->endpoint_to_destroy); + grpc_endpoint_destroy(exec_ctx, h->endpoint_to_destroy); } if (h->read_buffer_to_destroy != nullptr) { - grpc_slice_buffer_destroy_internal(h->read_buffer_to_destroy); + grpc_slice_buffer_destroy_internal(exec_ctx, h->read_buffer_to_destroy); gpr_free(h->read_buffer_to_destroy); } gpr_free(h->handshake_buffer); - grpc_slice_buffer_destroy_internal(&h->outgoing); + grpc_slice_buffer_destroy_internal(exec_ctx, &h->outgoing); GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake"); - GRPC_SECURITY_CONNECTOR_UNREF(h->connector, "handshake"); + GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, h->connector, "handshake"); gpr_free(h); } } // Set args fields to NULL, saving the endpoint and read buffer for // later destruction. -static void cleanup_args_for_failure_locked(security_handshaker* h) { +static void cleanup_args_for_failure_locked(grpc_exec_ctx* exec_ctx, + security_handshaker* h) { h->endpoint_to_destroy = h->args->endpoint; h->args->endpoint = nullptr; h->read_buffer_to_destroy = h->args->read_buffer; h->args->read_buffer = nullptr; - grpc_channel_args_destroy(h->args->args); + grpc_channel_args_destroy(exec_ctx, h->args->args); h->args->args = nullptr; } // If the handshake failed or we're shutting down, clean up and invoke the // callback with the error. -static void security_handshake_failed_locked(security_handshaker* h, +static void security_handshake_failed_locked(grpc_exec_ctx* exec_ctx, + security_handshaker* h, grpc_error* error) { if (error == GRPC_ERROR_NONE) { // If we were shut down after the handshake succeeded but before an @@ -131,33 +135,34 @@ static void security_handshake_failed_locked(security_handshaker* h, // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which // point this can be removed. - grpc_endpoint_shutdown(h->args->endpoint, GRPC_ERROR_REF(error)); + grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(error)); // Not shutting down, so the write failed. Clean up before // invoking the callback. - cleanup_args_for_failure_locked(h); + cleanup_args_for_failure_locked(exec_ctx, h); // Set shutdown to true so that subsequent calls to // security_handshaker_shutdown() do nothing. h->shutdown = true; } // Invoke callback. - GRPC_CLOSURE_SCHED(h->on_handshake_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, h->on_handshake_done, error); } -static void on_peer_checked_inner(security_handshaker* h, grpc_error* error) { +static void on_peer_checked_inner(grpc_exec_ctx* exec_ctx, + security_handshaker* h, grpc_error* error) { if (error != GRPC_ERROR_NONE || h->shutdown) { - security_handshake_failed_locked(h, GRPC_ERROR_REF(error)); + security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error)); return; } // Create zero-copy frame protector, if implemented. tsi_zero_copy_grpc_protector* zero_copy_protector = nullptr; tsi_result result = tsi_handshaker_result_create_zero_copy_grpc_protector( - h->handshaker_result, nullptr, &zero_copy_protector); + exec_ctx, h->handshaker_result, nullptr, &zero_copy_protector); if (result != TSI_OK && result != TSI_UNIMPLEMENTED) { error = grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Zero-copy frame protector creation failed"), result); - security_handshake_failed_locked(h, error); + security_handshake_failed_locked(exec_ctx, h, error); return; } // Create frame protector if zero-copy frame protector is NULL. @@ -169,7 +174,7 @@ static void on_peer_checked_inner(security_handshaker* h, grpc_error* error) { error = grpc_set_tsi_error_result(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Frame protector creation failed"), result); - security_handshake_failed_locked(h, error); + security_handshake_failed_locked(exec_ctx, h, error); return; } } @@ -184,7 +189,7 @@ static void on_peer_checked_inner(security_handshaker* h, grpc_error* error) { grpc_slice_from_copied_buffer((char*)unused_bytes, unused_bytes_size); h->args->endpoint = grpc_secure_endpoint_create( protector, zero_copy_protector, h->args->endpoint, &slice, 1); - grpc_slice_unref_internal(slice); + grpc_slice_unref_internal(exec_ctx, slice); } else { h->args->endpoint = grpc_secure_endpoint_create( protector, zero_copy_protector, h->args->endpoint, nullptr, 0); @@ -196,23 +201,25 @@ static void on_peer_checked_inner(security_handshaker* h, grpc_error* error) { grpc_channel_args* tmp_args = h->args->args; h->args->args = grpc_channel_args_copy_and_add(tmp_args, &auth_context_arg, 1); - grpc_channel_args_destroy(tmp_args); + grpc_channel_args_destroy(exec_ctx, tmp_args); // Invoke callback. - GRPC_CLOSURE_SCHED(h->on_handshake_done, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE); // Set shutdown to true so that subsequent calls to // security_handshaker_shutdown() do nothing. h->shutdown = true; } -static void on_peer_checked(void* arg, grpc_error* error) { +static void on_peer_checked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { security_handshaker* h = (security_handshaker*)arg; gpr_mu_lock(&h->mu); - on_peer_checked_inner(h, error); + on_peer_checked_inner(exec_ctx, h, error); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); } -static grpc_error* check_peer_locked(security_handshaker* h) { +static grpc_error* check_peer_locked(grpc_exec_ctx* exec_ctx, + security_handshaker* h) { tsi_peer peer; tsi_result result = tsi_handshaker_result_extract_peer(h->handshaker_result, &peer); @@ -220,20 +227,20 @@ static grpc_error* check_peer_locked(security_handshaker* h) { return grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Peer extraction failed"), result); } - grpc_security_connector_check_peer(h->connector, peer, &h->auth_context, - &h->on_peer_checked); + grpc_security_connector_check_peer(exec_ctx, h->connector, peer, + &h->auth_context, &h->on_peer_checked); return GRPC_ERROR_NONE; } static grpc_error* on_handshake_next_done_locked( - security_handshaker* h, tsi_result result, + grpc_exec_ctx* exec_ctx, security_handshaker* h, tsi_result result, const unsigned char* bytes_to_send, size_t bytes_to_send_size, tsi_handshaker_result* handshaker_result) { grpc_error* error = GRPC_ERROR_NONE; // Read more if we need to. if (result == TSI_INCOMPLETE_DATA) { GPR_ASSERT(bytes_to_send_size == 0); - grpc_endpoint_read(h->args->endpoint, h->args->read_buffer, + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, &h->on_handshake_data_received_from_peer); return error; } @@ -250,17 +257,17 @@ static grpc_error* on_handshake_next_done_locked( // Send data to peer, if needed. grpc_slice to_send = grpc_slice_from_copied_buffer( (const char*)bytes_to_send, bytes_to_send_size); - grpc_slice_buffer_reset_and_unref_internal(&h->outgoing); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &h->outgoing); grpc_slice_buffer_add(&h->outgoing, to_send); - grpc_endpoint_write(h->args->endpoint, &h->outgoing, + grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing, &h->on_handshake_data_sent_to_peer); } else if (handshaker_result == nullptr) { // There is nothing to send, but need to read from peer. - grpc_endpoint_read(h->args->endpoint, h->args->read_buffer, + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, &h->on_handshake_data_received_from_peer); } else { // Handshake has finished, check peer and so on. - error = check_peer_locked(h); + error = check_peer_locked(exec_ctx, h); } return error; } @@ -271,22 +278,24 @@ static void on_handshake_next_done_grpc_wrapper( security_handshaker* h = (security_handshaker*)user_data; // This callback will be invoked by TSI in a non-grpc thread, so it's // safe to create our own exec_ctx here. - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(&h->mu); - grpc_error* error = on_handshake_next_done_locked( - h, result, bytes_to_send, bytes_to_send_size, handshaker_result); + grpc_error* error = + on_handshake_next_done_locked(&exec_ctx, h, result, bytes_to_send, + bytes_to_send_size, handshaker_result); if (error != GRPC_ERROR_NONE) { - security_handshake_failed_locked(h, error); + security_handshake_failed_locked(&exec_ctx, h, error); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(&exec_ctx, h); } else { gpr_mu_unlock(&h->mu); } + grpc_exec_ctx_finish(&exec_ctx); } static grpc_error* do_handshaker_next_locked( - security_handshaker* h, const unsigned char* bytes_received, - size_t bytes_received_size) { + grpc_exec_ctx* exec_ctx, security_handshaker* h, + const unsigned char* bytes_received, size_t bytes_received_size) { // Invoke TSI handshaker. const unsigned char* bytes_to_send = nullptr; size_t bytes_to_send_size = 0; @@ -302,57 +311,62 @@ static grpc_error* do_handshaker_next_locked( } // Handshaker returned synchronously. Invoke callback directly in // this thread with our existing exec_ctx. - return on_handshake_next_done_locked(h, result, bytes_to_send, + return on_handshake_next_done_locked(exec_ctx, h, result, bytes_to_send, bytes_to_send_size, handshaker_result); } -static void on_handshake_data_received_from_peer(void* arg, grpc_error* error) { +static void on_handshake_data_received_from_peer(grpc_exec_ctx* exec_ctx, + void* arg, grpc_error* error) { security_handshaker* h = (security_handshaker*)arg; gpr_mu_lock(&h->mu); if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked( - h, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Handshake read failed", &error, 1)); + exec_ctx, h, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Handshake read failed", &error, 1)); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); return; } // Copy all slices received. - size_t bytes_received_size = move_read_buffer_into_handshake_buffer(h); + size_t bytes_received_size = + move_read_buffer_into_handshake_buffer(exec_ctx, h); // Call TSI handshaker. - error = - do_handshaker_next_locked(h, h->handshake_buffer, bytes_received_size); + error = do_handshaker_next_locked(exec_ctx, h, h->handshake_buffer, + bytes_received_size); if (error != GRPC_ERROR_NONE) { - security_handshake_failed_locked(h, error); + security_handshake_failed_locked(exec_ctx, h, error); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); } else { gpr_mu_unlock(&h->mu); } } -static void on_handshake_data_sent_to_peer(void* arg, grpc_error* error) { +static void on_handshake_data_sent_to_peer(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { security_handshaker* h = (security_handshaker*)arg; gpr_mu_lock(&h->mu); if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked( - h, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Handshake write failed", &error, 1)); + exec_ctx, h, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Handshake write failed", &error, 1)); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); return; } // We may be done. if (h->handshaker_result == nullptr) { - grpc_endpoint_read(h->args->endpoint, h->args->read_buffer, + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, &h->on_handshake_data_received_from_peer); } else { - error = check_peer_locked(h); + error = check_peer_locked(exec_ctx, h); if (error != GRPC_ERROR_NONE) { - security_handshake_failed_locked(h, error); + security_handshake_failed_locked(exec_ctx, h, error); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); return; } } @@ -363,25 +377,28 @@ static void on_handshake_data_sent_to_peer(void* arg, grpc_error* error) { // public handshaker API // -static void security_handshaker_destroy(grpc_handshaker* handshaker) { +static void security_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) { security_handshaker* h = (security_handshaker*)handshaker; - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); } -static void security_handshaker_shutdown(grpc_handshaker* handshaker, +static void security_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, grpc_error* why) { security_handshaker* h = (security_handshaker*)handshaker; gpr_mu_lock(&h->mu); if (!h->shutdown) { h->shutdown = true; - grpc_endpoint_shutdown(h->args->endpoint, GRPC_ERROR_REF(why)); - cleanup_args_for_failure_locked(h); + grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(why)); + cleanup_args_for_failure_locked(exec_ctx, h); } gpr_mu_unlock(&h->mu); GRPC_ERROR_UNREF(why); } -static void security_handshaker_do_handshake(grpc_handshaker* handshaker, +static void security_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, grpc_handshaker_args* args) { @@ -390,13 +407,14 @@ static void security_handshaker_do_handshake(grpc_handshaker* handshaker, h->args = args; h->on_handshake_done = on_handshake_done; gpr_ref(&h->refs); - size_t bytes_received_size = move_read_buffer_into_handshake_buffer(h); - grpc_error* error = - do_handshaker_next_locked(h, h->handshake_buffer, bytes_received_size); + size_t bytes_received_size = + move_read_buffer_into_handshake_buffer(exec_ctx, h); + grpc_error* error = do_handshaker_next_locked( + exec_ctx, h, h->handshake_buffer, bytes_received_size); if (error != GRPC_ERROR_NONE) { - security_handshake_failed_locked(h, error); + security_handshake_failed_locked(exec_ctx, h, error); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); return; } gpr_mu_unlock(&h->mu); @@ -407,7 +425,8 @@ static const grpc_handshaker_vtable security_handshaker_vtable = { security_handshaker_do_handshake}; static grpc_handshaker* security_handshaker_create( - tsi_handshaker* handshaker, grpc_security_connector* connector) { + grpc_exec_ctx* exec_ctx, tsi_handshaker* handshaker, + grpc_security_connector* connector) { security_handshaker* h = (security_handshaker*)gpr_zalloc(sizeof(security_handshaker)); grpc_handshaker_init(&security_handshaker_vtable, &h->base); @@ -433,20 +452,23 @@ static grpc_handshaker* security_handshaker_create( // fail_handshaker // -static void fail_handshaker_destroy(grpc_handshaker* handshaker) { +static void fail_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) { gpr_free(handshaker); } -static void fail_handshaker_shutdown(grpc_handshaker* handshaker, +static void fail_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, grpc_error* why) { GRPC_ERROR_UNREF(why); } -static void fail_handshaker_do_handshake(grpc_handshaker* handshaker, +static void fail_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, grpc_handshaker_args* args) { - GRPC_CLOSURE_SCHED(on_handshake_done, + GRPC_CLOSURE_SCHED(exec_ctx, on_handshake_done, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Failed to create security handshaker")); } @@ -466,27 +488,27 @@ static grpc_handshaker* fail_handshaker_create() { // static void client_handshaker_factory_add_handshakers( - grpc_handshaker_factory* handshaker_factory, const grpc_channel_args* args, - grpc_handshake_manager* handshake_mgr) { + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory* handshaker_factory, + const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) { grpc_channel_security_connector* security_connector = (grpc_channel_security_connector*)grpc_security_connector_find_in_args( args); - grpc_channel_security_connector_add_handshakers(security_connector, + grpc_channel_security_connector_add_handshakers(exec_ctx, security_connector, handshake_mgr); } static void server_handshaker_factory_add_handshakers( - grpc_handshaker_factory* hf, const grpc_channel_args* args, - grpc_handshake_manager* handshake_mgr) { + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory* hf, + const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) { grpc_server_security_connector* security_connector = (grpc_server_security_connector*)grpc_security_connector_find_in_args( args); - grpc_server_security_connector_add_handshakers(security_connector, + grpc_server_security_connector_add_handshakers(exec_ctx, security_connector, handshake_mgr); } static void handshaker_factory_destroy( - grpc_handshaker_factory* handshaker_factory) {} + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory* handshaker_factory) {} static const grpc_handshaker_factory_vtable client_handshaker_factory_vtable = { client_handshaker_factory_add_handshakers, handshaker_factory_destroy}; @@ -505,13 +527,14 @@ static grpc_handshaker_factory server_handshaker_factory = { // grpc_handshaker* grpc_security_handshaker_create( - tsi_handshaker* handshaker, grpc_security_connector* connector) { + grpc_exec_ctx* exec_ctx, tsi_handshaker* handshaker, + grpc_security_connector* connector) { // If no TSI handshaker was created, return a handshaker that always fails. // Otherwise, return a real security handshaker. if (handshaker == nullptr) { return fail_handshaker_create(); } else { - return security_handshaker_create(handshaker, connector); + return security_handshaker_create(exec_ctx, handshaker, connector); } } diff --git a/src/core/lib/security/transport/security_handshaker.h b/src/core/lib/security/transport/security_handshaker.h index 6cd6446b5a..6c3a0510ce 100644 --- a/src/core/lib/security/transport/security_handshaker.h +++ b/src/core/lib/security/transport/security_handshaker.h @@ -25,7 +25,8 @@ /// Creates a security handshaker using \a handshaker. grpc_handshaker* grpc_security_handshaker_create( - tsi_handshaker* handshaker, grpc_security_connector* connector); + grpc_exec_ctx* exec_ctx, tsi_handshaker* handshaker, + grpc_security_connector* connector); /// Registers security handshaker factories. void grpc_security_register_handshaker_factories(); diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc index 73653f2a66..9cf368acd0 100644 --- a/src/core/lib/security/transport/server_auth_filter.cc +++ b/src/core/lib/security/transport/server_auth_filter.cc @@ -73,7 +73,8 @@ static grpc_metadata_array metadata_batch_to_md_array( return result; } -static grpc_filtered_mdelem remove_consumed_md(void* user_data, +static grpc_filtered_mdelem remove_consumed_md(grpc_exec_ctx* exec_ctx, + void* user_data, grpc_mdelem md) { grpc_call_element* elem = (grpc_call_element*)user_data; call_data* calld = (call_data*)elem->call_data; @@ -87,7 +88,8 @@ static grpc_filtered_mdelem remove_consumed_md(void* user_data, return GRPC_FILTERED_MDELEM(md); } -static void on_md_processing_done_inner(grpc_call_element* elem, +static void on_md_processing_done_inner(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_metadata* consumed_md, size_t num_consumed_md, const grpc_metadata* response_md, @@ -105,10 +107,11 @@ static void on_md_processing_done_inner(grpc_call_element* elem, calld->consumed_md = consumed_md; calld->num_consumed_md = num_consumed_md; error = grpc_metadata_batch_filter( - batch->payload->recv_initial_metadata.recv_initial_metadata, + exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata, remove_consumed_md, elem, "Response metadata filtering error"); } - GRPC_CLOSURE_SCHED(calld->original_recv_initial_metadata_ready, error); + GRPC_CLOSURE_SCHED(exec_ctx, calld->original_recv_initial_metadata_ready, + error); } // Called from application code. @@ -118,7 +121,7 @@ static void on_md_processing_done( grpc_status_code status, const char* error_details) { grpc_call_element* elem = (grpc_call_element*)user_data; call_data* calld = (call_data*)elem->call_data; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; // If the call was not cancelled while we were in flight, process the result. if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT, (gpr_atm)STATE_DONE)) { @@ -131,32 +134,34 @@ static void on_md_processing_done( GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), GRPC_ERROR_INT_GRPC_STATUS, status); } - on_md_processing_done_inner(elem, consumed_md, num_consumed_md, response_md, - num_response_md, error); + on_md_processing_done_inner(&exec_ctx, elem, consumed_md, num_consumed_md, + response_md, num_response_md, error); } // Clean up. for (size_t i = 0; i < calld->md.count; i++) { - grpc_slice_unref_internal(calld->md.metadata[i].key); - grpc_slice_unref_internal(calld->md.metadata[i].value); + grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); + grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); } grpc_metadata_array_destroy(&calld->md); - GRPC_CALL_STACK_UNREF(calld->owning_call, "server_auth_metadata"); + GRPC_CALL_STACK_UNREF(&exec_ctx, calld->owning_call, "server_auth_metadata"); + grpc_exec_ctx_finish(&exec_ctx); } -static void cancel_call(void* arg, grpc_error* error) { +static void cancel_call(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; // If the result was not already processed, invoke the callback now. if (error != GRPC_ERROR_NONE && gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT, (gpr_atm)STATE_CANCELLED)) { - on_md_processing_done_inner(elem, nullptr, 0, nullptr, 0, + on_md_processing_done_inner(exec_ctx, elem, nullptr, 0, nullptr, 0, GRPC_ERROR_REF(error)); } - GRPC_CALL_STACK_UNREF(calld->owning_call, "cancel_call"); + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "cancel_call"); } -static void recv_initial_metadata_ready(void* arg, grpc_error* error) { +static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; @@ -168,7 +173,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { GRPC_CALL_STACK_REF(calld->owning_call, "cancel_call"); GRPC_CLOSURE_INIT(&calld->cancel_closure, cancel_call, elem, grpc_schedule_on_exec_ctx); - grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, + grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, &calld->cancel_closure); GRPC_CALL_STACK_REF(calld->owning_call, "server_auth_metadata"); calld->md = metadata_batch_to_md_array( @@ -179,12 +184,13 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { return; } } - GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, + GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready, GRPC_ERROR_REF(error)); } static void auth_start_transport_stream_op_batch( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* batch) { call_data* calld = (call_data*)elem->call_data; if (batch->recv_initial_metadata) { // Inject our callback. @@ -194,11 +200,12 @@ static void auth_start_transport_stream_op_batch( batch->payload->recv_initial_metadata.recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; } - grpc_call_next_op(elem, batch); + grpc_call_next_op(exec_ctx, elem, batch); } /* Constructor for call_data */ -static grpc_error* init_call_elem(grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; @@ -224,12 +231,13 @@ static grpc_error* init_call_elem(grpc_call_element* elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element* elem, +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* ignored) {} /* Constructor for channel_data */ -static grpc_error* init_channel_elem(grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); channel_data* chand = (channel_data*)elem->channel_data; @@ -245,10 +253,11 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element* elem) { +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) { channel_data* chand = (channel_data*)elem->channel_data; GRPC_AUTH_CONTEXT_UNREF(chand->auth_context, "server_auth_filter"); - grpc_server_credentials_unref(chand->creds); + grpc_server_credentials_unref(exec_ctx, chand->creds); } const grpc_channel_filter grpc_server_auth_filter = { |