diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/client_config/connector.h | 3 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 3 | ||||
-rw-r--r-- | src/core/httpcli/httpcli_security_connector.c | 18 | ||||
-rw-r--r-- | src/core/security/client_auth_filter.c | 44 | ||||
-rw-r--r-- | src/core/security/credentials.c | 6 | ||||
-rw-r--r-- | src/core/security/handshake.c | 24 | ||||
-rw-r--r-- | src/core/security/security_connector.c | 196 | ||||
-rw-r--r-- | src/core/security/security_connector.h | 70 | ||||
-rw-r--r-- | src/core/security/server_auth_filter.c | 2 | ||||
-rw-r--r-- | src/core/security/server_secure_chttp2.c | 10 | ||||
-rw-r--r-- | src/core/surface/call.c | 14 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 1 | ||||
-rw-r--r-- | src/core/surface/init.c | 4 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 12 | ||||
-rw-r--r-- | src/core/transport/byte_stream.c | 8 | ||||
-rw-r--r-- | src/core/transport/byte_stream.h | 5 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_data.c | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 33 |
18 files changed, 254 insertions, 201 deletions
diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index a649f143ae..b4482fa2ee 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -65,6 +65,9 @@ typedef struct { /** any additional filters (owned by the caller of connect) */ const grpc_channel_filter **filters; size_t num_filters; + + /** channel arguments (to be passed to the filters) */ + const grpc_channel_args *channel_args; } grpc_connect_out_args; struct grpc_connector_vtable { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index afb1cdbd6d..9a332c4d67 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -493,7 +493,8 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { con = gpr_malloc(channel_stack_size); stk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters, - num_filters, c->args, "CONNECTED_SUBCHANNEL", stk); + num_filters, c->connecting_result.channel_args, + "CONNECTED_SUBCHANNEL", stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); gpr_free((void *)c->connecting_result.filters); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); diff --git a/src/core/httpcli/httpcli_security_connector.c b/src/core/httpcli/httpcli_security_connector.c index a5aa551373..ba7cba25f9 100644 --- a/src/core/httpcli/httpcli_security_connector.c +++ b/src/core/httpcli/httpcli_security_connector.c @@ -68,7 +68,7 @@ static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx, tsi_result result = TSI_OK; tsi_handshaker *handshaker; if (c->handshaker_factory == NULL) { - cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL); + cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL); return; } result = tsi_ssl_handshaker_factory_create_handshaker( @@ -76,17 +76,18 @@ static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx, if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.", tsi_result_to_string(result)); - cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL); + cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL); } else { grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint, cb, user_data); } } -static grpc_security_status httpcli_ssl_check_peer(grpc_security_connector *sc, - tsi_peer peer, - grpc_security_check_cb cb, - void *user_data) { +static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, + grpc_security_connector *sc, + tsi_peer peer, + grpc_security_peer_check_cb cb, + void *user_data) { grpc_httpcli_ssl_channel_security_connector *c = (grpc_httpcli_ssl_channel_security_connector *)sc; grpc_security_status status = GRPC_SECURITY_OK; @@ -98,8 +99,8 @@ static grpc_security_status httpcli_ssl_check_peer(grpc_security_connector *sc, c->secure_peer_name); status = GRPC_SECURITY_ERROR; } + cb(exec_ctx, user_data, status, NULL); tsi_peer_destruct(&peer); - return status; } static grpc_security_connector_vtable httpcli_ssl_vtable = { @@ -149,7 +150,8 @@ typedef struct { static void on_secure_transport_setup_done(grpc_exec_ctx *exec_ctx, void *rp, grpc_security_status status, - grpc_endpoint *secure_endpoint) { + grpc_endpoint *secure_endpoint, + grpc_auth_context *auth_context) { on_done_closure *c = rp; if (status != GRPC_SECURITY_OK) { gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status); diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index b1fd733c91..1cb247d874 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -68,6 +68,7 @@ typedef struct { /* We can have a per-channel credentials. */ typedef struct { grpc_channel_security_connector *security_connector; + grpc_auth_context *auth_context; } channel_data; static void reset_auth_metadata_context( @@ -122,6 +123,7 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, } void build_auth_metadata_context(grpc_security_connector *sc, + grpc_auth_context *auth_context, call_data *calld) { char *service = gpr_strdup(grpc_mdstr_as_c_string(calld->method)); char *last_slash = strrchr(service, '/'); @@ -145,7 +147,7 @@ void build_auth_metadata_context(grpc_security_connector *sc, calld->auth_md_context.service_url = service_url; calld->auth_md_context.method_name = method_name; calld->auth_md_context.channel_auth_context = - GRPC_AUTH_CONTEXT_REF(sc->auth_context, "grpc_auth_metadata_context"); + GRPC_AUTH_CONTEXT_REF(auth_context, "grpc_auth_metadata_context"); gpr_free(service); } @@ -179,7 +181,8 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, call_creds_has_md ? ctx->creds : channel_call_creds); } - build_auth_metadata_context(&chand->security_connector->base, calld); + build_auth_metadata_context(&chand->security_connector->base, + chand->auth_context, calld); calld->op = *op; /* Copy op (originates from the caller's stack). */ GPR_ASSERT(calld->pollset); grpc_call_credentials_get_request_metadata( @@ -230,7 +233,7 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, sec_ctx = op->context[GRPC_CONTEXT_SECURITY].value; GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); sec_ctx->auth_context = GRPC_AUTH_CONTEXT_REF( - chand->security_connector->base.auth_context, "client_auth_filter"); + chand->auth_context, "client_auth_filter"); } if (op->send_initial_metadata != NULL) { @@ -247,27 +250,13 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, } } if (calld->host != NULL) { - grpc_security_status status; const char *call_host = grpc_mdstr_as_c_string(calld->host); calld->op = *op; /* Copy op (originates from the caller's stack). */ - status = grpc_channel_security_connector_check_call_host( - exec_ctx, chand->security_connector, call_host, on_host_checked, - elem); - if (status != GRPC_SECURITY_OK) { - if (status == GRPC_SECURITY_ERROR) { - char *error_msg; - gpr_asprintf(&error_msg, - "Invalid host %s set in :authority metadata.", - call_host); - bubble_up_error(exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, - error_msg); - gpr_free(error_msg); - } - return; /* early exit */ - } + grpc_channel_security_connector_check_call_host( + exec_ctx, chand->security_connector, call_host, chand->auth_context, + on_host_checked, elem); + return; /* early exit */ } - send_security_metadata(exec_ctx, elem, op); - return; /* early exit */ } /* pass control down the stack */ @@ -307,6 +296,9 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element_args *args) { grpc_security_connector *sc = grpc_find_security_connector_in_args(args->channel_args); + grpc_auth_context *auth_context = + grpc_find_auth_context_in_args(args->channel_args); + /* grab pointers to our data from the channel element */ channel_data *chand = elem->channel_data; @@ -315,12 +307,15 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, path */ GPR_ASSERT(!args->is_last); GPR_ASSERT(sc != NULL); + GPR_ASSERT(auth_context != NULL); /* initialize members */ GPR_ASSERT(sc->is_client_side); chand->security_connector = (grpc_channel_security_connector *)GRPC_SECURITY_CONNECTOR_REF( sc, "client_auth_filter"); + chand->auth_context = + GRPC_AUTH_CONTEXT_REF(auth_context, "client_auth_filter"); } /* Destructor for channel data */ @@ -328,10 +323,11 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { /* grab pointers to our data from the channel element */ channel_data *chand = elem->channel_data; - grpc_channel_security_connector *ctx = chand->security_connector; - if (ctx != NULL) { - GRPC_SECURITY_CONNECTOR_UNREF(&ctx->base, "client_auth_filter"); + grpc_channel_security_connector *sc = chand->security_connector; + if (sc != NULL) { + GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "client_auth_filter"); } + GRPC_AUTH_CONTEXT_UNREF(chand->auth_context, "client_auth_filter"); } const grpc_channel_filter grpc_client_auth_filter = { diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index a0054741ad..1d1c3b098a 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -179,8 +179,8 @@ void grpc_server_credentials_set_auth_metadata_processor( GRPC_API_TRACE( "grpc_server_credentials_set_auth_metadata_processor(" "creds=%p, " - "processor=grpc_auth_metadata_processor { process: %lx, state: %p })", - 3, (creds, (unsigned long)processor.process, processor.state)); + "processor=grpc_auth_metadata_processor { process: %p, state: %p })", + 3, (creds, (void*)(gpr_intptr)processor.process, processor.state)); if (creds == NULL) return; if (creds->processor.destroy != NULL && creds->processor.state != NULL) { creds->processor.destroy(creds->processor.state); @@ -881,7 +881,7 @@ static grpc_security_status fake_transport_security_create_security_connector( grpc_channel_credentials *c, grpc_call_credentials *call_creds, const char *target, const grpc_channel_args *args, grpc_channel_security_connector **sc, grpc_channel_args **new_args) { - *sc = grpc_fake_channel_security_connector_create(call_creds, 1); + *sc = grpc_fake_channel_security_connector_create(call_creds); return GRPC_SECURITY_OK; } diff --git a/src/core/security/handshake.c b/src/core/security/handshake.c index 6734187fce..364b765396 100644 --- a/src/core/security/handshake.c +++ b/src/core/security/handshake.c @@ -35,6 +35,7 @@ #include <string.h> +#include "src/core/security/security_context.h" #include "src/core/security/secure_endpoint.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -56,6 +57,7 @@ typedef struct { void *user_data; grpc_closure on_handshake_data_sent_to_peer; grpc_closure on_handshake_data_received_from_peer; + grpc_auth_context *auth_context; } grpc_security_handshake; static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, @@ -96,7 +98,8 @@ static void security_handshake_done(grpc_exec_ctx *exec_ctx, security_connector_remove_handshake(h); } if (is_success) { - h->cb(exec_ctx, h->user_data, GRPC_SECURITY_OK, h->secure_endpoint); + h->cb(exec_ctx, h->user_data, GRPC_SECURITY_OK, h->secure_endpoint, + h->auth_context); } else { if (h->secure_endpoint != NULL) { grpc_endpoint_shutdown(exec_ctx, h->secure_endpoint); @@ -104,19 +107,21 @@ static void security_handshake_done(grpc_exec_ctx *exec_ctx, } else { grpc_endpoint_destroy(exec_ctx, h->wrapped_endpoint); } - h->cb(exec_ctx, h->user_data, GRPC_SECURITY_ERROR, NULL); + h->cb(exec_ctx, h->user_data, GRPC_SECURITY_ERROR, NULL, NULL); } if (h->handshaker != NULL) tsi_handshaker_destroy(h->handshaker); if (h->handshake_buffer != NULL) gpr_free(h->handshake_buffer); gpr_slice_buffer_destroy(&h->left_overs); gpr_slice_buffer_destroy(&h->outgoing); gpr_slice_buffer_destroy(&h->incoming); + GRPC_AUTH_CONTEXT_UNREF(h->auth_context, "handshake"); GRPC_SECURITY_CONNECTOR_UNREF(h->connector, "handshake"); gpr_free(h); } static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_security_status status) { + grpc_security_status status, + grpc_auth_context *auth_context) { grpc_security_handshake *h = user_data; tsi_frame_protector *protector; tsi_result result; @@ -125,6 +130,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *user_data, security_handshake_done(exec_ctx, h, 0); return; } + h->auth_context = GRPC_AUTH_CONTEXT_REF(auth_context, "handshake"); result = tsi_handshaker_create_frame_protector(h->handshaker, NULL, &protector); if (result != TSI_OK) { @@ -143,7 +149,6 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *user_data, } static void check_peer(grpc_exec_ctx *exec_ctx, grpc_security_handshake *h) { - grpc_security_status peer_status; tsi_peer peer; tsi_result result = tsi_handshaker_extract_peer(h->handshaker, &peer); @@ -153,15 +158,8 @@ static void check_peer(grpc_exec_ctx *exec_ctx, grpc_security_handshake *h) { security_handshake_done(exec_ctx, h, 0); return; } - peer_status = grpc_security_connector_check_peer(h->connector, peer, - on_peer_checked, h); - if (peer_status == GRPC_SECURITY_ERROR) { - gpr_log(GPR_ERROR, "Peer check failed."); - security_handshake_done(exec_ctx, h, 0); - return; - } else if (peer_status == GRPC_SECURITY_OK) { - on_peer_checked(exec_ctx, h, peer_status); - } + grpc_security_connector_check_peer(exec_ctx, h->connector, peer, + on_peer_checked, h); } static void send_handshake_bytes_to_peer(grpc_exec_ctx *exec_ctx, diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c index 8c6ab0b8a4..204cd324f6 100644 --- a/src/core/security/security_connector.c +++ b/src/core/security/security_connector.c @@ -124,27 +124,34 @@ void grpc_security_connector_do_handshake(grpc_exec_ctx *exec_ctx, grpc_security_handshake_done_cb cb, void *user_data) { if (sc == NULL || nonsecure_endpoint == NULL) { - cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL); + cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL); } else { sc->vtable->do_handshake(exec_ctx, sc, nonsecure_endpoint, cb, user_data); } } -grpc_security_status grpc_security_connector_check_peer( - grpc_security_connector *sc, tsi_peer peer, grpc_security_check_cb cb, - void *user_data) { +void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, + grpc_security_connector *sc, + tsi_peer peer, + grpc_security_peer_check_cb cb, + void *user_data) { if (sc == NULL) { + cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL); tsi_peer_destruct(&peer); - return GRPC_SECURITY_ERROR; + } else { + sc->vtable->check_peer(exec_ctx, sc, peer, cb, user_data); } - return sc->vtable->check_peer(sc, peer, cb, user_data); } -grpc_security_status grpc_channel_security_connector_check_call_host( +void grpc_channel_security_connector_check_call_host( grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, - const char *host, grpc_security_check_cb cb, void *user_data) { - if (sc == NULL || sc->check_call_host == NULL) return GRPC_SECURITY_ERROR; - return sc->check_call_host(exec_ctx, sc, host, cb, user_data); + const char *host, grpc_auth_context *auth_context, + grpc_security_call_host_check_cb cb, void *user_data) { + if (sc == NULL || sc->check_call_host == NULL) { + cb(exec_ctx, user_data, GRPC_SECURITY_ERROR); + } else { + sc->check_call_host(exec_ctx, sc, host, auth_context, cb, user_data); + } } #ifdef GRPC_SECURITY_CONNECTOR_REFCOUNT_DEBUG @@ -221,30 +228,23 @@ grpc_security_connector *grpc_find_security_connector_in_args( /* -- Fake implementation. -- */ -typedef struct { - grpc_channel_security_connector base; - int call_host_check_is_async; -} grpc_fake_channel_security_connector; - static void fake_channel_destroy(grpc_security_connector *sc) { grpc_channel_security_connector *c = (grpc_channel_security_connector *)sc; grpc_call_credentials_unref(c->request_metadata_creds); - GRPC_AUTH_CONTEXT_UNREF(sc->auth_context, "connector"); gpr_free(sc); } static void fake_server_destroy(grpc_security_connector *sc) { - GRPC_AUTH_CONTEXT_UNREF(sc->auth_context, "connector"); gpr_mu_destroy(&sc->mu); gpr_free(sc); } -static grpc_security_status fake_check_peer(grpc_security_connector *sc, - tsi_peer peer, - grpc_security_check_cb cb, - void *user_data) { +static void fake_check_peer(grpc_exec_ctx *exec_ctx, + grpc_security_connector *sc, tsi_peer peer, + grpc_security_peer_check_cb cb, void *user_data) { const char *prop_name; grpc_security_status status = GRPC_SECURITY_OK; + grpc_auth_context *auth_context = NULL; if (peer.property_count != 1) { gpr_log(GPR_ERROR, "Fake peers should only have 1 property."); status = GRPC_SECURITY_ERROR; @@ -264,28 +264,24 @@ static grpc_security_status fake_check_peer(grpc_security_connector *sc, status = GRPC_SECURITY_ERROR; goto end; } - GRPC_AUTH_CONTEXT_UNREF(sc->auth_context, "connector"); - sc->auth_context = grpc_auth_context_create(NULL); + auth_context = grpc_auth_context_create(NULL); grpc_auth_context_add_cstring_property( - sc->auth_context, GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME, + auth_context, GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME, GRPC_FAKE_TRANSPORT_SECURITY_TYPE); end: + cb(exec_ctx, user_data, status, auth_context); + grpc_auth_context_unref(auth_context); tsi_peer_destruct(&peer); - return status; } -static grpc_security_status fake_channel_check_call_host( - grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, - const char *host, grpc_security_check_cb cb, void *user_data) { - grpc_fake_channel_security_connector *c = - (grpc_fake_channel_security_connector *)sc; - if (c->call_host_check_is_async) { - cb(exec_ctx, user_data, GRPC_SECURITY_OK); - return GRPC_SECURITY_PENDING; - } else { - return GRPC_SECURITY_OK; - } +static void fake_channel_check_call_host(grpc_exec_ctx *exec_ctx, + grpc_channel_security_connector *sc, + const char *host, + grpc_auth_context *auth_context, + grpc_security_call_host_check_cb cb, + void *user_data) { + cb(exec_ctx, user_data, GRPC_SECURITY_OK); } static void fake_channel_do_handshake(grpc_exec_ctx *exec_ctx, @@ -313,20 +309,17 @@ static grpc_security_connector_vtable fake_server_vtable = { fake_server_destroy, fake_server_do_handshake, fake_check_peer}; grpc_channel_security_connector *grpc_fake_channel_security_connector_create( - grpc_call_credentials *request_metadata_creds, - int call_host_check_is_async) { - grpc_fake_channel_security_connector *c = - gpr_malloc(sizeof(grpc_fake_channel_security_connector)); - memset(c, 0, sizeof(grpc_fake_channel_security_connector)); - gpr_ref_init(&c->base.base.refcount, 1); - c->base.base.is_client_side = 1; - c->base.base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; - c->base.base.vtable = &fake_channel_vtable; - c->base.request_metadata_creds = + grpc_call_credentials *request_metadata_creds) { + grpc_channel_security_connector *c = gpr_malloc(sizeof(*c)); + memset(c, 0, sizeof(*c)); + gpr_ref_init(&c->base.refcount, 1); + c->base.is_client_side = 1; + c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; + c->base.vtable = &fake_channel_vtable; + c->request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); - c->base.check_call_host = fake_channel_check_call_host; - c->call_host_check_is_async = call_host_check_is_async; - return &c->base; + c->check_call_host = fake_channel_check_call_host; + return c; } grpc_security_connector *grpc_fake_server_security_connector_create(void) { @@ -347,7 +340,6 @@ typedef struct { tsi_ssl_handshaker_factory *handshaker_factory; char *target_name; char *overridden_target_name; - tsi_peer peer; } grpc_ssl_channel_security_connector; typedef struct { @@ -364,8 +356,6 @@ static void ssl_channel_destroy(grpc_security_connector *sc) { } if (c->target_name != NULL) gpr_free(c->target_name); if (c->overridden_target_name != NULL) gpr_free(c->overridden_target_name); - tsi_peer_destruct(&c->peer); - GRPC_AUTH_CONTEXT_UNREF(sc->auth_context, "connector"); gpr_free(sc); } @@ -376,7 +366,6 @@ static void ssl_server_destroy(grpc_security_connector *sc) { if (c->handshaker_factory != NULL) { tsi_ssl_handshaker_factory_destroy(c->handshaker_factory); } - GRPC_AUTH_CONTEXT_UNREF(sc->auth_context, "connector"); gpr_mu_destroy(&sc->mu); gpr_free(sc); } @@ -410,7 +399,7 @@ static void ssl_channel_do_handshake(grpc_exec_ctx *exec_ctx, : c->target_name, &handshaker); if (status != GRPC_SECURITY_OK) { - cb(exec_ctx, user_data, status, NULL); + cb(exec_ctx, user_data, status, NULL, NULL); } else { grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint, cb, user_data); @@ -428,7 +417,7 @@ static void ssl_server_do_handshake(grpc_exec_ctx *exec_ctx, grpc_security_status status = ssl_create_handshaker(c->handshaker_factory, 0, NULL, &handshaker); if (status != GRPC_SECURITY_OK) { - cb(exec_ctx, user_data, status, NULL); + cb(exec_ctx, user_data, status, NULL, NULL); } else { grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint, cb, user_data); @@ -488,7 +477,8 @@ grpc_auth_context *tsi_ssl_peer_to_auth_context(const tsi_peer *peer) { static grpc_security_status ssl_check_peer(grpc_security_connector *sc, const char *peer_name, - const tsi_peer *peer) { + const tsi_peer *peer, + grpc_auth_context **auth_context) { /* Check the ALPN. */ const tsi_peer_property *p = tsi_peer_get_property_by_name(peer, TSI_SSL_ALPN_SELECTED_PROTOCOL); @@ -506,54 +496,96 @@ static grpc_security_status ssl_check_peer(grpc_security_connector *sc, gpr_log(GPR_ERROR, "Peer name %s is not in peer certificate", peer_name); return GRPC_SECURITY_ERROR; } - if (sc->auth_context != NULL) { - GRPC_AUTH_CONTEXT_UNREF(sc->auth_context, "connector"); - } - sc->auth_context = tsi_ssl_peer_to_auth_context(peer); + *auth_context = tsi_ssl_peer_to_auth_context(peer); return GRPC_SECURITY_OK; } -static grpc_security_status ssl_channel_check_peer(grpc_security_connector *sc, - tsi_peer peer, - grpc_security_check_cb cb, - void *user_data) { +static void ssl_channel_check_peer( + grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, tsi_peer peer, + grpc_security_peer_check_cb cb, void *user_data) { grpc_ssl_channel_security_connector *c = (grpc_ssl_channel_security_connector *)sc; grpc_security_status status; - tsi_peer_destruct(&c->peer); - c->peer = peer; + grpc_auth_context *auth_context = NULL; status = ssl_check_peer(sc, c->overridden_target_name != NULL ? c->overridden_target_name : c->target_name, - &peer); - return status; + &peer, &auth_context); + cb(exec_ctx, user_data, status, auth_context); + grpc_auth_context_unref(auth_context); + tsi_peer_destruct(&peer); } -static grpc_security_status ssl_server_check_peer(grpc_security_connector *sc, - tsi_peer peer, - grpc_security_check_cb cb, - void *user_data) { - grpc_security_status status = ssl_check_peer(sc, NULL, &peer); +static void ssl_server_check_peer( + grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, tsi_peer peer, + grpc_security_peer_check_cb cb, void *user_data) { + grpc_auth_context *auth_context = NULL; + grpc_security_status status = ssl_check_peer(sc, NULL, &peer, &auth_context); tsi_peer_destruct(&peer); - return status; + cb(exec_ctx, user_data, status, auth_context); + grpc_auth_context_unref(auth_context); } -static grpc_security_status ssl_channel_check_call_host( - grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, - const char *host, grpc_security_check_cb cb, void *user_data) { +static void add_shalow_auth_property_to_peer(tsi_peer *peer, + const grpc_auth_property *prop, + const char *tsi_prop_name) { + tsi_peer_property *tsi_prop = &peer->properties[peer->property_count++]; + tsi_prop->name = (char *)tsi_prop_name; + tsi_prop->value.data = prop->value; + tsi_prop->value.length = prop->value_length; +} + +tsi_peer tsi_shallow_peer_from_ssl_auth_context( + const grpc_auth_context *auth_context) { + size_t max_num_props = 0; + grpc_auth_property_iterator it; + const grpc_auth_property *prop; + tsi_peer peer; + memset(&peer, 0, sizeof(peer)); + + it = grpc_auth_context_property_iterator(auth_context); + while (grpc_auth_property_iterator_next(&it) != NULL) max_num_props++; + + if (max_num_props > 0) { + peer.properties = gpr_malloc(max_num_props * sizeof(tsi_peer_property)); + it = grpc_auth_context_property_iterator(auth_context); + while ((prop = grpc_auth_property_iterator_next(&it)) != NULL) { + if (strcmp(prop->name, GRPC_X509_SAN_PROPERTY_NAME) == 0) { + add_shalow_auth_property_to_peer( + &peer, prop, TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY); + } else if (strcmp(prop->name, GRPC_X509_CN_PROPERTY_NAME) == 0) { + add_shalow_auth_property_to_peer( + &peer, prop, TSI_X509_SUBJECT_COMMON_NAME_PEER_PROPERTY); + } + } + } + return peer; +} + +void tsi_shallow_peer_destruct(tsi_peer *peer) { + if (peer->properties != NULL) gpr_free(peer->properties); +} + +static void ssl_channel_check_call_host(grpc_exec_ctx *exec_ctx, + grpc_channel_security_connector *sc, + const char *host, + grpc_auth_context *auth_context, + grpc_security_call_host_check_cb cb, + void *user_data) { grpc_ssl_channel_security_connector *c = (grpc_ssl_channel_security_connector *)sc; - - if (ssl_host_matches_name(&c->peer, host)) return GRPC_SECURITY_OK; + grpc_security_status status = GRPC_SECURITY_ERROR; + tsi_peer peer = tsi_shallow_peer_from_ssl_auth_context(auth_context); + if (ssl_host_matches_name(&peer, host)) status = GRPC_SECURITY_OK; /* If the target name was overridden, then the original target_name was 'checked' transitively during the previous peer check at the end of the handshake. */ if (c->overridden_target_name != NULL && strcmp(host, c->target_name) == 0) { - return GRPC_SECURITY_OK; - } else { - return GRPC_SECURITY_ERROR; + status = GRPC_SECURITY_OK; } + cb(exec_ctx, user_data, status); + tsi_shallow_peer_destruct(&peer); } static grpc_security_connector_vtable ssl_channel_vtable = { diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h index 7edb05a662..b5f3ff17f4 100644 --- a/src/core/security/security_connector.h +++ b/src/core/security/security_connector.h @@ -42,7 +42,6 @@ typedef enum { GRPC_SECURITY_OK = 0, - GRPC_SECURITY_PENDING, GRPC_SECURITY_ERROR } grpc_security_status; @@ -60,23 +59,24 @@ typedef struct grpc_security_connector grpc_security_connector; #define GRPC_SECURITY_CONNECTOR_ARG "grpc.security_connector" -typedef void (*grpc_security_check_cb)(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_security_status status); +typedef void (*grpc_security_peer_check_cb)(grpc_exec_ctx *exec_ctx, + void *user_data, + grpc_security_status status, + grpc_auth_context *auth_context); /* Ownership of the secure_endpoint is transfered. */ -typedef void (*grpc_security_handshake_done_cb)(grpc_exec_ctx *exec_ctx, - void *user_data, - grpc_security_status status, - grpc_endpoint *secure_endpoint); +typedef void (*grpc_security_handshake_done_cb)( + grpc_exec_ctx *exec_ctx, void *user_data, grpc_security_status status, + grpc_endpoint *secure_endpoint, grpc_auth_context *auth_context); typedef struct { void (*destroy)(grpc_security_connector *sc); void (*do_handshake)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, grpc_endpoint *nonsecure_endpoint, grpc_security_handshake_done_cb cb, void *user_data); - grpc_security_status (*check_peer)(grpc_security_connector *sc, tsi_peer peer, - grpc_security_check_cb cb, - void *user_data); + void (*check_peer)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, + tsi_peer peer, grpc_security_peer_check_cb cb, + void *user_data); } grpc_security_connector_vtable; typedef struct grpc_security_connector_handshake_list { @@ -89,9 +89,8 @@ struct grpc_security_connector { gpr_refcount refcount; int is_client_side; const char *url_scheme; - grpc_auth_context *auth_context; /* Populated after the peer is checked. */ /* Used on server side only. */ - /* TODO(yangg) maybe create a grpc_server_security_connector with these */ + /* TODO(yangg): Create a grpc_server_security_connector with these. */ gpr_mu mu; grpc_security_connector_handshake_list *handshaking_handshakes; const grpc_channel_args *channel_args; @@ -124,16 +123,13 @@ void grpc_security_connector_do_handshake(grpc_exec_ctx *exec_ctx, grpc_security_handshake_done_cb cb, void *user_data); -/* Check the peer. - Implementations can choose to check the peer either synchronously or - asynchronously. In the first case, a successful call will return - GRPC_SECURITY_OK. In the asynchronous case, the call will return - GRPC_SECURITY_PENDING unless an error is detected early on. - Ownership of the peer is transfered. -*/ -grpc_security_status grpc_security_connector_check_peer( - grpc_security_connector *sc, tsi_peer peer, grpc_security_check_cb cb, - void *user_data); +/* Check the peer. Callee takes ownership of the peer object. + The callback will include the resulting auth_context. */ +void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, + grpc_security_connector *sc, + tsi_peer peer, + grpc_security_peer_check_cb cb, + void *user_data); void grpc_security_connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_security_connector *connector); @@ -155,32 +151,31 @@ grpc_security_connector *grpc_find_security_connector_in_args( typedef struct grpc_channel_security_connector grpc_channel_security_connector; +typedef void (*grpc_security_call_host_check_cb)(grpc_exec_ctx *exec_ctx, + void *user_data, + grpc_security_status status); + struct grpc_channel_security_connector { grpc_security_connector base; /* requires is_client_side to be non 0. */ grpc_call_credentials *request_metadata_creds; - grpc_security_status (*check_call_host)(grpc_exec_ctx *exec_ctx, - grpc_channel_security_connector *sc, - const char *host, - grpc_security_check_cb cb, - void *user_data); + void (*check_call_host)(grpc_exec_ctx *exec_ctx, + grpc_channel_security_connector *sc, const char *host, + grpc_auth_context *auth_context, + grpc_security_call_host_check_cb cb, void *user_data); }; -/* Checks that the host that will be set for a call is acceptable. - Implementations can choose do the check either synchronously or - asynchronously. In the first case, a successful call will return - GRPC_SECURITY_OK. In the asynchronous case, the call will return - GRPC_SECURITY_PENDING unless an error is detected early on. */ -grpc_security_status grpc_channel_security_connector_check_call_host( +/* Checks that the host that will be set for a call is acceptable. */ +void grpc_channel_security_connector_check_call_host( grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, - const char *host, grpc_security_check_cb cb, void *user_data); + const char *host, grpc_auth_context *auth_context, + grpc_security_call_host_check_cb cb, void *user_data); /* --- Creation security connectors. --- */ /* For TESTING ONLY! Creates a fake connector that emulates real channel security. */ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( - grpc_call_credentials *request_metadata_creds, - int call_host_check_is_async); + grpc_call_credentials *request_metadata_creds); /* For TESTING ONLY! Creates a fake connector that emulates real server security. */ @@ -244,5 +239,8 @@ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer, /* Exposed for testing only. */ grpc_auth_context *tsi_ssl_peer_to_auth_context(const tsi_peer *peer); +tsi_peer tsi_shallow_peer_from_ssl_auth_context( + const grpc_auth_context *auth_context); +void tsi_shallow_peer_destruct(tsi_peer *peer); #endif /* GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONNECTOR_H */ diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 5cfee6d139..d5c8c54369 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -140,7 +140,7 @@ static void on_md_processing_done( message = gpr_slice_from_copied_string(error_details); calld->transport_op.send_initial_metadata = NULL; if (calld->transport_op.send_message != NULL) { - grpc_byte_stream_destroy(calld->transport_op.send_message); + grpc_byte_stream_destroy(&exec_ctx, calld->transport_op.send_message); calld->transport_op.send_message = NULL; } calld->transport_op.send_trailing_metadata = NULL; diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index d1468e40e0..d7fad33854 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -81,14 +81,15 @@ static void state_unref(grpc_server_secure_state *state) { } static void setup_transport(grpc_exec_ctx *exec_ctx, void *statep, - grpc_transport *transport) { + grpc_transport *transport, + grpc_auth_context *auth_context) { static grpc_channel_filter const *extra_filters[] = { &grpc_server_auth_filter, &grpc_http_server_filter}; grpc_server_secure_state *state = statep; grpc_channel_args *args_copy; grpc_arg args_to_add[2]; args_to_add[0] = grpc_server_credentials_to_arg(state->creds); - args_to_add[1] = grpc_auth_context_to_arg(state->sc->auth_context); + args_to_add[1] = grpc_auth_context_to_arg(auth_context); args_copy = grpc_channel_args_copy_and_add( grpc_server_get_channel_args(state->server), args_to_add, GPR_ARRAY_SIZE(args_to_add)); @@ -99,7 +100,8 @@ static void setup_transport(grpc_exec_ctx *exec_ctx, void *statep, static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, grpc_security_status status, - grpc_endpoint *secure_endpoint) { + grpc_endpoint *secure_endpoint, + grpc_auth_context *auth_context) { grpc_server_secure_state *state = statep; grpc_transport *transport; if (status == GRPC_SECURITY_OK) { @@ -109,7 +111,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, transport = grpc_create_chttp2_transport( exec_ctx, grpc_server_get_channel_args(state->server), secure_endpoint, 0); - setup_transport(exec_ctx, state, transport); + setup_transport(exec_ctx, state, transport, auth_context); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); } else { /* We need to consume this here, because the server may already have diff --git a/src/core/surface/call.c b/src/core/surface/call.c index f8dde0748b..73c1996908 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -360,7 +360,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) { &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]); } if (c->receiving_stream != NULL) { - grpc_byte_stream_destroy(c->receiving_stream); + grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); @@ -951,7 +951,7 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, (*call->receiving_buffer)->data.raw.slice_buffer.length; if (remaining == 0) { call->receiving_message = 0; - grpc_byte_stream_destroy(call->receiving_stream); + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); call->receiving_stream = NULL; if (gpr_unref(&bctl->steps_to_complete)) { post_batch_completion(exec_ctx, bctl); @@ -979,7 +979,7 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, call->receiving_slice); continue_receiving_slices(exec_ctx, bctl); } else { - grpc_byte_stream_destroy(call->receiving_stream); + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); call->receiving_stream = NULL; grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = NULL; @@ -1068,7 +1068,7 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, if (call->receiving_stream == NULL) { *call->receiving_buffer = NULL; - call->receiving_message = 0; + call->receiving_message = 0; if (gpr_unref(&bctl->steps_to_complete)) { post_batch_completion(exec_ctx, bctl); } @@ -1076,10 +1076,10 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_channel_get_max_message_length(call->channel)) { cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, "Max message size exceeded"); - grpc_byte_stream_destroy(call->receiving_stream); + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); call->receiving_stream = NULL; *call->receiving_buffer = NULL; - call->receiving_message = 0; + call->receiving_message = 0; if (gpr_unref(&bctl->steps_to_complete)) { post_batch_completion(exec_ctx, bctl); } @@ -1367,7 +1367,7 @@ done_with_error: } if (bctl->send_message) { call->sending_message = 0; - grpc_byte_stream_destroy(&call->sending_stream.base); + grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base); } if (bctl->send_final_op) { call->sent_final_op = 0; diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 97ec23408f..49083f0870 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -104,6 +104,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) { grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); GPR_ASSERT(c->result->transport); + c->result->channel_args = c->args.channel_args; c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *)); c->result->filters[0] = &grpc_http_client_filter; c->result->num_filters = 1; diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 82027af651..81166e8ec5 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -82,8 +82,8 @@ static grpc_plugin g_all_of_the_plugins[MAX_PLUGINS]; static int g_number_of_plugins = 0; void grpc_register_plugin(void (*init)(void), void (*destroy)(void)) { - GRPC_API_TRACE("grpc_register_plugin(init=%lx, destroy=%lx)", 2, - ((unsigned long)init, (unsigned long)destroy)); + GRPC_API_TRACE("grpc_register_plugin(init=%p, destroy=%p)", 2, + ((void*)(gpr_intptr)init, (void*)(gpr_intptr)destroy)); GPR_ASSERT(g_number_of_plugins != MAX_PLUGINS); g_all_of_the_plugins[g_number_of_plugins].init = init; g_all_of_the_plugins[g_number_of_plugins].destroy = destroy; diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 92bd53411d..552a570713 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -49,6 +49,7 @@ #include "src/core/iomgr/tcp_client.h" #include "src/core/security/auth_filters.h" #include "src/core/security/credentials.h" +#include "src/core/security/security_context.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/channel.h" #include "src/core/transport/chttp2_transport.h" @@ -88,9 +89,11 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_security_status status, - grpc_endpoint *secure_endpoint) { + grpc_endpoint *secure_endpoint, + grpc_auth_context *auth_context) { connector *c = arg; grpc_closure *notify; + grpc_channel_args *args_copy = NULL; gpr_mu_lock(&c->mu); if (c->connecting_endpoint == NULL) { memset(c->result, 0, sizeof(*c->result)); @@ -101,12 +104,17 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, c->connecting_endpoint = NULL; gpr_mu_unlock(&c->mu); } else { + grpc_arg auth_context_arg; c->connecting_endpoint = NULL; gpr_mu_unlock(&c->mu); c->result->transport = grpc_create_chttp2_transport( exec_ctx, c->args.channel_args, secure_endpoint, 1); grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); + auth_context_arg = grpc_auth_context_to_arg(auth_context); + args_copy = grpc_channel_args_copy_and_add(c->args.channel_args, + &auth_context_arg, 1); + c->result->channel_args = args_copy; c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2); c->result->filters[0] = &grpc_http_client_filter; c->result->filters[1] = &grpc_client_auth_filter; @@ -114,7 +122,9 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } notify = c->notify; c->notify = NULL; + /* look at c->args which are connector args. */ notify->cb(exec_ctx, notify->cb_arg, 1); + if (args_copy != NULL) grpc_channel_args_destroy(args_copy); } static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, diff --git a/src/core/transport/byte_stream.c b/src/core/transport/byte_stream.c index 81e8e77ccb..89e20489e7 100644 --- a/src/core/transport/byte_stream.c +++ b/src/core/transport/byte_stream.c @@ -44,8 +44,9 @@ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, on_complete); } -void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream) { - byte_stream->destroy(byte_stream); +void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream) { + byte_stream->destroy(exec_ctx, byte_stream); } /* slice_buffer_stream */ @@ -61,7 +62,8 @@ static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, return 1; } -static void slice_buffer_stream_destroy(grpc_byte_stream *byte_stream) {} +static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream) {} void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, gpr_slice_buffer *slice_buffer, diff --git a/src/core/transport/byte_stream.h b/src/core/transport/byte_stream.h index c94d8ff275..5f2fe573e8 100644 --- a/src/core/transport/byte_stream.h +++ b/src/core/transport/byte_stream.h @@ -52,7 +52,7 @@ struct grpc_byte_stream { int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, gpr_slice *slice, size_t max_size_hint, grpc_closure *on_complete); - void (*destroy)(grpc_byte_stream *byte_stream); + void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); }; /* returns 1 if the bytes are available immediately (in which case @@ -72,7 +72,8 @@ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, gpr_slice *slice, size_t max_size_hint, grpc_closure *on_complete); -void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream); +void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream); /* grpc_byte_stream that wraps a slice buffer */ typedef struct grpc_slice_buffer_stream { diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 60a3ce23d5..5b16ce6334 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -58,7 +58,7 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, } while ( (bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) { - grpc_byte_stream_destroy(bs); + grpc_byte_stream_destroy(exec_ctx, bs); } } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 4561c0bfa9..70f7eed4fe 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -134,7 +134,12 @@ static void connectivity_state_set( static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); -static void fail_pending_writes(grpc_exec_ctx *exec_ctx, +static void incoming_byte_stream_update_flow_control( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, size_t max_size_hint, + size_t have_already); + +static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global); /* @@ -532,7 +537,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, while ( (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) { - grpc_byte_stream_destroy(bs); + grpc_byte_stream_destroy(exec_ctx, bs); } GPR_ASSERT(s->global.send_initial_metadata_finished == NULL); @@ -642,7 +647,8 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); - while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, &stream_global)) { + while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, + &stream_global)) { fail_pending_writes(exec_ctx, stream_global); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); } @@ -867,6 +873,13 @@ static void perform_stream_op_locked( GPR_ASSERT(stream_global->recv_message_ready == NULL); stream_global->recv_message_ready = op->recv_message_ready; stream_global->recv_message = op->recv_message; + if (stream_global->id != 0 && + (stream_global->incoming_frames.head == NULL || + stream_global->incoming_frames.head->is_tail)) { + incoming_byte_stream_update_flow_control( + transport_global, stream_global, transport_global->stream_lookahead, + 0); + } grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } @@ -1021,7 +1034,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, while (stream_global->seen_error && (bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { - grpc_byte_stream_destroy(bs); + grpc_byte_stream_destroy(exec_ctx, bs); } if (stream_global->incoming_frames.head == NULL) { grpc_chttp2_incoming_metadata_buffer_publish( @@ -1122,7 +1135,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, } } -static void fail_pending_writes(grpc_exec_ctx *exec_ctx, +static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global) { grpc_chttp2_complete_closure_step( exec_ctx, &stream_global->send_initial_metadata_finished, 0); @@ -1528,7 +1541,8 @@ static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) { } } -static void incoming_byte_stream_destroy(grpc_byte_stream *byte_stream) { +static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream) { incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream); } @@ -1598,13 +1612,6 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( add_to_queue->tail->next_message = incoming_byte_stream; } add_to_queue->tail = incoming_byte_stream; - if (frame_size == 0) { - lock(TRANSPORT_FROM_PARSING(transport_parsing)); - incoming_byte_stream_update_flow_control( - &TRANSPORT_FROM_PARSING(transport_parsing)->global, - &STREAM_FROM_PARSING(stream_parsing)->global, 0, 0); - unlock(exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing)); - } return incoming_byte_stream; } |