diff options
author | 2017-09-01 09:00:06 -0700 | |
---|---|---|
committer | 2017-09-01 09:00:06 -0700 | |
commit | 764cf04a13958d72db5a22eb4bbb9370e00777f5 (patch) | |
tree | 28f265c5458bb537688a1e81535877e2926deb52 /src/core/lib/security/transport/server_auth_filter.c | |
parent | c928cfee2b94a99747f97ff8c5fb09277a1352b7 (diff) |
Revert "Revert "Implement call combiner""
This reverts commit bf19961d0a49b43cb528392efeb4880eeebb9b5e.
Diffstat (limited to 'src/core/lib/security/transport/server_auth_filter.c')
-rw-r--r-- | src/core/lib/security/transport/server_auth_filter.c | 89 |
1 files changed, 69 insertions, 20 deletions
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 9bf3f0ca0f..b721ce4a22 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -26,7 +26,15 @@ #include "src/core/lib/security/transport/auth_filters.h" #include "src/core/lib/slice/slice_internal.h" +typedef enum { + STATE_INIT = 0, + STATE_DONE, + STATE_CANCELLED, +} async_state; + typedef struct call_data { + grpc_call_combiner *call_combiner; + grpc_call_stack *owning_call; grpc_transport_stream_op_batch *recv_initial_metadata_batch; grpc_closure *original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; @@ -34,6 +42,8 @@ typedef struct call_data { const grpc_metadata *consumed_md; size_t num_consumed_md; grpc_auth_context *auth_context; + grpc_closure cancel_closure; + gpr_atm state; // async_state } call_data; typedef struct channel_data { @@ -78,54 +88,92 @@ static grpc_filtered_mdelem remove_consumed_md(grpc_exec_ctx *exec_ctx, return GRPC_FILTERED_MDELEM(md); } -/* called from application code */ -static void on_md_processing_done( - void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md, - const grpc_metadata *response_md, size_t num_response_md, - grpc_status_code status, const char *error_details) { - grpc_call_element *elem = user_data; +static void on_md_processing_done_inner(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_metadata *consumed_md, + size_t num_consumed_md, + const grpc_metadata *response_md, + size_t num_response_md, + grpc_error *error) { call_data *calld = elem->call_data; grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL); /* TODO(jboeuf): Implement support for response_md. */ if (response_md != NULL && num_response_md > 0) { gpr_log(GPR_INFO, "response_md in auth metadata processing not supported for now. " "Ignoring..."); } - grpc_error *error = GRPC_ERROR_NONE; - if (status == GRPC_STATUS_OK) { + if (error == GRPC_ERROR_NONE) { calld->consumed_md = consumed_md; calld->num_consumed_md = num_consumed_md; error = grpc_metadata_batch_filter( - &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata, + exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata, remove_consumed_md, elem, "Response metadata filtering error"); - } else { - if (error_details == NULL) { - error_details = "Authentication metadata processing failed."; + } + GRPC_CLOSURE_SCHED(exec_ctx, calld->original_recv_initial_metadata_ready, + error); +} + +// Called from application code. +static void on_md_processing_done( + void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md, + const grpc_metadata *response_md, size_t num_response_md, + grpc_status_code status, const char *error_details) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + // If the call was not cancelled while we were in flight, process the result. + if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT, + (gpr_atm)STATE_DONE)) { + grpc_error *error = GRPC_ERROR_NONE; + if (status != GRPC_STATUS_OK) { + if (error_details == NULL) { + error_details = "Authentication metadata processing failed."; + } + error = grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), + GRPC_ERROR_INT_GRPC_STATUS, status); } - error = - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), - GRPC_ERROR_INT_GRPC_STATUS, status); + on_md_processing_done_inner(&exec_ctx, elem, consumed_md, num_consumed_md, + response_md, num_response_md, error); } + // Clean up. for (size_t i = 0; i < calld->md.count; i++) { grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); } grpc_metadata_array_destroy(&calld->md); - GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready, - error); + GRPC_CALL_STACK_UNREF(&exec_ctx, calld->owning_call, "server_auth_metadata"); grpc_exec_ctx_finish(&exec_ctx); } +static void cancel_call(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = elem->call_data; + // If the result was not already processed, invoke the callback now. + if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT, + (gpr_atm)STATE_CANCELLED)) { + on_md_processing_done_inner(exec_ctx, elem, NULL, 0, NULL, 0, + GRPC_ERROR_REF(error)); + } +} + static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_call_element *elem = arg; + grpc_call_element *elem = (grpc_call_element *)arg; channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; if (error == GRPC_ERROR_NONE) { if (chand->creds != NULL && chand->creds->processor.process != NULL) { + // We're calling out to the application, so we need to make sure + // to drop the call combiner early if we get cancelled. + GRPC_CLOSURE_INIT(&calld->cancel_closure, cancel_call, elem, + grpc_schedule_on_exec_ctx); + grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, + &calld->cancel_closure); + GRPC_CALL_STACK_REF(calld->owning_call, "server_auth_metadata"); calld->md = metadata_batch_to_md_array( batch->payload->recv_initial_metadata.recv_initial_metadata); chand->creds->processor.process( @@ -159,6 +207,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_element_args *args) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; + calld->call_combiner = args->call_combiner; + calld->owning_call = args->call_stack; GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); @@ -218,6 +268,5 @@ const grpc_channel_filter grpc_server_auth_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, "server-auth"}; |