aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/security/transport
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2017-08-29 16:59:07 -0700
committerGravatar GitHub <noreply@github.com>2017-08-29 16:59:07 -0700
commitbf19961d0a49b43cb528392efeb4880eeebb9b5e (patch)
tree1d4c96db4d3bdc05c634e5d386c14a77845a1758 /src/core/lib/security/transport
parent9811915ba3fa1ccdf44b6a70fe1b1dd4782cd508 (diff)
Revert "Implement call combiner"
Diffstat (limited to 'src/core/lib/security/transport')
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c183
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c89
2 files changed, 151 insertions, 121 deletions
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index e3f0163a6c..531a88434f 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -39,7 +39,6 @@
/* We can have a per-call credentials. */
typedef struct {
- grpc_call_combiner *call_combiner;
grpc_call_credentials *creds;
bool have_host;
bool have_method;
@@ -50,11 +49,17 @@ typedef struct {
pollset_set so that work can progress when this call wants work to progress
*/
grpc_polling_entity *pollent;
+ gpr_atm security_context_set;
+ gpr_mu security_context_mu;
grpc_credentials_mdelem_array md_array;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
grpc_auth_metadata_context auth_md_context;
- grpc_closure async_cancel_closure;
- grpc_closure async_result_closure;
+ grpc_closure closure;
+ // Either 0 (no cancellation and no async operation in flight),
+ // a grpc_closure* (if the lowest bit is 0),
+ // or a grpc_error* (if the lowest bit is 1).
+ gpr_atm cancellation_state;
+ grpc_closure cancel_closure;
} call_data;
/* We can have a per-channel credentials. */
@@ -63,6 +68,43 @@ typedef struct {
grpc_auth_context *auth_context;
} channel_data;
+static void decode_cancel_state(gpr_atm cancel_state, grpc_closure **func,
+ grpc_error **error) {
+ // If the lowest bit is 1, the value is a grpc_error*.
+ // Otherwise, if non-zdero, the value is a grpc_closure*.
+ if (cancel_state & 1) {
+ *error = (grpc_error *)(cancel_state & ~(gpr_atm)1);
+ } else if (cancel_state != 0) {
+ *func = (grpc_closure *)cancel_state;
+ }
+}
+
+static gpr_atm encode_cancel_state_error(grpc_error *error) {
+ // Set the lowest bit to 1 to indicate that it's an error.
+ return (gpr_atm)1 | (gpr_atm)error;
+}
+
+// Returns an error if the call has been cancelled. Otherwise, sets the
+// cancellation function to be called upon cancellation.
+static grpc_error *set_cancel_func(grpc_call_element *elem,
+ grpc_iomgr_cb_func func) {
+ call_data *calld = (call_data *)elem->call_data;
+ // Decode original state.
+ gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state);
+ grpc_error *original_error = GRPC_ERROR_NONE;
+ grpc_closure *original_func = NULL;
+ decode_cancel_state(original_state, &original_func, &original_error);
+ // If error is set, return it.
+ if (original_error != GRPC_ERROR_NONE) return GRPC_ERROR_REF(original_error);
+ // Otherwise, store func.
+ GRPC_CLOSURE_INIT(&calld->cancel_closure, func, elem,
+ grpc_schedule_on_exec_ctx);
+ GPR_ASSERT(((gpr_atm)&calld->cancel_closure & (gpr_atm)1) == 0);
+ gpr_atm_rel_store(&calld->cancellation_state,
+ (gpr_atm)&calld->cancel_closure);
+ return GRPC_ERROR_NONE;
+}
+
static void reset_auth_metadata_context(
grpc_auth_metadata_context *auth_md_context) {
if (auth_md_context->service_url != NULL) {
@@ -93,7 +135,6 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg;
grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
reset_auth_metadata_context(&calld->auth_md_context);
grpc_error *error = GRPC_ERROR_REF(input_error);
if (error == GRPC_ERROR_NONE) {
@@ -112,8 +153,7 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg,
} else {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAUTHENTICATED);
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error,
- calld->call_combiner);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error);
}
}
@@ -183,8 +223,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Incompatible credentials set on channel and call."),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED),
- calld->call_combiner);
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED));
return;
}
} else {
@@ -195,23 +234,22 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
build_auth_metadata_context(&chand->security_connector->base,
chand->auth_context, calld);
+ grpc_error *cancel_error = set_cancel_func(elem, cancel_get_request_metadata);
+ if (cancel_error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
+ cancel_error);
+ return;
+ }
GPR_ASSERT(calld->pollent != NULL);
-
- GRPC_CLOSURE_INIT(&calld->async_result_closure, on_credentials_metadata,
- batch, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->closure, on_credentials_metadata, batch,
+ grpc_schedule_on_exec_ctx);
grpc_error *error = GRPC_ERROR_NONE;
if (grpc_call_credentials_get_request_metadata(
exec_ctx, calld->creds, calld->pollent, calld->auth_md_context,
- &calld->md_array, &calld->async_result_closure, &error)) {
+ &calld->md_array, &calld->closure, &error)) {
// Synchronous return; invoke on_credentials_metadata() directly.
on_credentials_metadata(exec_ctx, batch, error);
GRPC_ERROR_UNREF(error);
- } else {
- // Async return; register cancellation closure with call combiner.
- GRPC_CLOSURE_INIT(&calld->async_cancel_closure, cancel_get_request_metadata,
- elem, grpc_schedule_on_exec_ctx);
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
- &calld->async_cancel_closure);
}
}
@@ -220,7 +258,7 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg;
grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
+
if (error == GRPC_ERROR_NONE) {
send_security_metadata(exec_ctx, elem, batch);
} else {
@@ -233,8 +271,7 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg,
exec_ctx, batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg),
GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_UNAUTHENTICATED),
- calld->call_combiner);
+ GRPC_STATUS_UNAUTHENTICATED));
gpr_free(error_msg);
}
}
@@ -245,7 +282,7 @@ static void cancel_check_call_host(grpc_exec_ctx *exec_ctx, void *arg,
call_data *calld = (call_data *)elem->call_data;
channel_data *chand = (channel_data *)elem->channel_data;
grpc_channel_security_connector_cancel_check_call_host(
- exec_ctx, chand->security_connector, &calld->async_result_closure,
+ exec_ctx, chand->security_connector, &calld->closure,
GRPC_ERROR_REF(error));
}
@@ -258,19 +295,52 @@ static void auth_start_transport_stream_op_batch(
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (!batch->cancel_stream) {
- GPR_ASSERT(batch->payload->context != NULL);
- if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
- batch->payload->context[GRPC_CONTEXT_SECURITY].value =
- grpc_client_security_context_create();
- batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
- grpc_client_security_context_destroy;
+ if (batch->cancel_stream) {
+ while (true) {
+ // Decode the original cancellation state.
+ gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state);
+ grpc_error *cancel_error = GRPC_ERROR_NONE;
+ grpc_closure *func = NULL;
+ decode_cancel_state(original_state, &func, &cancel_error);
+ // If we had already set a cancellation error, there's nothing
+ // more to do.
+ if (cancel_error != GRPC_ERROR_NONE) break;
+ // If there's a cancel func, call it.
+ // Note that even if the cancel func has been changed by some
+ // other thread between when we decoded it and now, it will just
+ // be a no-op.
+ cancel_error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
+ if (func != NULL) {
+ GRPC_CLOSURE_SCHED(exec_ctx, func, GRPC_ERROR_REF(cancel_error));
+ }
+ // Encode the new error into cancellation state.
+ if (gpr_atm_full_cas(&calld->cancellation_state, original_state,
+ encode_cancel_state_error(cancel_error))) {
+ break; // Success.
+ }
+ // The cas failed, so try again.
+ }
+ } else {
+ /* double checked lock over security context to ensure it's set once */
+ if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
+ gpr_mu_lock(&calld->security_context_mu);
+ if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
+ GPR_ASSERT(batch->payload->context != NULL);
+ if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value =
+ grpc_client_security_context_create();
+ batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
+ grpc_client_security_context_destroy;
+ }
+ grpc_client_security_context *sec_ctx =
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value;
+ GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
+ sec_ctx->auth_context =
+ GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
+ gpr_atm_rel_store(&calld->security_context_set, 1);
+ }
+ gpr_mu_unlock(&calld->security_context_mu);
}
- grpc_client_security_context *sec_ctx =
- batch->payload->context[GRPC_CONTEXT_SECURITY].value;
- GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
- sec_ctx->auth_context =
- GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
}
if (batch->send_initial_metadata) {
@@ -295,25 +365,26 @@ static void auth_start_transport_stream_op_batch(
}
}
if (calld->have_host) {
- batch->handler_private.extra_arg = elem;
- GRPC_CLOSURE_INIT(&calld->async_result_closure, on_host_checked, batch,
- grpc_schedule_on_exec_ctx);
- char *call_host = grpc_slice_to_c_string(calld->host);
- grpc_error *error = GRPC_ERROR_NONE;
- if (grpc_channel_security_connector_check_call_host(
- exec_ctx, chand->security_connector, call_host,
- chand->auth_context, &calld->async_result_closure, &error)) {
- // Synchronous return; invoke on_host_checked() directly.
- on_host_checked(exec_ctx, batch, error);
- GRPC_ERROR_UNREF(error);
+ grpc_error *cancel_error = set_cancel_func(elem, cancel_check_call_host);
+ if (cancel_error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
+ cancel_error);
} else {
- // Async return; register cancellation closure with call combiner.
- GRPC_CLOSURE_INIT(&calld->async_cancel_closure, cancel_check_call_host,
- elem, grpc_schedule_on_exec_ctx);
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
- &calld->async_cancel_closure);
+ char *call_host = grpc_slice_to_c_string(calld->host);
+ batch->handler_private.extra_arg = elem;
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_channel_security_connector_check_call_host(
+ exec_ctx, chand->security_connector, call_host,
+ chand->auth_context,
+ GRPC_CLOSURE_INIT(&calld->closure, on_host_checked, batch,
+ grpc_schedule_on_exec_ctx),
+ &error)) {
+ // Synchronous return; invoke on_host_checked() directly.
+ on_host_checked(exec_ctx, batch, error);
+ GRPC_ERROR_UNREF(error);
+ }
+ gpr_free(call_host);
}
- gpr_free(call_host);
GPR_TIMER_END("auth_start_transport_stream_op_batch", 0);
return; /* early exit */
}
@@ -329,7 +400,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
- calld->call_combiner = args->call_combiner;
+ memset(calld, 0, sizeof(*calld));
+ gpr_mu_init(&calld->security_context_mu);
return GRPC_ERROR_NONE;
}
@@ -354,6 +426,12 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_slice_unref_internal(exec_ctx, calld->method);
}
reset_auth_metadata_context(&calld->auth_md_context);
+ gpr_mu_destroy(&calld->security_context_mu);
+ gpr_atm cancel_state = gpr_atm_acq_load(&calld->cancellation_state);
+ grpc_error *cancel_error = GRPC_ERROR_NONE;
+ grpc_closure *cancel_func = NULL;
+ decode_cancel_state(cancel_state, &cancel_func, &cancel_error);
+ GRPC_ERROR_UNREF(cancel_error);
}
/* Constructor for channel_data */
@@ -412,5 +490,6 @@ const grpc_channel_filter grpc_client_auth_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"client-auth"};
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index b721ce4a22..9bf3f0ca0f 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -26,15 +26,7 @@
#include "src/core/lib/security/transport/auth_filters.h"
#include "src/core/lib/slice/slice_internal.h"
-typedef enum {
- STATE_INIT = 0,
- STATE_DONE,
- STATE_CANCELLED,
-} async_state;
-
typedef struct call_data {
- grpc_call_combiner *call_combiner;
- grpc_call_stack *owning_call;
grpc_transport_stream_op_batch *recv_initial_metadata_batch;
grpc_closure *original_recv_initial_metadata_ready;
grpc_closure recv_initial_metadata_ready;
@@ -42,8 +34,6 @@ typedef struct call_data {
const grpc_metadata *consumed_md;
size_t num_consumed_md;
grpc_auth_context *auth_context;
- grpc_closure cancel_closure;
- gpr_atm state; // async_state
} call_data;
typedef struct channel_data {
@@ -88,92 +78,54 @@ static grpc_filtered_mdelem remove_consumed_md(grpc_exec_ctx *exec_ctx,
return GRPC_FILTERED_MDELEM(md);
}
-static void on_md_processing_done_inner(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- const grpc_metadata *consumed_md,
- size_t num_consumed_md,
- const grpc_metadata *response_md,
- size_t num_response_md,
- grpc_error *error) {
+/* called from application code */
+static void on_md_processing_done(
+ void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
+ const grpc_metadata *response_md, size_t num_response_md,
+ grpc_status_code status, const char *error_details) {
+ grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
/* TODO(jboeuf): Implement support for response_md. */
if (response_md != NULL && num_response_md > 0) {
gpr_log(GPR_INFO,
"response_md in auth metadata processing not supported for now. "
"Ignoring...");
}
- if (error == GRPC_ERROR_NONE) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (status == GRPC_STATUS_OK) {
calld->consumed_md = consumed_md;
calld->num_consumed_md = num_consumed_md;
error = grpc_metadata_batch_filter(
- exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
+ &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
remove_consumed_md, elem, "Response metadata filtering error");
- }
- GRPC_CLOSURE_SCHED(exec_ctx, calld->original_recv_initial_metadata_ready,
- error);
-}
-
-// Called from application code.
-static void on_md_processing_done(
- void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
- const grpc_metadata *response_md, size_t num_response_md,
- grpc_status_code status, const char *error_details) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- // If the call was not cancelled while we were in flight, process the result.
- if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT,
- (gpr_atm)STATE_DONE)) {
- grpc_error *error = GRPC_ERROR_NONE;
- if (status != GRPC_STATUS_OK) {
- if (error_details == NULL) {
- error_details = "Authentication metadata processing failed.";
- }
- error = grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
- GRPC_ERROR_INT_GRPC_STATUS, status);
+ } else {
+ if (error_details == NULL) {
+ error_details = "Authentication metadata processing failed.";
}
- on_md_processing_done_inner(&exec_ctx, elem, consumed_md, num_consumed_md,
- response_md, num_response_md, error);
+ error =
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
+ GRPC_ERROR_INT_GRPC_STATUS, status);
}
- // Clean up.
for (size_t i = 0; i < calld->md.count; i++) {
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
}
grpc_metadata_array_destroy(&calld->md);
- GRPC_CALL_STACK_UNREF(&exec_ctx, calld->owning_call, "server_auth_metadata");
+ GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready,
+ error);
grpc_exec_ctx_finish(&exec_ctx);
}
-static void cancel_call(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_call_element *elem = (grpc_call_element *)arg;
- call_data *calld = elem->call_data;
- // If the result was not already processed, invoke the callback now.
- if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT,
- (gpr_atm)STATE_CANCELLED)) {
- on_md_processing_done_inner(exec_ctx, elem, NULL, 0, NULL, 0,
- GRPC_ERROR_REF(error));
- }
-}
-
static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_call_element *elem = (grpc_call_element *)arg;
+ grpc_call_element *elem = arg;
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
if (error == GRPC_ERROR_NONE) {
if (chand->creds != NULL && chand->creds->processor.process != NULL) {
- // We're calling out to the application, so we need to make sure
- // to drop the call combiner early if we get cancelled.
- GRPC_CLOSURE_INIT(&calld->cancel_closure, cancel_call, elem,
- grpc_schedule_on_exec_ctx);
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
- &calld->cancel_closure);
- GRPC_CALL_STACK_REF(calld->owning_call, "server_auth_metadata");
calld->md = metadata_batch_to_md_array(
batch->payload->recv_initial_metadata.recv_initial_metadata);
chand->creds->processor.process(
@@ -207,8 +159,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- calld->call_combiner = args->call_combiner;
- calld->owning_call = args->call_stack;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -268,5 +218,6 @@ const grpc_channel_filter grpc_server_auth_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"server-auth"};