diff options
author | 2017-12-06 09:05:05 -0800 | |
---|---|---|
committer | 2017-12-06 09:05:05 -0800 | |
commit | ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch) | |
tree | 6a657f8c6179d873b34505cdc24bce9462ca68eb /src/core/lib/security/transport/security_handshaker.cc | |
parent | a3df36cc2505a89c2f481eea4a66a87b3002844a (diff) |
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'src/core/lib/security/transport/security_handshaker.cc')
-rw-r--r-- | src/core/lib/security/transport/security_handshaker.cc | 189 |
1 files changed, 106 insertions, 83 deletions
diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index 7623fbfd5b..7067b70cb6 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -65,7 +65,8 @@ typedef struct { tsi_handshaker_result* handshaker_result; } security_handshaker; -static size_t move_read_buffer_into_handshake_buffer(security_handshaker* h) { +static size_t move_read_buffer_into_handshake_buffer(grpc_exec_ctx* exec_ctx, + security_handshaker* h) { size_t bytes_in_read_buffer = h->args->read_buffer->length; if (h->handshake_buffer_size < bytes_in_read_buffer) { h->handshake_buffer = @@ -78,45 +79,48 @@ static size_t move_read_buffer_into_handshake_buffer(security_handshaker* h) { memcpy(h->handshake_buffer + offset, GRPC_SLICE_START_PTR(next_slice), GRPC_SLICE_LENGTH(next_slice)); offset += GRPC_SLICE_LENGTH(next_slice); - grpc_slice_unref_internal(next_slice); + grpc_slice_unref_internal(exec_ctx, next_slice); } return bytes_in_read_buffer; } -static void security_handshaker_unref(security_handshaker* h) { +static void security_handshaker_unref(grpc_exec_ctx* exec_ctx, + security_handshaker* h) { if (gpr_unref(&h->refs)) { gpr_mu_destroy(&h->mu); tsi_handshaker_destroy(h->handshaker); tsi_handshaker_result_destroy(h->handshaker_result); if (h->endpoint_to_destroy != nullptr) { - grpc_endpoint_destroy(h->endpoint_to_destroy); + grpc_endpoint_destroy(exec_ctx, h->endpoint_to_destroy); } if (h->read_buffer_to_destroy != nullptr) { - grpc_slice_buffer_destroy_internal(h->read_buffer_to_destroy); + grpc_slice_buffer_destroy_internal(exec_ctx, h->read_buffer_to_destroy); gpr_free(h->read_buffer_to_destroy); } gpr_free(h->handshake_buffer); - grpc_slice_buffer_destroy_internal(&h->outgoing); + grpc_slice_buffer_destroy_internal(exec_ctx, &h->outgoing); GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake"); - GRPC_SECURITY_CONNECTOR_UNREF(h->connector, "handshake"); + GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, h->connector, "handshake"); gpr_free(h); } } // Set args fields to NULL, saving the endpoint and read buffer for // later destruction. -static void cleanup_args_for_failure_locked(security_handshaker* h) { +static void cleanup_args_for_failure_locked(grpc_exec_ctx* exec_ctx, + security_handshaker* h) { h->endpoint_to_destroy = h->args->endpoint; h->args->endpoint = nullptr; h->read_buffer_to_destroy = h->args->read_buffer; h->args->read_buffer = nullptr; - grpc_channel_args_destroy(h->args->args); + grpc_channel_args_destroy(exec_ctx, h->args->args); h->args->args = nullptr; } // If the handshake failed or we're shutting down, clean up and invoke the // callback with the error. -static void security_handshake_failed_locked(security_handshaker* h, +static void security_handshake_failed_locked(grpc_exec_ctx* exec_ctx, + security_handshaker* h, grpc_error* error) { if (error == GRPC_ERROR_NONE) { // If we were shut down after the handshake succeeded but before an @@ -131,33 +135,34 @@ static void security_handshake_failed_locked(security_handshaker* h, // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which // point this can be removed. - grpc_endpoint_shutdown(h->args->endpoint, GRPC_ERROR_REF(error)); + grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(error)); // Not shutting down, so the write failed. Clean up before // invoking the callback. - cleanup_args_for_failure_locked(h); + cleanup_args_for_failure_locked(exec_ctx, h); // Set shutdown to true so that subsequent calls to // security_handshaker_shutdown() do nothing. h->shutdown = true; } // Invoke callback. - GRPC_CLOSURE_SCHED(h->on_handshake_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, h->on_handshake_done, error); } -static void on_peer_checked_inner(security_handshaker* h, grpc_error* error) { +static void on_peer_checked_inner(grpc_exec_ctx* exec_ctx, + security_handshaker* h, grpc_error* error) { if (error != GRPC_ERROR_NONE || h->shutdown) { - security_handshake_failed_locked(h, GRPC_ERROR_REF(error)); + security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error)); return; } // Create zero-copy frame protector, if implemented. tsi_zero_copy_grpc_protector* zero_copy_protector = nullptr; tsi_result result = tsi_handshaker_result_create_zero_copy_grpc_protector( - h->handshaker_result, nullptr, &zero_copy_protector); + exec_ctx, h->handshaker_result, nullptr, &zero_copy_protector); if (result != TSI_OK && result != TSI_UNIMPLEMENTED) { error = grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Zero-copy frame protector creation failed"), result); - security_handshake_failed_locked(h, error); + security_handshake_failed_locked(exec_ctx, h, error); return; } // Create frame protector if zero-copy frame protector is NULL. @@ -169,7 +174,7 @@ static void on_peer_checked_inner(security_handshaker* h, grpc_error* error) { error = grpc_set_tsi_error_result(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Frame protector creation failed"), result); - security_handshake_failed_locked(h, error); + security_handshake_failed_locked(exec_ctx, h, error); return; } } @@ -184,7 +189,7 @@ static void on_peer_checked_inner(security_handshaker* h, grpc_error* error) { grpc_slice_from_copied_buffer((char*)unused_bytes, unused_bytes_size); h->args->endpoint = grpc_secure_endpoint_create( protector, zero_copy_protector, h->args->endpoint, &slice, 1); - grpc_slice_unref_internal(slice); + grpc_slice_unref_internal(exec_ctx, slice); } else { h->args->endpoint = grpc_secure_endpoint_create( protector, zero_copy_protector, h->args->endpoint, nullptr, 0); @@ -196,23 +201,25 @@ static void on_peer_checked_inner(security_handshaker* h, grpc_error* error) { grpc_channel_args* tmp_args = h->args->args; h->args->args = grpc_channel_args_copy_and_add(tmp_args, &auth_context_arg, 1); - grpc_channel_args_destroy(tmp_args); + grpc_channel_args_destroy(exec_ctx, tmp_args); // Invoke callback. - GRPC_CLOSURE_SCHED(h->on_handshake_done, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE); // Set shutdown to true so that subsequent calls to // security_handshaker_shutdown() do nothing. h->shutdown = true; } -static void on_peer_checked(void* arg, grpc_error* error) { +static void on_peer_checked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { security_handshaker* h = (security_handshaker*)arg; gpr_mu_lock(&h->mu); - on_peer_checked_inner(h, error); + on_peer_checked_inner(exec_ctx, h, error); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); } -static grpc_error* check_peer_locked(security_handshaker* h) { +static grpc_error* check_peer_locked(grpc_exec_ctx* exec_ctx, + security_handshaker* h) { tsi_peer peer; tsi_result result = tsi_handshaker_result_extract_peer(h->handshaker_result, &peer); @@ -220,20 +227,20 @@ static grpc_error* check_peer_locked(security_handshaker* h) { return grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Peer extraction failed"), result); } - grpc_security_connector_check_peer(h->connector, peer, &h->auth_context, - &h->on_peer_checked); + grpc_security_connector_check_peer(exec_ctx, h->connector, peer, + &h->auth_context, &h->on_peer_checked); return GRPC_ERROR_NONE; } static grpc_error* on_handshake_next_done_locked( - security_handshaker* h, tsi_result result, + grpc_exec_ctx* exec_ctx, security_handshaker* h, tsi_result result, const unsigned char* bytes_to_send, size_t bytes_to_send_size, tsi_handshaker_result* handshaker_result) { grpc_error* error = GRPC_ERROR_NONE; // Read more if we need to. if (result == TSI_INCOMPLETE_DATA) { GPR_ASSERT(bytes_to_send_size == 0); - grpc_endpoint_read(h->args->endpoint, h->args->read_buffer, + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, &h->on_handshake_data_received_from_peer); return error; } @@ -250,17 +257,17 @@ static grpc_error* on_handshake_next_done_locked( // Send data to peer, if needed. grpc_slice to_send = grpc_slice_from_copied_buffer( (const char*)bytes_to_send, bytes_to_send_size); - grpc_slice_buffer_reset_and_unref_internal(&h->outgoing); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &h->outgoing); grpc_slice_buffer_add(&h->outgoing, to_send); - grpc_endpoint_write(h->args->endpoint, &h->outgoing, + grpc_endpoint_write(exec_ctx, h->args->endpoint, &h->outgoing, &h->on_handshake_data_sent_to_peer); } else if (handshaker_result == nullptr) { // There is nothing to send, but need to read from peer. - grpc_endpoint_read(h->args->endpoint, h->args->read_buffer, + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, &h->on_handshake_data_received_from_peer); } else { // Handshake has finished, check peer and so on. - error = check_peer_locked(h); + error = check_peer_locked(exec_ctx, h); } return error; } @@ -271,22 +278,24 @@ static void on_handshake_next_done_grpc_wrapper( security_handshaker* h = (security_handshaker*)user_data; // This callback will be invoked by TSI in a non-grpc thread, so it's // safe to create our own exec_ctx here. - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(&h->mu); - grpc_error* error = on_handshake_next_done_locked( - h, result, bytes_to_send, bytes_to_send_size, handshaker_result); + grpc_error* error = + on_handshake_next_done_locked(&exec_ctx, h, result, bytes_to_send, + bytes_to_send_size, handshaker_result); if (error != GRPC_ERROR_NONE) { - security_handshake_failed_locked(h, error); + security_handshake_failed_locked(&exec_ctx, h, error); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(&exec_ctx, h); } else { gpr_mu_unlock(&h->mu); } + grpc_exec_ctx_finish(&exec_ctx); } static grpc_error* do_handshaker_next_locked( - security_handshaker* h, const unsigned char* bytes_received, - size_t bytes_received_size) { + grpc_exec_ctx* exec_ctx, security_handshaker* h, + const unsigned char* bytes_received, size_t bytes_received_size) { // Invoke TSI handshaker. const unsigned char* bytes_to_send = nullptr; size_t bytes_to_send_size = 0; @@ -302,57 +311,62 @@ static grpc_error* do_handshaker_next_locked( } // Handshaker returned synchronously. Invoke callback directly in // this thread with our existing exec_ctx. - return on_handshake_next_done_locked(h, result, bytes_to_send, + return on_handshake_next_done_locked(exec_ctx, h, result, bytes_to_send, bytes_to_send_size, handshaker_result); } -static void on_handshake_data_received_from_peer(void* arg, grpc_error* error) { +static void on_handshake_data_received_from_peer(grpc_exec_ctx* exec_ctx, + void* arg, grpc_error* error) { security_handshaker* h = (security_handshaker*)arg; gpr_mu_lock(&h->mu); if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked( - h, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Handshake read failed", &error, 1)); + exec_ctx, h, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Handshake read failed", &error, 1)); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); return; } // Copy all slices received. - size_t bytes_received_size = move_read_buffer_into_handshake_buffer(h); + size_t bytes_received_size = + move_read_buffer_into_handshake_buffer(exec_ctx, h); // Call TSI handshaker. - error = - do_handshaker_next_locked(h, h->handshake_buffer, bytes_received_size); + error = do_handshaker_next_locked(exec_ctx, h, h->handshake_buffer, + bytes_received_size); if (error != GRPC_ERROR_NONE) { - security_handshake_failed_locked(h, error); + security_handshake_failed_locked(exec_ctx, h, error); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); } else { gpr_mu_unlock(&h->mu); } } -static void on_handshake_data_sent_to_peer(void* arg, grpc_error* error) { +static void on_handshake_data_sent_to_peer(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { security_handshaker* h = (security_handshaker*)arg; gpr_mu_lock(&h->mu); if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked( - h, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Handshake write failed", &error, 1)); + exec_ctx, h, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Handshake write failed", &error, 1)); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); return; } // We may be done. if (h->handshaker_result == nullptr) { - grpc_endpoint_read(h->args->endpoint, h->args->read_buffer, + grpc_endpoint_read(exec_ctx, h->args->endpoint, h->args->read_buffer, &h->on_handshake_data_received_from_peer); } else { - error = check_peer_locked(h); + error = check_peer_locked(exec_ctx, h); if (error != GRPC_ERROR_NONE) { - security_handshake_failed_locked(h, error); + security_handshake_failed_locked(exec_ctx, h, error); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); return; } } @@ -363,25 +377,28 @@ static void on_handshake_data_sent_to_peer(void* arg, grpc_error* error) { // public handshaker API // -static void security_handshaker_destroy(grpc_handshaker* handshaker) { +static void security_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) { security_handshaker* h = (security_handshaker*)handshaker; - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); } -static void security_handshaker_shutdown(grpc_handshaker* handshaker, +static void security_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, grpc_error* why) { security_handshaker* h = (security_handshaker*)handshaker; gpr_mu_lock(&h->mu); if (!h->shutdown) { h->shutdown = true; - grpc_endpoint_shutdown(h->args->endpoint, GRPC_ERROR_REF(why)); - cleanup_args_for_failure_locked(h); + grpc_endpoint_shutdown(exec_ctx, h->args->endpoint, GRPC_ERROR_REF(why)); + cleanup_args_for_failure_locked(exec_ctx, h); } gpr_mu_unlock(&h->mu); GRPC_ERROR_UNREF(why); } -static void security_handshaker_do_handshake(grpc_handshaker* handshaker, +static void security_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, grpc_handshaker_args* args) { @@ -390,13 +407,14 @@ static void security_handshaker_do_handshake(grpc_handshaker* handshaker, h->args = args; h->on_handshake_done = on_handshake_done; gpr_ref(&h->refs); - size_t bytes_received_size = move_read_buffer_into_handshake_buffer(h); - grpc_error* error = - do_handshaker_next_locked(h, h->handshake_buffer, bytes_received_size); + size_t bytes_received_size = + move_read_buffer_into_handshake_buffer(exec_ctx, h); + grpc_error* error = do_handshaker_next_locked( + exec_ctx, h, h->handshake_buffer, bytes_received_size); if (error != GRPC_ERROR_NONE) { - security_handshake_failed_locked(h, error); + security_handshake_failed_locked(exec_ctx, h, error); gpr_mu_unlock(&h->mu); - security_handshaker_unref(h); + security_handshaker_unref(exec_ctx, h); return; } gpr_mu_unlock(&h->mu); @@ -407,7 +425,8 @@ static const grpc_handshaker_vtable security_handshaker_vtable = { security_handshaker_do_handshake}; static grpc_handshaker* security_handshaker_create( - tsi_handshaker* handshaker, grpc_security_connector* connector) { + grpc_exec_ctx* exec_ctx, tsi_handshaker* handshaker, + grpc_security_connector* connector) { security_handshaker* h = (security_handshaker*)gpr_zalloc(sizeof(security_handshaker)); grpc_handshaker_init(&security_handshaker_vtable, &h->base); @@ -433,20 +452,23 @@ static grpc_handshaker* security_handshaker_create( // fail_handshaker // -static void fail_handshaker_destroy(grpc_handshaker* handshaker) { +static void fail_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) { gpr_free(handshaker); } -static void fail_handshaker_shutdown(grpc_handshaker* handshaker, +static void fail_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, grpc_error* why) { GRPC_ERROR_UNREF(why); } -static void fail_handshaker_do_handshake(grpc_handshaker* handshaker, +static void fail_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, grpc_handshaker_args* args) { - GRPC_CLOSURE_SCHED(on_handshake_done, + GRPC_CLOSURE_SCHED(exec_ctx, on_handshake_done, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Failed to create security handshaker")); } @@ -466,27 +488,27 @@ static grpc_handshaker* fail_handshaker_create() { // static void client_handshaker_factory_add_handshakers( - grpc_handshaker_factory* handshaker_factory, const grpc_channel_args* args, - grpc_handshake_manager* handshake_mgr) { + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory* handshaker_factory, + const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) { grpc_channel_security_connector* security_connector = (grpc_channel_security_connector*)grpc_security_connector_find_in_args( args); - grpc_channel_security_connector_add_handshakers(security_connector, + grpc_channel_security_connector_add_handshakers(exec_ctx, security_connector, handshake_mgr); } static void server_handshaker_factory_add_handshakers( - grpc_handshaker_factory* hf, const grpc_channel_args* args, - grpc_handshake_manager* handshake_mgr) { + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory* hf, + const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) { grpc_server_security_connector* security_connector = (grpc_server_security_connector*)grpc_security_connector_find_in_args( args); - grpc_server_security_connector_add_handshakers(security_connector, + grpc_server_security_connector_add_handshakers(exec_ctx, security_connector, handshake_mgr); } static void handshaker_factory_destroy( - grpc_handshaker_factory* handshaker_factory) {} + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory* handshaker_factory) {} static const grpc_handshaker_factory_vtable client_handshaker_factory_vtable = { client_handshaker_factory_add_handshakers, handshaker_factory_destroy}; @@ -505,13 +527,14 @@ static grpc_handshaker_factory server_handshaker_factory = { // grpc_handshaker* grpc_security_handshaker_create( - tsi_handshaker* handshaker, grpc_security_connector* connector) { + grpc_exec_ctx* exec_ctx, tsi_handshaker* handshaker, + grpc_security_connector* connector) { // If no TSI handshaker was created, return a handshaker that always fails. // Otherwise, return a real security handshaker. if (handshaker == nullptr) { return fail_handshaker_create(); } else { - return security_handshaker_create(handshaker, connector); + return security_handshaker_create(exec_ctx, handshaker, connector); } } |