diff options
Diffstat (limited to 'src/core/security')
-rw-r--r-- | src/core/security/client_auth_filter.c | 54 | ||||
-rw-r--r-- | src/core/security/credentials.c | 85 | ||||
-rw-r--r-- | src/core/security/credentials.h | 13 | ||||
-rw-r--r-- | src/core/security/google_default_credentials.c | 17 | ||||
-rw-r--r-- | src/core/security/jwt_verifier.c | 20 | ||||
-rw-r--r-- | src/core/security/jwt_verifier.h | 4 | ||||
-rw-r--r-- | src/core/security/secure_endpoint.c | 92 | ||||
-rw-r--r-- | src/core/security/secure_transport_setup.c | 111 | ||||
-rw-r--r-- | src/core/security/secure_transport_setup.h | 5 | ||||
-rw-r--r-- | src/core/security/security_connector.c | 10 | ||||
-rw-r--r-- | src/core/security/security_connector.h | 8 | ||||
-rw-r--r-- | src/core/security/server_auth_filter.c | 27 | ||||
-rw-r--r-- | src/core/security/server_secure_chttp2.c | 36 |
13 files changed, 257 insertions, 225 deletions
diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index f3ecfd0e60..0a2de4f7bb 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -76,17 +76,18 @@ typedef struct { } channel_data; static void bubble_up_error(grpc_call_element *elem, grpc_status_code status, - const char *error_msg) { + const char *error_msg, grpc_call_list *call_list) { call_data *calld = elem->call_data; gpr_log(GPR_ERROR, "Client side authentication failure: %s", error_msg); grpc_transport_stream_op_add_cancellation(&calld->op, status); - grpc_call_next_op(elem, &calld->op); + grpc_call_next_op(elem, &calld->op, call_list); } static void on_credentials_metadata(void *user_data, grpc_credentials_md *md_elems, size_t num_md, - grpc_credentials_status status) { + grpc_credentials_status status, + grpc_call_list *call_list) { grpc_call_element *elem = (grpc_call_element *)user_data; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; @@ -95,7 +96,7 @@ static void on_credentials_metadata(void *user_data, size_t i; if (status != GRPC_CREDENTIALS_OK) { bubble_up_error(elem, GRPC_STATUS_UNAUTHENTICATED, - "Credentials failed to get metadata."); + "Credentials failed to get metadata.", call_list); return; } GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT); @@ -108,7 +109,7 @@ static void on_credentials_metadata(void *user_data, grpc_mdelem_from_slices(chand->md_ctx, gpr_slice_ref(md_elems[i].key), gpr_slice_ref(md_elems[i].value))); } - grpc_call_next_op(elem, op); + grpc_call_next_op(elem, op, call_list); } static char *build_service_url(const char *url_scheme, call_data *calld) { @@ -132,7 +133,8 @@ static char *build_service_url(const char *url_scheme, call_data *calld) { } static void send_security_metadata(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op *op, + grpc_call_list *call_list) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_client_security_context *ctx = @@ -148,7 +150,7 @@ static void send_security_metadata(grpc_call_element *elem, if (!channel_creds_has_md && !call_creds_has_md) { /* Skip sending metadata altogether. */ - grpc_call_next_op(elem, op); + grpc_call_next_op(elem, op, call_list); return; } @@ -157,7 +159,8 @@ static void send_security_metadata(grpc_call_element *elem, grpc_composite_credentials_create(channel_creds, ctx->creds, NULL); if (calld->creds == NULL) { bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, - "Incompatible credentials set on channel and call."); + "Incompatible credentials set on channel and call.", + call_list); return; } } else { @@ -169,22 +172,24 @@ static void send_security_metadata(grpc_call_element *elem, build_service_url(chand->security_connector->base.url_scheme, calld); calld->op = *op; /* Copy op (originates from the caller's stack). */ GPR_ASSERT(calld->pollset); - grpc_credentials_get_request_metadata( - calld->creds, calld->pollset, service_url, on_credentials_metadata, elem); + grpc_credentials_get_request_metadata(calld->creds, calld->pollset, + service_url, on_credentials_metadata, + elem, call_list); gpr_free(service_url); } -static void on_host_checked(void *user_data, grpc_security_status status) { +static void on_host_checked(void *user_data, grpc_security_status status, + grpc_call_list *call_list) { grpc_call_element *elem = (grpc_call_element *)user_data; call_data *calld = elem->call_data; if (status == GRPC_SECURITY_OK) { - send_security_metadata(elem, &calld->op); + send_security_metadata(elem, &calld->op, call_list); } else { char *error_msg; gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", grpc_mdstr_as_c_string(calld->host)); - bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg); + bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg, call_list); gpr_free(error_msg); } } @@ -195,7 +200,8 @@ static void on_host_checked(void *user_data, grpc_security_status status) { op contains type and call direction information, in addition to the data that is being sent or received. */ static void auth_start_transport_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op *op, + grpc_call_list *call_list) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; @@ -247,26 +253,28 @@ static void auth_start_transport_op(grpc_call_element *elem, const char *call_host = grpc_mdstr_as_c_string(calld->host); calld->op = *op; /* Copy op (originates from the caller's stack). */ status = grpc_channel_security_connector_check_call_host( - chand->security_connector, call_host, on_host_checked, elem); + chand->security_connector, call_host, on_host_checked, elem, + call_list); if (status != GRPC_SECURITY_OK) { if (status == GRPC_SECURITY_ERROR) { char *error_msg; gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", call_host); - bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg); + bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg, + call_list); gpr_free(error_msg); } return; /* early exit */ } } - send_security_metadata(elem, op); + send_security_metadata(elem, op, call_list); return; /* early exit */ } } - /* pass control up or down the stack */ - grpc_call_next_op(elem, op); + /* pass control down the stack */ + grpc_call_next_op(elem, op, call_list); } /* Constructor for call_data */ @@ -285,7 +293,8 @@ static void init_call_elem(grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem) { +static void destroy_call_elem(grpc_call_element *elem, + grpc_call_list *call_list) { call_data *calld = elem->call_data; grpc_credentials_unref(calld->creds); if (calld->host != NULL) { @@ -300,7 +309,7 @@ static void destroy_call_elem(grpc_call_element *elem) { static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, - int is_last) { + int is_last, grpc_call_list *call_list) { grpc_security_connector *sc = grpc_find_security_connector_in_args(args); /* grab pointers to our data from the channel element */ channel_data *chand = elem->channel_data; @@ -326,7 +335,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element *elem) { +static void destroy_channel_elem(grpc_channel_element *elem, + grpc_call_list *call_list) { /* grab pointers to our data from the channel element */ channel_data *chand = elem->channel_data; grpc_channel_security_connector *ctx = chand->security_connector; diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index f151fba959..086c428112 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -108,16 +108,17 @@ void grpc_credentials_get_request_metadata(grpc_credentials *creds, grpc_pollset *pollset, const char *service_url, grpc_credentials_metadata_cb cb, - void *user_data) { + void *user_data, + grpc_call_list *call_list) { if (creds == NULL || !grpc_credentials_has_request_metadata(creds) || creds->vtable->get_request_metadata == NULL) { if (cb != NULL) { - cb(user_data, NULL, 0, GRPC_CREDENTIALS_OK); + cb(user_data, NULL, 0, GRPC_CREDENTIALS_OK, call_list); } return; } creds->vtable->get_request_metadata(creds, pollset, service_url, cb, - user_data); + user_data, call_list); } grpc_security_status grpc_credentials_create_security_connector( @@ -375,7 +376,8 @@ static void jwt_get_request_metadata(grpc_credentials *creds, grpc_pollset *pollset, const char *service_url, grpc_credentials_metadata_cb cb, - void *user_data) { + void *user_data, + grpc_call_list *call_list) { grpc_service_account_jwt_access_credentials *c = (grpc_service_account_jwt_access_credentials *)creds; gpr_timespec refresh_threshold = gpr_time_from_seconds( @@ -419,10 +421,11 @@ static void jwt_get_request_metadata(grpc_credentials *creds, } if (jwt_md != NULL) { - cb(user_data, jwt_md->entries, jwt_md->num_entries, GRPC_CREDENTIALS_OK); + cb(user_data, jwt_md->entries, jwt_md->num_entries, GRPC_CREDENTIALS_OK, + call_list); grpc_credentials_md_store_unref(jwt_md); } else { - cb(user_data, NULL, 0, GRPC_CREDENTIALS_ERROR); + cb(user_data, NULL, 0, GRPC_CREDENTIALS_ERROR, call_list); } } @@ -568,7 +571,8 @@ end: } static void on_oauth2_token_fetcher_http_response( - void *user_data, const grpc_httpcli_response *response) { + void *user_data, const grpc_httpcli_response *response, + grpc_call_list *call_list) { grpc_credentials_metadata_request *r = (grpc_credentials_metadata_request *)user_data; grpc_oauth2_token_fetcher_credentials *c = @@ -583,10 +587,10 @@ static void on_oauth2_token_fetcher_http_response( c->token_expiration = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), token_lifetime); r->cb(r->user_data, c->access_token_md->entries, - c->access_token_md->num_entries, status); + c->access_token_md->num_entries, status, call_list); } else { c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME); - r->cb(r->user_data, NULL, 0, status); + r->cb(r->user_data, NULL, 0, status, call_list); } gpr_mu_unlock(&c->mu); grpc_credentials_metadata_request_destroy(r); @@ -594,7 +598,8 @@ static void on_oauth2_token_fetcher_http_response( static void oauth2_token_fetcher_get_request_metadata( grpc_credentials *creds, grpc_pollset *pollset, const char *service_url, - grpc_credentials_metadata_cb cb, void *user_data) { + grpc_credentials_metadata_cb cb, void *user_data, + grpc_call_list *call_list) { grpc_oauth2_token_fetcher_credentials *c = (grpc_oauth2_token_fetcher_credentials *)creds; gpr_timespec refresh_threshold = gpr_time_from_seconds( @@ -613,13 +618,14 @@ static void oauth2_token_fetcher_get_request_metadata( } if (cached_access_token_md != NULL) { cb(user_data, cached_access_token_md->entries, - cached_access_token_md->num_entries, GRPC_CREDENTIALS_OK); + cached_access_token_md->num_entries, GRPC_CREDENTIALS_OK, call_list); grpc_credentials_md_store_unref(cached_access_token_md); } else { c->fetch_func( grpc_credentials_metadata_request_create(creds, cb, user_data), &c->httpcli_context, pollset, on_oauth2_token_fetcher_http_response, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), refresh_threshold)); + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), refresh_threshold), + call_list); } } @@ -644,7 +650,8 @@ static grpc_credentials_vtable compute_engine_vtable = { static void compute_engine_fetch_oauth2( grpc_credentials_metadata_request *metadata_req, grpc_httpcli_context *httpcli_context, grpc_pollset *pollset, - grpc_httpcli_response_cb response_cb, gpr_timespec deadline) { + grpc_httpcli_response_cb response_cb, gpr_timespec deadline, + grpc_call_list *call_list) { grpc_httpcli_header header = {"Metadata-Flavor", "Google"}; grpc_httpcli_request request; memset(&request, 0, sizeof(grpc_httpcli_request)); @@ -653,7 +660,7 @@ static void compute_engine_fetch_oauth2( request.hdr_count = 1; request.hdrs = &header; grpc_httpcli_get(httpcli_context, pollset, &request, deadline, response_cb, - metadata_req); + metadata_req, call_list); } grpc_credentials *grpc_google_compute_engine_credentials_create( @@ -683,7 +690,8 @@ static grpc_credentials_vtable refresh_token_vtable = { static void refresh_token_fetch_oauth2( grpc_credentials_metadata_request *metadata_req, grpc_httpcli_context *httpcli_context, grpc_pollset *pollset, - grpc_httpcli_response_cb response_cb, gpr_timespec deadline) { + grpc_httpcli_response_cb response_cb, gpr_timespec deadline, + grpc_call_list *call_list) { grpc_google_refresh_token_credentials *c = (grpc_google_refresh_token_credentials *)metadata_req->creds; grpc_httpcli_header header = {"Content-Type", @@ -700,7 +708,7 @@ static void refresh_token_fetch_oauth2( request.hdrs = &header; request.handshaker = &grpc_httpcli_ssl; grpc_httpcli_post(httpcli_context, pollset, &request, body, strlen(body), - deadline, response_cb, metadata_req); + deadline, response_cb, metadata_req, call_list); gpr_free(body); } @@ -743,20 +751,23 @@ static int md_only_test_has_request_metadata_only( return 1; } -void on_simulated_token_fetch_done(void *user_data) { +static void on_simulated_token_fetch_done(void *user_data) { grpc_credentials_metadata_request *r = (grpc_credentials_metadata_request *)user_data; grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; r->cb(r->user_data, c->md_store->entries, c->md_store->num_entries, - GRPC_CREDENTIALS_OK); + GRPC_CREDENTIALS_OK, &call_list); grpc_credentials_metadata_request_destroy(r); + grpc_call_list_run(&call_list); } static void md_only_test_get_request_metadata(grpc_credentials *creds, grpc_pollset *pollset, const char *service_url, grpc_credentials_metadata_cb cb, - void *user_data) { + void *user_data, + grpc_call_list *call_list) { grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)creds; if (c->is_async) { @@ -765,7 +776,7 @@ static void md_only_test_get_request_metadata(grpc_credentials *creds, grpc_credentials_metadata_request_create(creds, cb, user_data); gpr_thd_new(&thd_id, on_simulated_token_fetch_done, cb_arg, NULL); } else { - cb(user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK); + cb(user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK, call_list); } } @@ -809,9 +820,10 @@ static void access_token_get_request_metadata(grpc_credentials *creds, grpc_pollset *pollset, const char *service_url, grpc_credentials_metadata_cb cb, - void *user_data) { + void *user_data, + grpc_call_list *call_list) { grpc_access_token_credentials *c = (grpc_access_token_credentials *)creds; - cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK); + cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK, call_list); } static grpc_credentials_vtable access_token_vtable = { @@ -958,11 +970,12 @@ static void composite_md_context_destroy( static void composite_metadata_cb(void *user_data, grpc_credentials_md *md_elems, size_t num_md, - grpc_credentials_status status) { + grpc_credentials_status status, + grpc_call_list *call_list) { grpc_composite_credentials_metadata_context *ctx = (grpc_composite_credentials_metadata_context *)user_data; if (status != GRPC_CREDENTIALS_OK) { - ctx->cb(ctx->user_data, NULL, 0, status); + ctx->cb(ctx->user_data, NULL, 0, status, call_list); return; } @@ -980,28 +993,30 @@ static void composite_metadata_cb(void *user_data, grpc_credentials *inner_creds = ctx->composite_creds->inner.creds_array[ctx->creds_index++]; if (grpc_credentials_has_request_metadata(inner_creds)) { - grpc_credentials_get_request_metadata(inner_creds, ctx->pollset, - ctx->service_url, - composite_metadata_cb, ctx); + grpc_credentials_get_request_metadata( + inner_creds, ctx->pollset, ctx->service_url, composite_metadata_cb, + ctx, call_list); return; } } /* We're done!. */ ctx->cb(ctx->user_data, ctx->md_elems->entries, ctx->md_elems->num_entries, - GRPC_CREDENTIALS_OK); + GRPC_CREDENTIALS_OK, call_list); composite_md_context_destroy(ctx); + grpc_call_list_run(call_list); } static void composite_get_request_metadata(grpc_credentials *creds, grpc_pollset *pollset, const char *service_url, grpc_credentials_metadata_cb cb, - void *user_data) { + void *user_data, + grpc_call_list *call_list) { grpc_composite_credentials *c = (grpc_composite_credentials *)creds; grpc_composite_credentials_metadata_context *ctx; if (!grpc_credentials_has_request_metadata(creds)) { - cb(user_data, NULL, 0, GRPC_CREDENTIALS_OK); + cb(user_data, NULL, 0, GRPC_CREDENTIALS_OK, call_list); return; } ctx = gpr_malloc(sizeof(grpc_composite_credentials_metadata_context)); @@ -1016,7 +1031,8 @@ static void composite_get_request_metadata(grpc_credentials *creds, grpc_credentials *inner_creds = c->inner.creds_array[ctx->creds_index++]; if (grpc_credentials_has_request_metadata(inner_creds)) { grpc_credentials_get_request_metadata(inner_creds, pollset, service_url, - composite_metadata_cb, ctx); + composite_metadata_cb, ctx, + call_list); return; } } @@ -1152,10 +1168,11 @@ static void iam_get_request_metadata(grpc_credentials *creds, grpc_pollset *pollset, const char *service_url, grpc_credentials_metadata_cb cb, - void *user_data) { + void *user_data, + grpc_call_list *call_list) { grpc_google_iam_credentials *c = (grpc_google_iam_credentials *)creds; - cb(user_data, c->iam_md->entries, c->iam_md->num_entries, - GRPC_CREDENTIALS_OK); + cb(user_data, c->iam_md->entries, c->iam_md->num_entries, GRPC_CREDENTIALS_OK, + call_list); } static grpc_credentials_vtable iam_vtable = { diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h index 8e4fed7615..a93d0ec653 100644 --- a/src/core/security/credentials.h +++ b/src/core/security/credentials.h @@ -126,7 +126,8 @@ char *grpc_get_well_known_google_credentials_file_path(void); typedef void (*grpc_credentials_metadata_cb)(void *user_data, grpc_credentials_md *md_elems, size_t num_md, - grpc_credentials_status status); + grpc_credentials_status status, + grpc_call_list *call_list); typedef struct { void (*destruct)(grpc_credentials *c); @@ -134,8 +135,8 @@ typedef struct { int (*has_request_metadata_only)(const grpc_credentials *c); void (*get_request_metadata)(grpc_credentials *c, grpc_pollset *pollset, const char *service_url, - grpc_credentials_metadata_cb cb, - void *user_data); + grpc_credentials_metadata_cb cb, void *user_data, + grpc_call_list *call_list); grpc_security_status (*create_security_connector)( grpc_credentials *c, const char *target, const grpc_channel_args *args, grpc_credentials *request_metadata_creds, @@ -156,7 +157,8 @@ void grpc_credentials_get_request_metadata(grpc_credentials *creds, grpc_pollset *pollset, const char *service_url, grpc_credentials_metadata_cb cb, - void *user_data); + void *user_data, + grpc_call_list *call_list); /* Creates a security connector for the channel. May also create new channel args for the channel to be used in place of the passed in const args if @@ -274,7 +276,8 @@ typedef void (*grpc_fetch_oauth2_func)(grpc_credentials_metadata_request *req, grpc_httpcli_context *http_context, grpc_pollset *pollset, grpc_httpcli_response_cb response_cb, - gpr_timespec deadline); + gpr_timespec deadline, + grpc_call_list *call_list); typedef struct { grpc_credentials base; diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index 874dd59e84..5da29f5ad5 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -63,7 +63,8 @@ typedef struct { } compute_engine_detector; static void on_compute_engine_detection_http_response( - void *user_data, const grpc_httpcli_response *response) { + void *user_data, const grpc_httpcli_response *response, + grpc_call_list *call_list) { compute_engine_detector *detector = (compute_engine_detector *)user_data; if (response != NULL && response->status == 200 && response->hdr_count > 0) { /* Internet providers can return a generic response to all requests, so @@ -84,12 +85,16 @@ static void on_compute_engine_detection_http_response( gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset)); } -static void destroy_pollset(void *p) { grpc_pollset_destroy(p); } +static void destroy_pollset(void *p, int s, grpc_call_list *call_list) { + grpc_pollset_destroy(p); +} static int is_stack_running_on_compute_engine(void) { compute_engine_detector detector; grpc_httpcli_request request; grpc_httpcli_context context; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_closure destroy_closure; /* The http call is local. If it takes more than one sec, it is for sure not on compute engine. */ @@ -108,7 +113,9 @@ static int is_stack_running_on_compute_engine(void) { grpc_httpcli_get( &context, &detector.pollset, &request, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), - on_compute_engine_detection_http_response, &detector); + on_compute_engine_detection_http_response, &detector, &call_list); + + grpc_call_list_run(&call_list); /* Block until we get the response. This is not ideal but this should only be called once for the lifetime of the process by the default credentials. */ @@ -121,7 +128,9 @@ static int is_stack_running_on_compute_engine(void) { gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset)); grpc_httpcli_context_destroy(&context); - grpc_pollset_shutdown(&detector.pollset, destroy_pollset, &detector.pollset); + grpc_closure_init(&destroy_closure, destroy_pollset, &detector.pollset); + grpc_pollset_shutdown(&detector.pollset, &destroy_closure, &call_list); + grpc_call_list_run(&call_list); return detector.success; } diff --git a/src/core/security/jwt_verifier.c b/src/core/security/jwt_verifier.c index 790f2178db..a6736c922c 100644 --- a/src/core/security/jwt_verifier.c +++ b/src/core/security/jwt_verifier.c @@ -570,7 +570,8 @@ end: } static void on_keys_retrieved(void *user_data, - const grpc_httpcli_response *response) { + const grpc_httpcli_response *response, + grpc_call_list *call_list) { grpc_json *json = json_from_http(response); verifier_cb_ctx *ctx = (verifier_cb_ctx *)user_data; EVP_PKEY *verification_key = NULL; @@ -611,7 +612,8 @@ end: } static void on_openid_config_retrieved(void *user_data, - const grpc_httpcli_response *response) { + const grpc_httpcli_response *response, + grpc_call_list *call_list) { const grpc_json *cur; grpc_json *json = json_from_http(response); verifier_cb_ctx *ctx = (verifier_cb_ctx *)user_data; @@ -643,7 +645,7 @@ static void on_openid_config_retrieved(void *user_data, grpc_httpcli_get( &ctx->verifier->http_ctx, ctx->pollset, &req, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), - on_keys_retrieved, ctx); + on_keys_retrieved, ctx, call_list); grpc_json_destroy(json); gpr_free(req.host); return; @@ -682,7 +684,8 @@ static void verifier_put_mapping(grpc_jwt_verifier *v, const char *email_domain, } /* Takes ownership of ctx. */ -static void retrieve_key_and_verify(verifier_cb_ctx *ctx) { +static void retrieve_key_and_verify(verifier_cb_ctx *ctx, + grpc_call_list *call_list) { const char *at_sign; grpc_httpcli_response_cb http_cb; char *path_prefix = NULL; @@ -745,7 +748,7 @@ static void retrieve_key_and_verify(verifier_cb_ctx *ctx) { grpc_httpcli_get( &ctx->verifier->http_ctx, ctx->pollset, &req, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), - http_cb, ctx); + http_cb, ctx, call_list); gpr_free(req.host); gpr_free(req.path); return; @@ -758,8 +761,8 @@ error: void grpc_jwt_verifier_verify(grpc_jwt_verifier *verifier, grpc_pollset *pollset, const char *jwt, const char *audience, - grpc_jwt_verification_done_cb cb, - void *user_data) { + grpc_jwt_verification_done_cb cb, void *user_data, + grpc_call_list *call_list) { const char *dot = NULL; grpc_json *json; jose_header *header = NULL; @@ -792,7 +795,8 @@ void grpc_jwt_verifier_verify(grpc_jwt_verifier *verifier, if (GPR_SLICE_IS_EMPTY(signature)) goto error; retrieve_key_and_verify( verifier_cb_ctx_create(verifier, pollset, header, claims, audience, - signature, jwt, signed_jwt_len, user_data, cb)); + signature, jwt, signed_jwt_len, user_data, cb), + call_list); return; error: diff --git a/src/core/security/jwt_verifier.h b/src/core/security/jwt_verifier.h index 7a32debfcb..394935302b 100644 --- a/src/core/security/jwt_verifier.h +++ b/src/core/security/jwt_verifier.h @@ -123,8 +123,8 @@ typedef void (*grpc_jwt_verification_done_cb)(void *user_data, void grpc_jwt_verifier_verify(grpc_jwt_verifier *verifier, grpc_pollset *pollset, const char *jwt, const char *audience, - grpc_jwt_verification_done_cb cb, - void *user_data); + grpc_jwt_verification_done_cb cb, void *user_data, + grpc_call_list *call_list); /* --- TESTING ONLY exposed functions. --- */ diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index dcafc0fc0f..a501544341 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -67,9 +67,9 @@ typedef struct { int grpc_trace_secure_endpoint = 0; -static void destroy(secure_endpoint *secure_ep) { +static void destroy(secure_endpoint *secure_ep, grpc_call_list *call_list) { secure_endpoint *ep = secure_ep; - grpc_endpoint_destroy(ep->wrapped_ep); + grpc_endpoint_destroy(ep->wrapped_ep, call_list); tsi_frame_protector_destroy(ep->protector); gpr_slice_buffer_destroy(&ep->leftover_bytes); gpr_slice_unref(ep->read_staging_buffer); @@ -102,11 +102,12 @@ static void secure_endpoint_ref(secure_endpoint *ep, const char *reason, gpr_ref(&ep->ref); } #else -#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep)) +#define SECURE_ENDPOINT_UNREF(ep, reason, cl) secure_endpoint_unref((ep), (cl)) #define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep)) -static void secure_endpoint_unref(secure_endpoint *ep) { +static void secure_endpoint_unref(secure_endpoint *ep, + grpc_call_list *call_list) { if (gpr_unref(&ep->ref)) { - destroy(ep); + destroy(ep, call_list); } } @@ -121,7 +122,8 @@ static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, *end = GPR_SLICE_END_PTR(ep->read_staging_buffer); } -static void call_read_cb(secure_endpoint *ep, int success) { +static void call_read_cb(secure_endpoint *ep, int success, + grpc_call_list *call_list) { if (grpc_trace_secure_endpoint) { size_t i; for (i = 0; i < ep->read_buffer->count; i++) { @@ -132,11 +134,11 @@ static void call_read_cb(secure_endpoint *ep, int success) { } } ep->read_buffer = NULL; - ep->read_cb->cb(ep->read_cb->cb_arg, success); - SECURE_ENDPOINT_UNREF(ep, "read"); + grpc_call_list_add(call_list, ep->read_cb, success); + SECURE_ENDPOINT_UNREF(ep, "read", call_list); } -static int on_read(void *user_data, int success) { +static void on_read(void *user_data, int success, grpc_call_list *call_list) { unsigned i; gpr_uint8 keep_looping = 0; tsi_result result = TSI_OK; @@ -146,7 +148,8 @@ static int on_read(void *user_data, int success) { if (!success) { gpr_slice_buffer_reset_and_unref(ep->read_buffer); - return 0; + call_read_cb(ep, 0, call_list); + return; } /* TODO(yangg) check error, maybe bail out early */ @@ -202,21 +205,16 @@ static int on_read(void *user_data, int success) { if (result != TSI_OK) { gpr_slice_buffer_reset_and_unref(ep->read_buffer); - return 0; + call_read_cb(ep, 0, call_list); + return; } - return 1; -} - -static void on_read_cb(void *user_data, int success) { - call_read_cb(user_data, on_read(user_data, success)); + call_read_cb(ep, 1, call_list); } -static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep, - gpr_slice_buffer *slices, - grpc_closure *cb) { +static void endpoint_read(grpc_endpoint *secure_ep, gpr_slice_buffer *slices, + grpc_closure *cb, grpc_call_list *call_list) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - int immediate_read_success = -1; ep->read_cb = cb; ep->read_buffer = slices; gpr_slice_buffer_reset_and_unref(ep->read_buffer); @@ -224,27 +222,13 @@ static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep, if (ep->leftover_bytes.count) { gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer); GPR_ASSERT(ep->leftover_bytes.count == 0); - return on_read(ep, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; + on_read(ep, 1, call_list); + return; } SECURE_ENDPOINT_REF(ep, "read"); - - switch ( - grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read)) { - case GRPC_ENDPOINT_DONE: - immediate_read_success = on_read(ep, 1); - break; - case GRPC_ENDPOINT_PENDING: - return GRPC_ENDPOINT_PENDING; - case GRPC_ENDPOINT_ERROR: - immediate_read_success = on_read(ep, 0); - break; - } - - GPR_ASSERT(immediate_read_success != -1); - SECURE_ENDPOINT_UNREF(ep, "read"); - - return immediate_read_success ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; + grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, + call_list); } static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, @@ -255,9 +239,8 @@ static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, *end = GPR_SLICE_END_PTR(ep->write_staging_buffer); } -static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep, - gpr_slice_buffer *slices, - grpc_closure *cb) { +static void endpoint_write(grpc_endpoint *secure_ep, gpr_slice_buffer *slices, + grpc_closure *cb, grpc_call_list *call_list) { unsigned i; tsi_result result = TSI_OK; secure_endpoint *ep = (secure_endpoint *)secure_ep; @@ -329,32 +312,37 @@ static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep, if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? */ gpr_slice_buffer_reset_and_unref(&ep->output_buffer); - return GRPC_ENDPOINT_ERROR; + grpc_call_list_add(call_list, cb, 0); + return; } - return grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb); + grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, call_list); } -static void endpoint_shutdown(grpc_endpoint *secure_ep) { +static void endpoint_shutdown(grpc_endpoint *secure_ep, + grpc_call_list *call_list) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - grpc_endpoint_shutdown(ep->wrapped_ep); + grpc_endpoint_shutdown(ep->wrapped_ep, call_list); } -static void endpoint_destroy(grpc_endpoint *secure_ep) { +static void endpoint_destroy(grpc_endpoint *secure_ep, + grpc_call_list *call_list) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - SECURE_ENDPOINT_UNREF(ep, "destroy"); + SECURE_ENDPOINT_UNREF(ep, "destroy", call_list); } static void endpoint_add_to_pollset(grpc_endpoint *secure_ep, - grpc_pollset *pollset) { + grpc_pollset *pollset, + grpc_call_list *call_list) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset); + grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset, call_list); } static void endpoint_add_to_pollset_set(grpc_endpoint *secure_ep, - grpc_pollset_set *pollset_set) { + grpc_pollset_set *pollset_set, + grpc_call_list *call_list) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set); + grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set, call_list); } static char *endpoint_get_peer(grpc_endpoint *secure_ep) { @@ -386,7 +374,7 @@ grpc_endpoint *grpc_secure_endpoint_create( gpr_slice_buffer_init(&ep->output_buffer); gpr_slice_buffer_init(&ep->source_buffer); ep->read_buffer = NULL; - grpc_closure_init(&ep->on_read, on_read_cb, ep); + grpc_closure_init(&ep->on_read, on_read, ep); gpr_mu_init(&ep->protector_mu); gpr_ref_init(&ep->ref, 1); return &ep->base; diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c index 181aa74ba9..1412ab21ad 100644 --- a/src/core/security/secure_transport_setup.c +++ b/src/core/security/secure_transport_setup.c @@ -58,23 +58,27 @@ typedef struct { grpc_closure on_handshake_data_received_from_peer; } grpc_secure_transport_setup; -static void on_handshake_data_received_from_peer(void *setup, int success); +static void on_handshake_data_received_from_peer(void *setup, int success, + grpc_call_list *call_list); -static void on_handshake_data_sent_to_peer(void *setup, int success); +static void on_handshake_data_sent_to_peer(void *setup, int success, + grpc_call_list *call_list); static void secure_transport_setup_done(grpc_secure_transport_setup *s, - int is_success) { + int is_success, + grpc_call_list *call_list) { if (is_success) { s->cb(s->user_data, GRPC_SECURITY_OK, s->wrapped_endpoint, - s->secure_endpoint); + s->secure_endpoint, call_list); } else { if (s->secure_endpoint != NULL) { - grpc_endpoint_shutdown(s->secure_endpoint); - grpc_endpoint_destroy(s->secure_endpoint); + grpc_endpoint_shutdown(s->secure_endpoint, call_list); + grpc_endpoint_destroy(s->secure_endpoint, call_list); } else { - grpc_endpoint_destroy(s->wrapped_endpoint); + grpc_endpoint_destroy(s->wrapped_endpoint, call_list); } - s->cb(s->user_data, GRPC_SECURITY_ERROR, s->wrapped_endpoint, NULL); + s->cb(s->user_data, GRPC_SECURITY_ERROR, s->wrapped_endpoint, NULL, + call_list); } if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker); if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer); @@ -85,13 +89,14 @@ static void secure_transport_setup_done(grpc_secure_transport_setup *s, gpr_free(s); } -static void on_peer_checked(void *user_data, grpc_security_status status) { +static void on_peer_checked(void *user_data, grpc_security_status status, + grpc_call_list *call_list) { grpc_secure_transport_setup *s = user_data; tsi_frame_protector *protector; tsi_result result; if (status != GRPC_SECURITY_OK) { gpr_log(GPR_ERROR, "Error checking peer."); - secure_transport_setup_done(s, 0); + secure_transport_setup_done(s, 0, call_list); return; } result = @@ -99,7 +104,7 @@ static void on_peer_checked(void *user_data, grpc_security_status status) { if (result != TSI_OK) { gpr_log(GPR_ERROR, "Frame protector creation failed with error %s.", tsi_result_to_string(result)); - secure_transport_setup_done(s, 0); + secure_transport_setup_done(s, 0, call_list); return; } s->secure_endpoint = @@ -107,11 +112,12 @@ static void on_peer_checked(void *user_data, grpc_security_status status) { s->left_overs.slices, s->left_overs.count); s->left_overs.count = 0; s->left_overs.length = 0; - secure_transport_setup_done(s, 1); + secure_transport_setup_done(s, 1, call_list); return; } -static void check_peer(grpc_secure_transport_setup *s) { +static void check_peer(grpc_secure_transport_setup *s, + grpc_call_list *call_list) { grpc_security_status peer_status; tsi_peer peer; tsi_result result = tsi_handshaker_extract_peer(s->handshaker, &peer); @@ -119,21 +125,22 @@ static void check_peer(grpc_secure_transport_setup *s) { if (result != TSI_OK) { gpr_log(GPR_ERROR, "Peer extraction failed with error %s", tsi_result_to_string(result)); - secure_transport_setup_done(s, 0); + secure_transport_setup_done(s, 0, call_list); return; } peer_status = grpc_security_connector_check_peer(s->connector, peer, on_peer_checked, s); if (peer_status == GRPC_SECURITY_ERROR) { gpr_log(GPR_ERROR, "Peer check failed."); - secure_transport_setup_done(s, 0); + secure_transport_setup_done(s, 0, call_list); return; } else if (peer_status == GRPC_SECURITY_OK) { - on_peer_checked(s, peer_status); + on_peer_checked(s, peer_status, call_list); } } -static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) { +static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s, + grpc_call_list *call_list) { size_t offset = 0; tsi_result result = TSI_OK; gpr_slice to_send; @@ -153,7 +160,7 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) { if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshake failed with error %s", tsi_result_to_string(result)); - secure_transport_setup_done(s, 0); + secure_transport_setup_done(s, 0, call_list); return; } @@ -163,21 +170,12 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) { gpr_slice_buffer_add(&s->outgoing, to_send); /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - switch (grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing, - &s->on_handshake_data_sent_to_peer)) { - case GRPC_ENDPOINT_ERROR: - gpr_log(GPR_ERROR, "Could not send handshake data to peer."); - secure_transport_setup_done(s, 0); - break; - case GRPC_ENDPOINT_DONE: - on_handshake_data_sent_to_peer(s, 1); - break; - case GRPC_ENDPOINT_PENDING: - break; - } + grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing, + &s->on_handshake_data_sent_to_peer, call_list); } -static void on_handshake_data_received_from_peer(void *setup, int success) { +static void on_handshake_data_received_from_peer(void *setup, int success, + grpc_call_list *call_list) { grpc_secure_transport_setup *s = setup; size_t consumed_slice_size = 0; tsi_result result = TSI_OK; @@ -187,7 +185,7 @@ static void on_handshake_data_received_from_peer(void *setup, int success) { if (!success) { gpr_log(GPR_ERROR, "Read failed."); - secure_transport_setup_done(s, 0); + secure_transport_setup_done(s, 0, call_list); return; } @@ -202,20 +200,11 @@ static void on_handshake_data_received_from_peer(void *setup, int success) { if (tsi_handshaker_is_in_progress(s->handshaker)) { /* We may need more data. */ if (result == TSI_INCOMPLETE_DATA) { - switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming, - &s->on_handshake_data_received_from_peer)) { - case GRPC_ENDPOINT_DONE: - on_handshake_data_received_from_peer(s, 1); - break; - case GRPC_ENDPOINT_ERROR: - on_handshake_data_received_from_peer(s, 0); - break; - case GRPC_ENDPOINT_PENDING: - break; - } + grpc_endpoint_read(s->wrapped_endpoint, &s->incoming, + &s->on_handshake_data_received_from_peer, call_list); return; } else { - send_handshake_bytes_to_peer(s); + send_handshake_bytes_to_peer(s, call_list); return; } } @@ -223,7 +212,7 @@ static void on_handshake_data_received_from_peer(void *setup, int success) { if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshake failed with error %s", tsi_result_to_string(result)); - secure_transport_setup_done(s, 0); + secure_transport_setup_done(s, 0, call_list); return; } @@ -233,7 +222,7 @@ static void on_handshake_data_received_from_peer(void *setup, int success) { num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + s->incoming.count - i - 1; if (num_left_overs == 0) { - check_peer(s); + check_peer(s, call_list); return; } /* Put the leftovers in our buffer (ownership transfered). */ @@ -247,51 +236,41 @@ static void on_handshake_data_received_from_peer(void *setup, int success) { gpr_slice_buffer_addn( &s->left_overs, &s->incoming.slices[i + 1], num_left_overs - (size_t)has_left_overs_in_current_slice); - check_peer(s); + check_peer(s, call_list); } /* If setup is NULL, the setup is done. */ -static void on_handshake_data_sent_to_peer(void *setup, int success) { +static void on_handshake_data_sent_to_peer(void *setup, int success, + grpc_call_list *call_list) { grpc_secure_transport_setup *s = setup; /* Make sure that write is OK. */ if (!success) { gpr_log(GPR_ERROR, "Write failed."); - if (setup != NULL) secure_transport_setup_done(s, 0); + if (setup != NULL) secure_transport_setup_done(s, 0, call_list); return; } /* We may be done. */ if (tsi_handshaker_is_in_progress(s->handshaker)) { - /* TODO(klempner,jboeuf): This should probably use the client setup - deadline */ - switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming, - &s->on_handshake_data_received_from_peer)) { - case GRPC_ENDPOINT_ERROR: - on_handshake_data_received_from_peer(s, 0); - break; - case GRPC_ENDPOINT_PENDING: - break; - case GRPC_ENDPOINT_DONE: - on_handshake_data_received_from_peer(s, 1); - break; - } + grpc_endpoint_read(s->wrapped_endpoint, &s->incoming, + &s->on_handshake_data_received_from_peer, call_list); } else { - check_peer(s); + check_peer(s, call_list); } } void grpc_setup_secure_transport(grpc_security_connector *connector, grpc_endpoint *nonsecure_endpoint, grpc_secure_transport_setup_done_cb cb, - void *user_data) { + void *user_data, grpc_call_list *call_list) { grpc_security_status result = GRPC_SECURITY_OK; grpc_secure_transport_setup *s = gpr_malloc(sizeof(grpc_secure_transport_setup)); memset(s, 0, sizeof(grpc_secure_transport_setup)); result = grpc_security_connector_create_handshaker(connector, &s->handshaker); if (result != GRPC_SECURITY_OK) { - secure_transport_setup_done(s, 0); + secure_transport_setup_done(s, 0, call_list); return; } s->connector = @@ -308,5 +287,5 @@ void grpc_setup_secure_transport(grpc_security_connector *connector, gpr_slice_buffer_init(&s->left_overs); gpr_slice_buffer_init(&s->outgoing); gpr_slice_buffer_init(&s->incoming); - send_handshake_bytes_to_peer(s); + send_handshake_bytes_to_peer(s, call_list); } diff --git a/src/core/security/secure_transport_setup.h b/src/core/security/secure_transport_setup.h index d9b802556d..867726ff4c 100644 --- a/src/core/security/secure_transport_setup.h +++ b/src/core/security/secure_transport_setup.h @@ -42,12 +42,13 @@ /* Ownership of the secure_endpoint is transfered. */ typedef void (*grpc_secure_transport_setup_done_cb)( void *user_data, grpc_security_status status, - grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint); + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint, + grpc_call_list *call_list); /* Calls the callback upon completion. */ void grpc_setup_secure_transport(grpc_security_connector *connector, grpc_endpoint *nonsecure_endpoint, grpc_secure_transport_setup_done_cb cb, - void *user_data); + void *user_data, grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_SECURITY_SECURE_TRANSPORT_SETUP_H */ diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c index ba9ac68c5f..dac6a5a5fc 100644 --- a/src/core/security/security_connector.c +++ b/src/core/security/security_connector.c @@ -119,9 +119,9 @@ grpc_security_status grpc_security_connector_check_peer( grpc_security_status grpc_channel_security_connector_check_call_host( grpc_channel_security_connector *sc, const char *host, - grpc_security_check_cb cb, void *user_data) { + grpc_security_check_cb cb, void *user_data, grpc_call_list *call_list) { if (sc == NULL || sc->check_call_host == NULL) return GRPC_SECURITY_ERROR; - return sc->check_call_host(sc, host, cb, user_data); + return sc->check_call_host(sc, host, cb, user_data, call_list); } #ifdef GRPC_SECURITY_CONNECTOR_REFCOUNT_DEBUG @@ -275,11 +275,11 @@ end: static grpc_security_status fake_channel_check_call_host( grpc_channel_security_connector *sc, const char *host, - grpc_security_check_cb cb, void *user_data) { + grpc_security_check_cb cb, void *user_data, grpc_call_list *call_list) { grpc_fake_channel_security_connector *c = (grpc_fake_channel_security_connector *)sc; if (c->call_host_check_is_async) { - cb(user_data, GRPC_SECURITY_OK); + cb(user_data, GRPC_SECURITY_OK, call_list); return GRPC_SECURITY_PENDING; } else { return GRPC_SECURITY_OK; @@ -495,7 +495,7 @@ static grpc_security_status ssl_server_check_peer(grpc_security_connector *sc, static grpc_security_status ssl_channel_check_call_host( grpc_channel_security_connector *sc, const char *host, - grpc_security_check_cb cb, void *user_data) { + grpc_security_check_cb cb, void *user_data, grpc_call_list *call_list) { grpc_ssl_channel_security_connector *c = (grpc_ssl_channel_security_connector *)sc; diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h index 2c9aa1c5a4..d78d6ad69f 100644 --- a/src/core/security/security_connector.h +++ b/src/core/security/security_connector.h @@ -61,7 +61,8 @@ typedef struct grpc_security_connector grpc_security_connector; #define GRPC_SECURITY_CONNECTOR_ARG "grpc.security_connector" typedef void (*grpc_security_check_cb)(void *user_data, - grpc_security_status status); + grpc_security_status status, + grpc_call_list *call_list); typedef struct { void (*destroy)(grpc_security_connector *sc); @@ -138,7 +139,8 @@ struct grpc_channel_security_connector { grpc_security_status (*check_call_host)(grpc_channel_security_connector *sc, const char *host, grpc_security_check_cb cb, - void *user_data); + void *user_data, + grpc_call_list *call_list); }; /* Checks that the host that will be set for a call is acceptable. @@ -148,7 +150,7 @@ struct grpc_channel_security_connector { GRPC_SECURITY_PENDING unless an error is detected early on. */ grpc_security_status grpc_channel_security_connector_check_call_host( grpc_channel_security_connector *sc, const char *host, - grpc_security_check_cb cb, void *user_data); + grpc_security_check_cb cb, void *user_data, grpc_call_list *call_list); /* --- Creation security connectors. --- */ diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 8468697472..2376cbbeb3 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -109,12 +109,14 @@ static grpc_mdelem *remove_consumed_md(void *user_data, grpc_mdelem *md) { return md; } +/* called from application code */ static void on_md_processing_done( void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md, const grpc_metadata *response_md, size_t num_response_md, grpc_status_code status, const char *error_details) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; /* TODO(jboeuf): Implement support for response_md. */ if (response_md != NULL && num_response_md > 0) { @@ -129,7 +131,7 @@ static void on_md_processing_done( grpc_metadata_batch_filter(&calld->md_op->data.metadata, remove_consumed_md, elem); grpc_metadata_array_destroy(&calld->md); - calld->on_done_recv->cb(calld->on_done_recv->cb_arg, 1); + calld->on_done_recv->cb(calld->on_done_recv->cb_arg, 1, &call_list); } else { gpr_slice message; grpc_metadata_array_destroy(&calld->md); @@ -139,11 +141,14 @@ static void on_md_processing_done( message = gpr_slice_from_copied_string(error_details); grpc_sopb_reset(calld->recv_ops); grpc_transport_stream_op_add_close(&calld->transport_op, status, &message); - grpc_call_next_op(elem, &calld->transport_op); + grpc_call_next_op(elem, &calld->transport_op, &call_list); } + + grpc_call_list_run(&call_list); } -static void auth_on_recv(void *user_data, int success) { +static void auth_on_recv(void *user_data, int success, + grpc_call_list *call_list) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; @@ -164,7 +169,7 @@ static void auth_on_recv(void *user_data, int success) { return; } } - calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); + calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success, call_list); } static void set_recv_ops_md_callbacks(grpc_call_element *elem, @@ -186,9 +191,10 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem, op contains type and call direction information, in addition to the data that is being sent or received. */ static void auth_start_transport_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op *op, + grpc_call_list *call_list) { set_recv_ops_md_callbacks(elem, op); - grpc_call_next_op(elem, op); + grpc_call_next_op(elem, op, call_list); } /* Constructor for call_data */ @@ -227,12 +233,14 @@ static void init_call_elem(grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem) {} +static void destroy_call_elem(grpc_call_element *elem, + grpc_call_list *call_list) {} /* Constructor for channel_data */ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { + int is_first, int is_last, + grpc_call_list *call_list) { grpc_security_connector *sc = grpc_find_security_connector_in_args(args); grpc_auth_metadata_processor *processor = grpc_find_auth_metadata_processor_in_args(args); @@ -256,7 +264,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element *elem) { +static void destroy_channel_elem(grpc_channel_element *elem, + grpc_call_list *call_list) { /* grab pointers to our data from the channel element */ channel_data *chand = elem->channel_data; GRPC_SECURITY_CONNECTOR_UNREF(chand->security_connector, diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index a6f50712f5..e6e2eee658 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -66,6 +66,7 @@ typedef struct grpc_server_secure_state { int is_shutdown; gpr_mu mu; gpr_refcount refcount; + grpc_closure destroy_closure; } grpc_server_secure_state; static void state_ref(grpc_server_secure_state *state) { @@ -127,7 +128,8 @@ static int remove_tcp_from_list_locked(grpc_server_secure_state *state, static void on_secure_transport_setup_done(void *statep, grpc_security_status status, grpc_endpoint *wrapped_endpoint, - grpc_endpoint *secure_endpoint) { + grpc_endpoint *secure_endpoint, + grpc_call_list *call_list) { grpc_server_secure_state *state = statep; grpc_transport *transport; grpc_mdctx *mdctx; @@ -137,16 +139,16 @@ static void on_secure_transport_setup_done(void *statep, remove_tcp_from_list_locked(state, wrapped_endpoint); if (!state->is_shutdown) { mdctx = grpc_mdctx_create(); - workqueue = grpc_workqueue_create(); + workqueue = grpc_workqueue_create(call_list); transport = grpc_create_chttp2_transport( grpc_server_get_channel_args(state->server), secure_endpoint, mdctx, - workqueue, 0); + 0); setup_transport(state, transport, mdctx, workqueue); grpc_chttp2_transport_start_reading(transport, NULL, 0); } else { /* We need to consume this here, because the server may already have gone * away. */ - grpc_endpoint_destroy(secure_endpoint); + grpc_endpoint_destroy(secure_endpoint, call_list); } gpr_mu_unlock(&state->mu); } else { @@ -158,7 +160,8 @@ static void on_secure_transport_setup_done(void *statep, state_unref(state); } -static void on_accept(void *statep, grpc_endpoint *tcp) { +static void on_accept(void *statep, grpc_endpoint *tcp, + grpc_call_list *call_list) { grpc_server_secure_state *state = statep; tcp_endpoint_list *node; state_ref(state); @@ -169,22 +172,24 @@ static void on_accept(void *statep, grpc_endpoint *tcp) { state->handshaking_tcp_endpoints = node; gpr_mu_unlock(&state->mu); grpc_setup_secure_transport(state->sc, tcp, on_secure_transport_setup_done, - state); + state, call_list); } /* Server callback: start listening on our ports */ static void start(grpc_server *server, void *statep, grpc_pollset **pollsets, - size_t pollset_count) { + size_t pollset_count, grpc_call_list *call_list) { grpc_server_secure_state *state = statep; - grpc_tcp_server_start(state->tcp, pollsets, pollset_count, on_accept, state); + grpc_tcp_server_start(state->tcp, pollsets, pollset_count, on_accept, state, + call_list); } -static void destroy_done(void *statep) { +static void destroy_done(void *statep, int success, grpc_call_list *call_list) { grpc_server_secure_state *state = statep; grpc_server_listener_destroy_done(state->server); gpr_mu_lock(&state->mu); while (state->handshaking_tcp_endpoints != NULL) { - grpc_endpoint_shutdown(state->handshaking_tcp_endpoints->tcp_endpoint); + grpc_endpoint_shutdown(state->handshaking_tcp_endpoints->tcp_endpoint, + call_list); remove_tcp_from_list_locked(state, state->handshaking_tcp_endpoints->tcp_endpoint); } @@ -194,14 +199,16 @@ static void destroy_done(void *statep) { /* Server callback: destroy the tcp listener (so we don't generate further callbacks) */ -static void destroy(grpc_server *server, void *statep) { +static void destroy(grpc_server *server, void *statep, + grpc_call_list *call_list) { grpc_server_secure_state *state = statep; grpc_tcp_server *tcp; gpr_mu_lock(&state->mu); state->is_shutdown = 1; tcp = state->tcp; gpr_mu_unlock(&state->mu); - grpc_tcp_server_destroy(tcp, destroy_done, state); + grpc_closure_init(&state->destroy_closure, destroy_done, state); + grpc_tcp_server_destroy(tcp, &state->destroy_closure, call_list); } int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, @@ -215,6 +222,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, int port_temp; grpc_security_status status = GRPC_SECURITY_ERROR; grpc_security_connector *sc = NULL; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; /* create security context */ if (creds == NULL) goto error; @@ -277,6 +285,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, /* Register with the server only upon success */ grpc_server_add_listener(server, state, start, destroy); + grpc_call_list_run(&call_list); return port_num; /* Error path: cleanup and return */ @@ -288,10 +297,11 @@ error: grpc_resolved_addresses_destroy(resolved); } if (tcp) { - grpc_tcp_server_destroy(tcp, NULL, NULL); + grpc_tcp_server_destroy(tcp, NULL, &call_list); } if (state) { gpr_free(state); } + grpc_call_list_run(&call_list); return 0; } |