aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/security
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-09-07 15:54:59 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-09-07 15:54:59 -0700
commit56969c2784911583825d75212ca027806d1de056 (patch)
tree35ccaa433f197d7977533e24210ddefb7632a12e /src/core/lib/security
parentf0ba70a9ea92b6c7422f40206e763141c05eb281 (diff)
parent41630a29507c8dd5b6110f0397b346b7feab442b (diff)
Merge github.com:grpc/grpc into pollset_kick_stats
Diffstat (limited to 'src/core/lib/security')
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c203
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c91
2 files changed, 139 insertions, 155 deletions
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index 531a88434f..dd7dd44e79 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -39,6 +39,8 @@
/* We can have a per-call credentials. */
typedef struct {
+ grpc_call_stack *owning_call;
+ grpc_call_combiner *call_combiner;
grpc_call_credentials *creds;
bool have_host;
bool have_method;
@@ -49,17 +51,12 @@ typedef struct {
pollset_set so that work can progress when this call wants work to progress
*/
grpc_polling_entity *pollent;
- gpr_atm security_context_set;
- gpr_mu security_context_mu;
grpc_credentials_mdelem_array md_array;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
grpc_auth_metadata_context auth_md_context;
- grpc_closure closure;
- // Either 0 (no cancellation and no async operation in flight),
- // a grpc_closure* (if the lowest bit is 0),
- // or a grpc_error* (if the lowest bit is 1).
- gpr_atm cancellation_state;
- grpc_closure cancel_closure;
+ grpc_closure async_result_closure;
+ grpc_closure check_call_host_cancel_closure;
+ grpc_closure get_request_metadata_cancel_closure;
} call_data;
/* We can have a per-channel credentials. */
@@ -68,43 +65,6 @@ typedef struct {
grpc_auth_context *auth_context;
} channel_data;
-static void decode_cancel_state(gpr_atm cancel_state, grpc_closure **func,
- grpc_error **error) {
- // If the lowest bit is 1, the value is a grpc_error*.
- // Otherwise, if non-zdero, the value is a grpc_closure*.
- if (cancel_state & 1) {
- *error = (grpc_error *)(cancel_state & ~(gpr_atm)1);
- } else if (cancel_state != 0) {
- *func = (grpc_closure *)cancel_state;
- }
-}
-
-static gpr_atm encode_cancel_state_error(grpc_error *error) {
- // Set the lowest bit to 1 to indicate that it's an error.
- return (gpr_atm)1 | (gpr_atm)error;
-}
-
-// Returns an error if the call has been cancelled. Otherwise, sets the
-// cancellation function to be called upon cancellation.
-static grpc_error *set_cancel_func(grpc_call_element *elem,
- grpc_iomgr_cb_func func) {
- call_data *calld = (call_data *)elem->call_data;
- // Decode original state.
- gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state);
- grpc_error *original_error = GRPC_ERROR_NONE;
- grpc_closure *original_func = NULL;
- decode_cancel_state(original_state, &original_func, &original_error);
- // If error is set, return it.
- if (original_error != GRPC_ERROR_NONE) return GRPC_ERROR_REF(original_error);
- // Otherwise, store func.
- GRPC_CLOSURE_INIT(&calld->cancel_closure, func, elem,
- grpc_schedule_on_exec_ctx);
- GPR_ASSERT(((gpr_atm)&calld->cancel_closure & (gpr_atm)1) == 0);
- gpr_atm_rel_store(&calld->cancellation_state,
- (gpr_atm)&calld->cancel_closure);
- return GRPC_ERROR_NONE;
-}
-
static void reset_auth_metadata_context(
grpc_auth_metadata_context *auth_md_context) {
if (auth_md_context->service_url != NULL) {
@@ -153,7 +113,8 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg,
} else {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAUTHENTICATED);
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error,
+ calld->call_combiner);
}
}
@@ -191,8 +152,12 @@ static void cancel_get_request_metadata(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_call_element *elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)elem->call_data;
- grpc_call_credentials_cancel_get_request_metadata(
- exec_ctx, calld->creds, &calld->md_array, GRPC_ERROR_REF(error));
+ if (error != GRPC_ERROR_NONE) {
+ grpc_call_credentials_cancel_get_request_metadata(
+ exec_ctx, calld->creds, &calld->md_array, GRPC_ERROR_REF(error));
+ }
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
+ "cancel_get_request_metadata");
}
static void send_security_metadata(grpc_exec_ctx *exec_ctx,
@@ -223,7 +188,8 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Incompatible credentials set on channel and call."),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED));
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED),
+ calld->call_combiner);
return;
}
} else {
@@ -234,22 +200,25 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
build_auth_metadata_context(&chand->security_connector->base,
chand->auth_context, calld);
- grpc_error *cancel_error = set_cancel_func(elem, cancel_get_request_metadata);
- if (cancel_error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
- cancel_error);
- return;
- }
GPR_ASSERT(calld->pollent != NULL);
- GRPC_CLOSURE_INIT(&calld->closure, on_credentials_metadata, batch,
- grpc_schedule_on_exec_ctx);
+
+ GRPC_CLOSURE_INIT(&calld->async_result_closure, on_credentials_metadata,
+ batch, grpc_schedule_on_exec_ctx);
grpc_error *error = GRPC_ERROR_NONE;
if (grpc_call_credentials_get_request_metadata(
exec_ctx, calld->creds, calld->pollent, calld->auth_md_context,
- &calld->md_array, &calld->closure, &error)) {
+ &calld->md_array, &calld->async_result_closure, &error)) {
// Synchronous return; invoke on_credentials_metadata() directly.
on_credentials_metadata(exec_ctx, batch, error);
GRPC_ERROR_UNREF(error);
+ } else {
+ // Async return; register cancellation closure with call combiner.
+ GRPC_CALL_STACK_REF(calld->owning_call, "cancel_get_request_metadata");
+ grpc_call_combiner_set_notify_on_cancel(
+ exec_ctx, calld->call_combiner,
+ GRPC_CLOSURE_INIT(&calld->get_request_metadata_cancel_closure,
+ cancel_get_request_metadata, elem,
+ grpc_schedule_on_exec_ctx));
}
}
@@ -258,7 +227,6 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg;
grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
-
if (error == GRPC_ERROR_NONE) {
send_security_metadata(exec_ctx, elem, batch);
} else {
@@ -271,7 +239,8 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg,
exec_ctx, batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg),
GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_UNAUTHENTICATED));
+ GRPC_STATUS_UNAUTHENTICATED),
+ calld->call_combiner);
gpr_free(error_msg);
}
}
@@ -281,9 +250,12 @@ static void cancel_check_call_host(grpc_exec_ctx *exec_ctx, void *arg,
grpc_call_element *elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)elem->call_data;
channel_data *chand = (channel_data *)elem->channel_data;
- grpc_channel_security_connector_cancel_check_call_host(
- exec_ctx, chand->security_connector, &calld->closure,
- GRPC_ERROR_REF(error));
+ if (error != GRPC_ERROR_NONE) {
+ grpc_channel_security_connector_cancel_check_call_host(
+ exec_ctx, chand->security_connector, &calld->async_result_closure,
+ GRPC_ERROR_REF(error));
+ }
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "cancel_check_call_host");
}
static void auth_start_transport_stream_op_batch(
@@ -295,52 +267,19 @@ static void auth_start_transport_stream_op_batch(
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (batch->cancel_stream) {
- while (true) {
- // Decode the original cancellation state.
- gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state);
- grpc_error *cancel_error = GRPC_ERROR_NONE;
- grpc_closure *func = NULL;
- decode_cancel_state(original_state, &func, &cancel_error);
- // If we had already set a cancellation error, there's nothing
- // more to do.
- if (cancel_error != GRPC_ERROR_NONE) break;
- // If there's a cancel func, call it.
- // Note that even if the cancel func has been changed by some
- // other thread between when we decoded it and now, it will just
- // be a no-op.
- cancel_error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
- if (func != NULL) {
- GRPC_CLOSURE_SCHED(exec_ctx, func, GRPC_ERROR_REF(cancel_error));
- }
- // Encode the new error into cancellation state.
- if (gpr_atm_full_cas(&calld->cancellation_state, original_state,
- encode_cancel_state_error(cancel_error))) {
- break; // Success.
- }
- // The cas failed, so try again.
- }
- } else {
- /* double checked lock over security context to ensure it's set once */
- if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
- gpr_mu_lock(&calld->security_context_mu);
- if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
- GPR_ASSERT(batch->payload->context != NULL);
- if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
- batch->payload->context[GRPC_CONTEXT_SECURITY].value =
- grpc_client_security_context_create();
- batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
- grpc_client_security_context_destroy;
- }
- grpc_client_security_context *sec_ctx =
- batch->payload->context[GRPC_CONTEXT_SECURITY].value;
- GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
- sec_ctx->auth_context =
- GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
- gpr_atm_rel_store(&calld->security_context_set, 1);
- }
- gpr_mu_unlock(&calld->security_context_mu);
+ if (!batch->cancel_stream) {
+ GPR_ASSERT(batch->payload->context != NULL);
+ if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value =
+ grpc_client_security_context_create();
+ batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
+ grpc_client_security_context_destroy;
}
+ grpc_client_security_context *sec_ctx =
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value;
+ GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
+ sec_ctx->auth_context =
+ GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
}
if (batch->send_initial_metadata) {
@@ -365,26 +304,27 @@ static void auth_start_transport_stream_op_batch(
}
}
if (calld->have_host) {
- grpc_error *cancel_error = set_cancel_func(elem, cancel_check_call_host);
- if (cancel_error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
- cancel_error);
+ batch->handler_private.extra_arg = elem;
+ GRPC_CLOSURE_INIT(&calld->async_result_closure, on_host_checked, batch,
+ grpc_schedule_on_exec_ctx);
+ char *call_host = grpc_slice_to_c_string(calld->host);
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_channel_security_connector_check_call_host(
+ exec_ctx, chand->security_connector, call_host,
+ chand->auth_context, &calld->async_result_closure, &error)) {
+ // Synchronous return; invoke on_host_checked() directly.
+ on_host_checked(exec_ctx, batch, error);
+ GRPC_ERROR_UNREF(error);
} else {
- char *call_host = grpc_slice_to_c_string(calld->host);
- batch->handler_private.extra_arg = elem;
- grpc_error *error = GRPC_ERROR_NONE;
- if (grpc_channel_security_connector_check_call_host(
- exec_ctx, chand->security_connector, call_host,
- chand->auth_context,
- GRPC_CLOSURE_INIT(&calld->closure, on_host_checked, batch,
- grpc_schedule_on_exec_ctx),
- &error)) {
- // Synchronous return; invoke on_host_checked() directly.
- on_host_checked(exec_ctx, batch, error);
- GRPC_ERROR_UNREF(error);
- }
- gpr_free(call_host);
+ // Async return; register cancellation closure with call combiner.
+ GRPC_CALL_STACK_REF(calld->owning_call, "cancel_check_call_host");
+ grpc_call_combiner_set_notify_on_cancel(
+ exec_ctx, calld->call_combiner,
+ GRPC_CLOSURE_INIT(&calld->check_call_host_cancel_closure,
+ cancel_check_call_host, elem,
+ grpc_schedule_on_exec_ctx));
}
+ gpr_free(call_host);
GPR_TIMER_END("auth_start_transport_stream_op_batch", 0);
return; /* early exit */
}
@@ -400,8 +340,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
- memset(calld, 0, sizeof(*calld));
- gpr_mu_init(&calld->security_context_mu);
+ calld->owning_call = args->call_stack;
+ calld->call_combiner = args->call_combiner;
return GRPC_ERROR_NONE;
}
@@ -426,12 +366,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_slice_unref_internal(exec_ctx, calld->method);
}
reset_auth_metadata_context(&calld->auth_md_context);
- gpr_mu_destroy(&calld->security_context_mu);
- gpr_atm cancel_state = gpr_atm_acq_load(&calld->cancellation_state);
- grpc_error *cancel_error = GRPC_ERROR_NONE;
- grpc_closure *cancel_func = NULL;
- decode_cancel_state(cancel_state, &cancel_func, &cancel_error);
- GRPC_ERROR_UNREF(cancel_error);
}
/* Constructor for channel_data */
@@ -490,6 +424,5 @@ const grpc_channel_filter grpc_client_auth_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"client-auth"};
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 9bf3f0ca0f..7f523c0883 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -26,7 +26,15 @@
#include "src/core/lib/security/transport/auth_filters.h"
#include "src/core/lib/slice/slice_internal.h"
+typedef enum {
+ STATE_INIT = 0,
+ STATE_DONE,
+ STATE_CANCELLED,
+} async_state;
+
typedef struct call_data {
+ grpc_call_combiner *call_combiner;
+ grpc_call_stack *owning_call;
grpc_transport_stream_op_batch *recv_initial_metadata_batch;
grpc_closure *original_recv_initial_metadata_ready;
grpc_closure recv_initial_metadata_ready;
@@ -34,6 +42,8 @@ typedef struct call_data {
const grpc_metadata *consumed_md;
size_t num_consumed_md;
grpc_auth_context *auth_context;
+ grpc_closure cancel_closure;
+ gpr_atm state; // async_state
} call_data;
typedef struct channel_data {
@@ -78,54 +88,94 @@ static grpc_filtered_mdelem remove_consumed_md(grpc_exec_ctx *exec_ctx,
return GRPC_FILTERED_MDELEM(md);
}
-/* called from application code */
-static void on_md_processing_done(
- void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
- const grpc_metadata *response_md, size_t num_response_md,
- grpc_status_code status, const char *error_details) {
- grpc_call_element *elem = user_data;
+static void on_md_processing_done_inner(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ const grpc_metadata *consumed_md,
+ size_t num_consumed_md,
+ const grpc_metadata *response_md,
+ size_t num_response_md,
+ grpc_error *error) {
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
/* TODO(jboeuf): Implement support for response_md. */
if (response_md != NULL && num_response_md > 0) {
gpr_log(GPR_INFO,
"response_md in auth metadata processing not supported for now. "
"Ignoring...");
}
- grpc_error *error = GRPC_ERROR_NONE;
- if (status == GRPC_STATUS_OK) {
+ if (error == GRPC_ERROR_NONE) {
calld->consumed_md = consumed_md;
calld->num_consumed_md = num_consumed_md;
error = grpc_metadata_batch_filter(
- &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
+ exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
remove_consumed_md, elem, "Response metadata filtering error");
- } else {
- if (error_details == NULL) {
- error_details = "Authentication metadata processing failed.";
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, calld->original_recv_initial_metadata_ready,
+ error);
+}
+
+// Called from application code.
+static void on_md_processing_done(
+ void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
+ const grpc_metadata *response_md, size_t num_response_md,
+ grpc_status_code status, const char *error_details) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // If the call was not cancelled while we were in flight, process the result.
+ if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT,
+ (gpr_atm)STATE_DONE)) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (status != GRPC_STATUS_OK) {
+ if (error_details == NULL) {
+ error_details = "Authentication metadata processing failed.";
+ }
+ error = grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
+ GRPC_ERROR_INT_GRPC_STATUS, status);
}
- error =
- grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
- GRPC_ERROR_INT_GRPC_STATUS, status);
+ on_md_processing_done_inner(&exec_ctx, elem, consumed_md, num_consumed_md,
+ response_md, num_response_md, error);
}
+ // Clean up.
for (size_t i = 0; i < calld->md.count; i++) {
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
}
grpc_metadata_array_destroy(&calld->md);
- GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready,
- error);
+ GRPC_CALL_STACK_UNREF(&exec_ctx, calld->owning_call, "server_auth_metadata");
grpc_exec_ctx_finish(&exec_ctx);
}
+static void cancel_call(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = elem->call_data;
+ // If the result was not already processed, invoke the callback now.
+ if (error != GRPC_ERROR_NONE &&
+ gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT,
+ (gpr_atm)STATE_CANCELLED)) {
+ on_md_processing_done_inner(exec_ctx, elem, NULL, 0, NULL, 0,
+ GRPC_ERROR_REF(error));
+ }
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "cancel_call");
+}
+
static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_call_element *elem = arg;
+ grpc_call_element *elem = (grpc_call_element *)arg;
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
if (error == GRPC_ERROR_NONE) {
if (chand->creds != NULL && chand->creds->processor.process != NULL) {
+ // We're calling out to the application, so we need to make sure
+ // to drop the call combiner early if we get cancelled.
+ GRPC_CALL_STACK_REF(calld->owning_call, "cancel_call");
+ GRPC_CLOSURE_INIT(&calld->cancel_closure, cancel_call, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
+ &calld->cancel_closure);
+ GRPC_CALL_STACK_REF(calld->owning_call, "server_auth_metadata");
calld->md = metadata_batch_to_md_array(
batch->payload->recv_initial_metadata.recv_initial_metadata);
chand->creds->processor.process(
@@ -159,6 +209,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
+ calld->call_combiner = args->call_combiner;
+ calld->owning_call = args->call_stack;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -218,6 +270,5 @@ const grpc_channel_filter grpc_server_auth_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"server-auth"};