diff options
author | Craig Tiller <ctiller@google.com> | 2017-09-07 15:54:59 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-09-07 15:54:59 -0700 |
commit | 56969c2784911583825d75212ca027806d1de056 (patch) | |
tree | 35ccaa433f197d7977533e24210ddefb7632a12e /src/core/lib/security | |
parent | f0ba70a9ea92b6c7422f40206e763141c05eb281 (diff) | |
parent | 41630a29507c8dd5b6110f0397b346b7feab442b (diff) |
Merge github.com:grpc/grpc into pollset_kick_stats
Diffstat (limited to 'src/core/lib/security')
-rw-r--r-- | src/core/lib/security/transport/client_auth_filter.c | 203 | ||||
-rw-r--r-- | src/core/lib/security/transport/server_auth_filter.c | 91 |
2 files changed, 139 insertions, 155 deletions
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index 531a88434f..dd7dd44e79 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -39,6 +39,8 @@ /* We can have a per-call credentials. */ typedef struct { + grpc_call_stack *owning_call; + grpc_call_combiner *call_combiner; grpc_call_credentials *creds; bool have_host; bool have_method; @@ -49,17 +51,12 @@ typedef struct { pollset_set so that work can progress when this call wants work to progress */ grpc_polling_entity *pollent; - gpr_atm security_context_set; - gpr_mu security_context_mu; grpc_credentials_mdelem_array md_array; grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; grpc_auth_metadata_context auth_md_context; - grpc_closure closure; - // Either 0 (no cancellation and no async operation in flight), - // a grpc_closure* (if the lowest bit is 0), - // or a grpc_error* (if the lowest bit is 1). - gpr_atm cancellation_state; - grpc_closure cancel_closure; + grpc_closure async_result_closure; + grpc_closure check_call_host_cancel_closure; + grpc_closure get_request_metadata_cancel_closure; } call_data; /* We can have a per-channel credentials. */ @@ -68,43 +65,6 @@ typedef struct { grpc_auth_context *auth_context; } channel_data; -static void decode_cancel_state(gpr_atm cancel_state, grpc_closure **func, - grpc_error **error) { - // If the lowest bit is 1, the value is a grpc_error*. - // Otherwise, if non-zdero, the value is a grpc_closure*. - if (cancel_state & 1) { - *error = (grpc_error *)(cancel_state & ~(gpr_atm)1); - } else if (cancel_state != 0) { - *func = (grpc_closure *)cancel_state; - } -} - -static gpr_atm encode_cancel_state_error(grpc_error *error) { - // Set the lowest bit to 1 to indicate that it's an error. - return (gpr_atm)1 | (gpr_atm)error; -} - -// Returns an error if the call has been cancelled. Otherwise, sets the -// cancellation function to be called upon cancellation. -static grpc_error *set_cancel_func(grpc_call_element *elem, - grpc_iomgr_cb_func func) { - call_data *calld = (call_data *)elem->call_data; - // Decode original state. - gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state); - grpc_error *original_error = GRPC_ERROR_NONE; - grpc_closure *original_func = NULL; - decode_cancel_state(original_state, &original_func, &original_error); - // If error is set, return it. - if (original_error != GRPC_ERROR_NONE) return GRPC_ERROR_REF(original_error); - // Otherwise, store func. - GRPC_CLOSURE_INIT(&calld->cancel_closure, func, elem, - grpc_schedule_on_exec_ctx); - GPR_ASSERT(((gpr_atm)&calld->cancel_closure & (gpr_atm)1) == 0); - gpr_atm_rel_store(&calld->cancellation_state, - (gpr_atm)&calld->cancel_closure); - return GRPC_ERROR_NONE; -} - static void reset_auth_metadata_context( grpc_auth_metadata_context *auth_md_context) { if (auth_md_context->service_url != NULL) { @@ -153,7 +113,8 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg, } else { error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED); - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error, + calld->call_combiner); } } @@ -191,8 +152,12 @@ static void cancel_get_request_metadata(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = (grpc_call_element *)arg; call_data *calld = (call_data *)elem->call_data; - grpc_call_credentials_cancel_get_request_metadata( - exec_ctx, calld->creds, &calld->md_array, GRPC_ERROR_REF(error)); + if (error != GRPC_ERROR_NONE) { + grpc_call_credentials_cancel_get_request_metadata( + exec_ctx, calld->creds, &calld->md_array, GRPC_ERROR_REF(error)); + } + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "cancel_get_request_metadata"); } static void send_security_metadata(grpc_exec_ctx *exec_ctx, @@ -223,7 +188,8 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Incompatible credentials set on channel and call."), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED)); + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED), + calld->call_combiner); return; } } else { @@ -234,22 +200,25 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, build_auth_metadata_context(&chand->security_connector->base, chand->auth_context, calld); - grpc_error *cancel_error = set_cancel_func(elem, cancel_get_request_metadata); - if (cancel_error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, - cancel_error); - return; - } GPR_ASSERT(calld->pollent != NULL); - GRPC_CLOSURE_INIT(&calld->closure, on_credentials_metadata, batch, - grpc_schedule_on_exec_ctx); + + GRPC_CLOSURE_INIT(&calld->async_result_closure, on_credentials_metadata, + batch, grpc_schedule_on_exec_ctx); grpc_error *error = GRPC_ERROR_NONE; if (grpc_call_credentials_get_request_metadata( exec_ctx, calld->creds, calld->pollent, calld->auth_md_context, - &calld->md_array, &calld->closure, &error)) { + &calld->md_array, &calld->async_result_closure, &error)) { // Synchronous return; invoke on_credentials_metadata() directly. on_credentials_metadata(exec_ctx, batch, error); GRPC_ERROR_UNREF(error); + } else { + // Async return; register cancellation closure with call combiner. + GRPC_CALL_STACK_REF(calld->owning_call, "cancel_get_request_metadata"); + grpc_call_combiner_set_notify_on_cancel( + exec_ctx, calld->call_combiner, + GRPC_CLOSURE_INIT(&calld->get_request_metadata_cancel_closure, + cancel_get_request_metadata, elem, + grpc_schedule_on_exec_ctx)); } } @@ -258,7 +227,6 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg, grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg; grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; - if (error == GRPC_ERROR_NONE) { send_security_metadata(exec_ctx, elem, batch); } else { @@ -271,7 +239,8 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, batch, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg), GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAUTHENTICATED)); + GRPC_STATUS_UNAUTHENTICATED), + calld->call_combiner); gpr_free(error_msg); } } @@ -281,9 +250,12 @@ static void cancel_check_call_host(grpc_exec_ctx *exec_ctx, void *arg, grpc_call_element *elem = (grpc_call_element *)arg; call_data *calld = (call_data *)elem->call_data; channel_data *chand = (channel_data *)elem->channel_data; - grpc_channel_security_connector_cancel_check_call_host( - exec_ctx, chand->security_connector, &calld->closure, - GRPC_ERROR_REF(error)); + if (error != GRPC_ERROR_NONE) { + grpc_channel_security_connector_cancel_check_call_host( + exec_ctx, chand->security_connector, &calld->async_result_closure, + GRPC_ERROR_REF(error)); + } + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "cancel_check_call_host"); } static void auth_start_transport_stream_op_batch( @@ -295,52 +267,19 @@ static void auth_start_transport_stream_op_batch( call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - if (batch->cancel_stream) { - while (true) { - // Decode the original cancellation state. - gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state); - grpc_error *cancel_error = GRPC_ERROR_NONE; - grpc_closure *func = NULL; - decode_cancel_state(original_state, &func, &cancel_error); - // If we had already set a cancellation error, there's nothing - // more to do. - if (cancel_error != GRPC_ERROR_NONE) break; - // If there's a cancel func, call it. - // Note that even if the cancel func has been changed by some - // other thread between when we decoded it and now, it will just - // be a no-op. - cancel_error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); - if (func != NULL) { - GRPC_CLOSURE_SCHED(exec_ctx, func, GRPC_ERROR_REF(cancel_error)); - } - // Encode the new error into cancellation state. - if (gpr_atm_full_cas(&calld->cancellation_state, original_state, - encode_cancel_state_error(cancel_error))) { - break; // Success. - } - // The cas failed, so try again. - } - } else { - /* double checked lock over security context to ensure it's set once */ - if (gpr_atm_acq_load(&calld->security_context_set) == 0) { - gpr_mu_lock(&calld->security_context_mu); - if (gpr_atm_acq_load(&calld->security_context_set) == 0) { - GPR_ASSERT(batch->payload->context != NULL); - if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { - batch->payload->context[GRPC_CONTEXT_SECURITY].value = - grpc_client_security_context_create(); - batch->payload->context[GRPC_CONTEXT_SECURITY].destroy = - grpc_client_security_context_destroy; - } - grpc_client_security_context *sec_ctx = - batch->payload->context[GRPC_CONTEXT_SECURITY].value; - GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); - sec_ctx->auth_context = - GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); - gpr_atm_rel_store(&calld->security_context_set, 1); - } - gpr_mu_unlock(&calld->security_context_mu); + if (!batch->cancel_stream) { + GPR_ASSERT(batch->payload->context != NULL); + if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { + batch->payload->context[GRPC_CONTEXT_SECURITY].value = + grpc_client_security_context_create(); + batch->payload->context[GRPC_CONTEXT_SECURITY].destroy = + grpc_client_security_context_destroy; } + grpc_client_security_context *sec_ctx = + batch->payload->context[GRPC_CONTEXT_SECURITY].value; + GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); + sec_ctx->auth_context = + GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); } if (batch->send_initial_metadata) { @@ -365,26 +304,27 @@ static void auth_start_transport_stream_op_batch( } } if (calld->have_host) { - grpc_error *cancel_error = set_cancel_func(elem, cancel_check_call_host); - if (cancel_error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, - cancel_error); + batch->handler_private.extra_arg = elem; + GRPC_CLOSURE_INIT(&calld->async_result_closure, on_host_checked, batch, + grpc_schedule_on_exec_ctx); + char *call_host = grpc_slice_to_c_string(calld->host); + grpc_error *error = GRPC_ERROR_NONE; + if (grpc_channel_security_connector_check_call_host( + exec_ctx, chand->security_connector, call_host, + chand->auth_context, &calld->async_result_closure, &error)) { + // Synchronous return; invoke on_host_checked() directly. + on_host_checked(exec_ctx, batch, error); + GRPC_ERROR_UNREF(error); } else { - char *call_host = grpc_slice_to_c_string(calld->host); - batch->handler_private.extra_arg = elem; - grpc_error *error = GRPC_ERROR_NONE; - if (grpc_channel_security_connector_check_call_host( - exec_ctx, chand->security_connector, call_host, - chand->auth_context, - GRPC_CLOSURE_INIT(&calld->closure, on_host_checked, batch, - grpc_schedule_on_exec_ctx), - &error)) { - // Synchronous return; invoke on_host_checked() directly. - on_host_checked(exec_ctx, batch, error); - GRPC_ERROR_UNREF(error); - } - gpr_free(call_host); + // Async return; register cancellation closure with call combiner. + GRPC_CALL_STACK_REF(calld->owning_call, "cancel_check_call_host"); + grpc_call_combiner_set_notify_on_cancel( + exec_ctx, calld->call_combiner, + GRPC_CLOSURE_INIT(&calld->check_call_host_cancel_closure, + cancel_check_call_host, elem, + grpc_schedule_on_exec_ctx)); } + gpr_free(call_host); GPR_TIMER_END("auth_start_transport_stream_op_batch", 0); return; /* early exit */ } @@ -400,8 +340,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { call_data *calld = elem->call_data; - memset(calld, 0, sizeof(*calld)); - gpr_mu_init(&calld->security_context_mu); + calld->owning_call = args->call_stack; + calld->call_combiner = args->call_combiner; return GRPC_ERROR_NONE; } @@ -426,12 +366,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_slice_unref_internal(exec_ctx, calld->method); } reset_auth_metadata_context(&calld->auth_md_context); - gpr_mu_destroy(&calld->security_context_mu); - gpr_atm cancel_state = gpr_atm_acq_load(&calld->cancellation_state); - grpc_error *cancel_error = GRPC_ERROR_NONE; - grpc_closure *cancel_func = NULL; - decode_cancel_state(cancel_state, &cancel_func, &cancel_error); - GRPC_ERROR_UNREF(cancel_error); } /* Constructor for channel_data */ @@ -490,6 +424,5 @@ const grpc_channel_filter grpc_client_auth_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, "client-auth"}; diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 9bf3f0ca0f..7f523c0883 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -26,7 +26,15 @@ #include "src/core/lib/security/transport/auth_filters.h" #include "src/core/lib/slice/slice_internal.h" +typedef enum { + STATE_INIT = 0, + STATE_DONE, + STATE_CANCELLED, +} async_state; + typedef struct call_data { + grpc_call_combiner *call_combiner; + grpc_call_stack *owning_call; grpc_transport_stream_op_batch *recv_initial_metadata_batch; grpc_closure *original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; @@ -34,6 +42,8 @@ typedef struct call_data { const grpc_metadata *consumed_md; size_t num_consumed_md; grpc_auth_context *auth_context; + grpc_closure cancel_closure; + gpr_atm state; // async_state } call_data; typedef struct channel_data { @@ -78,54 +88,94 @@ static grpc_filtered_mdelem remove_consumed_md(grpc_exec_ctx *exec_ctx, return GRPC_FILTERED_MDELEM(md); } -/* called from application code */ -static void on_md_processing_done( - void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md, - const grpc_metadata *response_md, size_t num_response_md, - grpc_status_code status, const char *error_details) { - grpc_call_element *elem = user_data; +static void on_md_processing_done_inner(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_metadata *consumed_md, + size_t num_consumed_md, + const grpc_metadata *response_md, + size_t num_response_md, + grpc_error *error) { call_data *calld = elem->call_data; grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; /* TODO(jboeuf): Implement support for response_md. */ if (response_md != NULL && num_response_md > 0) { gpr_log(GPR_INFO, "response_md in auth metadata processing not supported for now. " "Ignoring..."); } - grpc_error *error = GRPC_ERROR_NONE; - if (status == GRPC_STATUS_OK) { + if (error == GRPC_ERROR_NONE) { calld->consumed_md = consumed_md; calld->num_consumed_md = num_consumed_md; error = grpc_metadata_batch_filter( - &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata, + exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata, remove_consumed_md, elem, "Response metadata filtering error"); - } else { - if (error_details == NULL) { - error_details = "Authentication metadata processing failed."; + } + GRPC_CLOSURE_SCHED(exec_ctx, calld->original_recv_initial_metadata_ready, + error); +} + +// Called from application code. +static void on_md_processing_done( + void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md, + const grpc_metadata *response_md, size_t num_response_md, + grpc_status_code status, const char *error_details) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + // If the call was not cancelled while we were in flight, process the result. + if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT, + (gpr_atm)STATE_DONE)) { + grpc_error *error = GRPC_ERROR_NONE; + if (status != GRPC_STATUS_OK) { + if (error_details == NULL) { + error_details = "Authentication metadata processing failed."; + } + error = grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), + GRPC_ERROR_INT_GRPC_STATUS, status); } - error = - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), - GRPC_ERROR_INT_GRPC_STATUS, status); + on_md_processing_done_inner(&exec_ctx, elem, consumed_md, num_consumed_md, + response_md, num_response_md, error); } + // Clean up. for (size_t i = 0; i < calld->md.count; i++) { grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); } grpc_metadata_array_destroy(&calld->md); - GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready, - error); + GRPC_CALL_STACK_UNREF(&exec_ctx, calld->owning_call, "server_auth_metadata"); grpc_exec_ctx_finish(&exec_ctx); } +static void cancel_call(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = elem->call_data; + // If the result was not already processed, invoke the callback now. + if (error != GRPC_ERROR_NONE && + gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT, + (gpr_atm)STATE_CANCELLED)) { + on_md_processing_done_inner(exec_ctx, elem, NULL, 0, NULL, 0, + GRPC_ERROR_REF(error)); + } + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "cancel_call"); +} + static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_call_element *elem = arg; + grpc_call_element *elem = (grpc_call_element *)arg; channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; if (error == GRPC_ERROR_NONE) { if (chand->creds != NULL && chand->creds->processor.process != NULL) { + // We're calling out to the application, so we need to make sure + // to drop the call combiner early if we get cancelled. + GRPC_CALL_STACK_REF(calld->owning_call, "cancel_call"); + GRPC_CLOSURE_INIT(&calld->cancel_closure, cancel_call, elem, + grpc_schedule_on_exec_ctx); + grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, + &calld->cancel_closure); + GRPC_CALL_STACK_REF(calld->owning_call, "server_auth_metadata"); calld->md = metadata_batch_to_md_array( batch->payload->recv_initial_metadata.recv_initial_metadata); chand->creds->processor.process( @@ -159,6 +209,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_element_args *args) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; + calld->call_combiner = args->call_combiner; + calld->owning_call = args->call_stack; GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); @@ -218,6 +270,5 @@ const grpc_channel_filter grpc_server_auth_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, "server-auth"}; |