diff options
Diffstat (limited to 'src')
36 files changed, 1428 insertions, 330 deletions
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c index 83b7682848..43ef5fb3ff 100644 --- a/src/core/channel/census_filter.c +++ b/src/core/channel/census_filter.c @@ -197,7 +197,7 @@ static void destroy_channel_elem(grpc_channel_element* elem) { channel_data* chand = elem->channel_data; GPR_ASSERT(chand != NULL); if (chand->path_str != NULL) { - grpc_mdstr_unref(chand->path_str); + GRPC_MDSTR_UNREF(chand->path_str); } } diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 871e970eb8..f890f99237 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -427,7 +427,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { GRPC_RESOLVER_REF(resolver, "channel-next"); gpr_mu_unlock(&chand->mu_config); GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); - grpc_resolver_next(chand->resolver, &chand->incoming_configuration, + grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed); GRPC_RESOLVER_UNREF(resolver, "channel-next"); } else { diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 581eb13f58..63e4912397 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -108,13 +108,13 @@ static void hc_mutate_op(grpc_call_element *elem, /* Send : prefixed headers, which have to be before any application layer headers. */ grpc_metadata_batch_add_head(&op->data.metadata, &calld->method, - grpc_mdelem_ref(channeld->method)); + GRPC_MDELEM_REF(channeld->method)); grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme, - grpc_mdelem_ref(channeld->scheme)); + GRPC_MDELEM_REF(channeld->scheme)); grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers, - grpc_mdelem_ref(channeld->te_trailers)); + GRPC_MDELEM_REF(channeld->te_trailers)); grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, - grpc_mdelem_ref(channeld->content_type)); + GRPC_MDELEM_REF(channeld->content_type)); break; } } @@ -196,11 +196,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) { /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; - grpc_mdelem_unref(channeld->te_trailers); - grpc_mdelem_unref(channeld->method); - grpc_mdelem_unref(channeld->scheme); - grpc_mdelem_unref(channeld->content_type); - grpc_mdelem_unref(channeld->status); + GRPC_MDELEM_UNREF(channeld->te_trailers); + GRPC_MDELEM_UNREF(channeld->method); + GRPC_MDELEM_UNREF(channeld->scheme); + GRPC_MDELEM_UNREF(channeld->content_type); + GRPC_MDELEM_UNREF(channeld->status); } const grpc_channel_filter grpc_http_client_filter = { diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index db0bf590c6..a6cbb5a7f4 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -129,9 +129,9 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { /* translate host to :authority since :authority may be omitted */ grpc_mdelem *authority = grpc_mdelem_from_metadata_strings( - channeld->mdctx, grpc_mdstr_ref(channeld->authority_key), - grpc_mdstr_ref(md->value)); - grpc_mdelem_unref(md); + channeld->mdctx, GRPC_MDSTR_REF(channeld->authority_key), + GRPC_MDSTR_REF(md->value)); + GRPC_MDELEM_UNREF(md); return authority; } else { return md; @@ -193,7 +193,7 @@ static void hs_mutate_op(grpc_call_element *elem, if (op->type != GRPC_OP_METADATA) continue; calld->sent_status = 1; grpc_metadata_batch_add_head(&op->data.metadata, &calld->status, - grpc_mdelem_ref(channeld->status_ok)); + GRPC_MDELEM_REF(channeld->status_ok)); break; } } @@ -264,17 +264,17 @@ static void destroy_channel_elem(grpc_channel_element *elem) { /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; - grpc_mdelem_unref(channeld->te_trailers); - grpc_mdelem_unref(channeld->status_ok); - grpc_mdelem_unref(channeld->status_not_found); - grpc_mdelem_unref(channeld->method_post); - grpc_mdelem_unref(channeld->http_scheme); - grpc_mdelem_unref(channeld->https_scheme); - grpc_mdelem_unref(channeld->grpc_scheme); - grpc_mdelem_unref(channeld->content_type); - grpc_mdstr_unref(channeld->path_key); - grpc_mdstr_unref(channeld->authority_key); - grpc_mdstr_unref(channeld->host_key); + GRPC_MDELEM_UNREF(channeld->te_trailers); + GRPC_MDELEM_UNREF(channeld->status_ok); + GRPC_MDELEM_UNREF(channeld->status_not_found); + GRPC_MDELEM_UNREF(channeld->method_post); + GRPC_MDELEM_UNREF(channeld->http_scheme); + GRPC_MDELEM_UNREF(channeld->https_scheme); + GRPC_MDELEM_UNREF(channeld->grpc_scheme); + GRPC_MDELEM_UNREF(channeld->content_type); + GRPC_MDSTR_UNREF(channeld->path_key); + GRPC_MDSTR_UNREF(channeld->authority_key); + GRPC_MDSTR_UNREF(channeld->host_key); } const grpc_channel_filter grpc_http_server_filter = { diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h index 06699e88c2..ab98178f8a 100644 --- a/src/core/httpcli/httpcli.h +++ b/src/core/httpcli/httpcli.h @@ -85,7 +85,7 @@ typedef struct grpc_httpcli_response { char *body; } grpc_httpcli_response; -/* Callback for grpc_httpcli_get */ +/* Callback for grpc_httpcli_get and grpc_httpcli_post. */ typedef void (*grpc_httpcli_response_cb)(void *user_data, const grpc_httpcli_response *response); @@ -100,8 +100,6 @@ void grpc_httpcli_context_destroy(grpc_httpcli_context *context); 'request' contains request parameters - these are caller owned and can be destroyed once the call returns 'deadline' contains a deadline for the request (or gpr_inf_future) - 'em' points to a caller owned event manager that must be alive for the - lifetime of the request 'on_response' is a callback to report results to (and 'user_data' is a user supplied pointer to pass to said call) */ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, diff --git a/src/core/json/json.h b/src/core/json/json.h index b78b42a5b2..cac18ad885 100644 --- a/src/core/json/json.h +++ b/src/core/json/json.h @@ -53,7 +53,7 @@ typedef struct grpc_json { } grpc_json; /* The next two functions are going to parse the input string, and - * destroy it in the process, in order to use its space to store + * modify it in the process, in order to use its space to store * all of the keys and values for the returned object tree. * * They assume UTF-8 input stream, and will output UTF-8 encoded diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 632a742250..9e49a807f1 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -234,11 +234,11 @@ static void auth_start_transport_op(grpc_call_element *elem, /* Pointer comparison is OK for md_elems created from the same context. */ if (md->key == chand->authority_string) { - if (calld->host != NULL) grpc_mdstr_unref(calld->host); - calld->host = grpc_mdstr_ref(md->value); + if (calld->host != NULL) GRPC_MDSTR_UNREF(calld->host); + calld->host = GRPC_MDSTR_REF(md->value); } else if (md->key == chand->path_string) { - if (calld->method != NULL) grpc_mdstr_unref(calld->method); - calld->method = grpc_mdstr_ref(md->value); + if (calld->method != NULL) GRPC_MDSTR_UNREF(calld->method); + calld->method = GRPC_MDSTR_REF(md->value); } } if (calld->host != NULL) { @@ -288,10 +288,10 @@ static void destroy_call_elem(grpc_call_element *elem) { call_data *calld = elem->call_data; grpc_credentials_unref(calld->creds); if (calld->host != NULL) { - grpc_mdstr_unref(calld->host); + GRPC_MDSTR_UNREF(calld->host); } if (calld->method != NULL) { - grpc_mdstr_unref(calld->method); + GRPC_MDSTR_UNREF(calld->method); } } @@ -330,16 +330,16 @@ static void destroy_channel_elem(grpc_channel_element *elem) { if (ctx != NULL) GRPC_SECURITY_CONNECTOR_UNREF(&ctx->base, "client_auth_filter"); if (chand->authority_string != NULL) { - grpc_mdstr_unref(chand->authority_string); + GRPC_MDSTR_UNREF(chand->authority_string); } if (chand->error_msg_key != NULL) { - grpc_mdstr_unref(chand->error_msg_key); + GRPC_MDSTR_UNREF(chand->error_msg_key); } if (chand->status_key != NULL) { - grpc_mdstr_unref(chand->status_key); + GRPC_MDSTR_UNREF(chand->status_key); } if (chand->path_string != NULL) { - grpc_mdstr_unref(chand->path_string); + GRPC_MDSTR_UNREF(chand->path_string); } } diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index e79e9ce351..8d694c2f79 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -461,7 +461,6 @@ typedef struct { grpc_credentials_md_store *access_token_md; gpr_timespec token_expiration; grpc_httpcli_context httpcli_context; - grpc_pollset_set pollset_set; grpc_fetch_oauth2_func fetch_func; } grpc_oauth2_token_fetcher_credentials; @@ -635,7 +634,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c, gpr_mu_init(&c->mu); c->token_expiration = gpr_inf_past; c->fetch_func = fetch_func; - grpc_pollset_set_init(&c->pollset_set); + grpc_httpcli_context_init(&c->httpcli_context); } /* -- ComputeEngine credentials. -- */ @@ -876,6 +875,59 @@ grpc_credentials *grpc_fake_oauth2_credentials_create( return &c->base; } +/* -- Oauth2 Access Token credentials. -- */ + +typedef struct { + grpc_credentials base; + grpc_credentials_md_store *access_token_md; +} grpc_access_token_credentials; + +static void access_token_destroy(grpc_credentials *creds) { + grpc_access_token_credentials *c = (grpc_access_token_credentials *)creds; + grpc_credentials_md_store_unref(c->access_token_md); + gpr_free(c); +} + +static int access_token_has_request_metadata(const grpc_credentials *creds) { + return 1; +} + +static int access_token_has_request_metadata_only( + const grpc_credentials *creds) { + return 1; +} + +static void access_token_get_request_metadata(grpc_credentials *creds, + grpc_pollset *pollset, + const char *service_url, + grpc_credentials_metadata_cb cb, + void *user_data) { + grpc_access_token_credentials *c = (grpc_access_token_credentials *)creds; + cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK); +} + +static grpc_credentials_vtable access_token_vtable = { + access_token_destroy, access_token_has_request_metadata, + access_token_has_request_metadata_only, access_token_get_request_metadata, + NULL}; + +grpc_credentials *grpc_access_token_credentials_create( + const char *access_token) { + grpc_access_token_credentials *c = + gpr_malloc(sizeof(grpc_access_token_credentials)); + char *token_md_value; + memset(c, 0, sizeof(grpc_access_token_credentials)); + c->base.type = GRPC_CREDENTIALS_TYPE_OAUTH2; + c->base.vtable = &access_token_vtable; + gpr_ref_init(&c->base.refcount, 1); + c->access_token_md = grpc_credentials_md_store_create(1); + gpr_asprintf(&token_md_value, "Bearer %s", access_token); + grpc_credentials_md_store_add_cstrings( + c->access_token_md, GRPC_AUTHORIZATION_METADATA_KEY, token_md_value); + gpr_free(token_md_value); + return &c->base; +} + /* -- Fake transport security credentials. -- */ static void fake_transport_security_credentials_destroy( diff --git a/src/core/security/jwt_verifier.c b/src/core/security/jwt_verifier.c new file mode 100644 index 0000000000..01007a1a84 --- /dev/null +++ b/src/core/security/jwt_verifier.c @@ -0,0 +1,830 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimser. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimser + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/security/jwt_verifier.h" + +#include <string.h> + +#include "src/core/httpcli/httpcli.h" +#include "src/core/security/base64.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> +#include <openssl/pem.h> + +/* --- Utils. --- */ + +const char *grpc_jwt_verifier_status_to_string( + grpc_jwt_verifier_status status) { + switch (status) { + case GRPC_JWT_VERIFIER_OK: + return "OK"; + case GRPC_JWT_VERIFIER_BAD_SIGNATURE: + return "BAD_SIGNATURE"; + case GRPC_JWT_VERIFIER_BAD_FORMAT: + return "BAD_FORMAT"; + case GRPC_JWT_VERIFIER_BAD_AUDIENCE: + return "BAD_AUDIENCE"; + case GRPC_JWT_VERIFIER_KEY_RETRIEVAL_ERROR: + return "KEY_RETRIEVAL_ERROR"; + case GRPC_JWT_VERIFIER_TIME_CONSTRAINT_FAILURE: + return "TIME_CONSTRAINT_FAILURE"; + case GRPC_JWT_VERIFIER_GENERIC_ERROR: + return "GENERIC_ERROR"; + default: + return "UNKNOWN"; + } +} + +static const EVP_MD *evp_md_from_alg(const char *alg) { + if (strcmp(alg, "RS256") == 0) { + return EVP_sha256(); + } else if (strcmp(alg, "RS384") == 0) { + return EVP_sha384(); + } else if (strcmp(alg, "RS512") == 0) { + return EVP_sha512(); + } else { + return NULL; + } +} + +static grpc_json *parse_json_part_from_jwt(const char *str, size_t len, + gpr_slice *buffer) { + grpc_json *json; + + *buffer = grpc_base64_decode_with_len(str, len, 1); + if (GPR_SLICE_IS_EMPTY(*buffer)) { + gpr_log(GPR_ERROR, "Invalid base64."); + return NULL; + } + json = grpc_json_parse_string_with_len((char *)GPR_SLICE_START_PTR(*buffer), + GPR_SLICE_LENGTH(*buffer)); + if (json == NULL) { + gpr_slice_unref(*buffer); + gpr_log(GPR_ERROR, "JSON parsing error."); + } + return json; +} + +static const char *validate_string_field(const grpc_json *json, + const char *key) { + if (json->type != GRPC_JSON_STRING) { + gpr_log(GPR_ERROR, "Invalid %s field [%s]", key, json->value); + return NULL; + } + return json->value; +} + +static gpr_timespec validate_time_field(const grpc_json *json, + const char *key) { + gpr_timespec result = gpr_time_0; + if (json->type != GRPC_JSON_NUMBER) { + gpr_log(GPR_ERROR, "Invalid %s field [%s]", key, json->value); + return result; + } + result.tv_sec = strtol(json->value, NULL, 10); + return result; +} + +/* --- JOSE header. see http://tools.ietf.org/html/rfc7515#section-4 --- */ + +typedef struct { + const char *alg; + const char *kid; + const char *typ; + /* TODO(jboeuf): Add others as needed (jku, jwk, x5u, x5c and so on...). */ + gpr_slice buffer; +} jose_header; + +static void jose_header_destroy(jose_header *h) { + gpr_slice_unref(h->buffer); + gpr_free(h); +} + +/* Takes ownership of json and buffer. */ +static jose_header *jose_header_from_json(grpc_json *json, gpr_slice buffer) { + grpc_json *cur; + jose_header *h = gpr_malloc(sizeof(jose_header)); + memset(h, 0, sizeof(jose_header)); + h->buffer = buffer; + for (cur = json->child; cur != NULL; cur = cur->next) { + if (strcmp(cur->key, "alg") == 0) { + /* We only support RSA-1.5 signatures for now. + Beware of this if we add HMAC support: + https://auth0.com/blog/2015/03/31/critical-vulnerabilities-in-json-web-token-libraries/ + */ + if (cur->type != GRPC_JSON_STRING || strncmp(cur->value, "RS", 2) || + evp_md_from_alg(cur->value) == NULL) { + gpr_log(GPR_ERROR, "Invalid alg field [%s]", cur->value); + goto error; + } + h->alg = cur->value; + } else if (strcmp(cur->key, "typ") == 0) { + h->typ = validate_string_field(cur, "typ"); + if (h->typ == NULL) goto error; + } else if (strcmp(cur->key, "kid") == 0) { + h->kid = validate_string_field(cur, "kid"); + if (h->kid == NULL) goto error; + } + } + if (h->alg == NULL) { + gpr_log(GPR_ERROR, "Missing alg field."); + goto error; + } + grpc_json_destroy(json); + h->buffer = buffer; + return h; + +error: + grpc_json_destroy(json); + jose_header_destroy(h); + return NULL; +} + +/* --- JWT claims. see http://tools.ietf.org/html/rfc7519#section-4.1 */ + +struct grpc_jwt_claims { + /* Well known properties already parsed. */ + const char *sub; + const char *iss; + const char *aud; + const char *jti; + gpr_timespec iat; + gpr_timespec exp; + gpr_timespec nbf; + + grpc_json *json; + gpr_slice buffer; +}; + + +void grpc_jwt_claims_destroy(grpc_jwt_claims *claims) { + grpc_json_destroy(claims->json); + gpr_slice_unref(claims->buffer); + gpr_free(claims); +} + +const grpc_json *grpc_jwt_claims_json(const grpc_jwt_claims *claims) { + if (claims == NULL) return NULL; + return claims->json; +} + +const char *grpc_jwt_claims_subject(const grpc_jwt_claims *claims) { + if (claims == NULL) return NULL; + return claims->sub; +} + +const char *grpc_jwt_claims_issuer(const grpc_jwt_claims *claims) { + if (claims == NULL) return NULL; + return claims->iss; +} + +const char *grpc_jwt_claims_id(const grpc_jwt_claims *claims) { + if (claims == NULL) return NULL; + return claims->jti; +} + +const char *grpc_jwt_claims_audience(const grpc_jwt_claims *claims) { + if (claims == NULL) return NULL; + return claims->aud; +} + +gpr_timespec grpc_jwt_claims_issued_at(const grpc_jwt_claims *claims) { + if (claims == NULL) return gpr_inf_past; + return claims->iat; +} + +gpr_timespec grpc_jwt_claims_expires_at(const grpc_jwt_claims *claims) { + if (claims == NULL) return gpr_inf_future; + return claims->exp; +} + +gpr_timespec grpc_jwt_claims_not_before(const grpc_jwt_claims *claims) { + if (claims == NULL) return gpr_inf_past; + return claims->nbf; +} + +/* Takes ownership of json and buffer even in case of failure. */ +grpc_jwt_claims *grpc_jwt_claims_from_json(grpc_json *json, gpr_slice buffer) { + grpc_json *cur; + grpc_jwt_claims *claims = gpr_malloc(sizeof(grpc_jwt_claims)); + memset(claims, 0, sizeof(grpc_jwt_claims)); + claims->json = json; + claims->buffer = buffer; + claims->iat = gpr_inf_past; + claims->nbf = gpr_inf_past; + claims->exp = gpr_inf_future; + + /* Per the spec, all fields are optional. */ + for (cur = json->child; cur != NULL; cur = cur->next) { + if (strcmp(cur->key, "sub") == 0) { + claims->sub = validate_string_field(cur, "sub"); + if (claims->sub == NULL) goto error; + } else if (strcmp(cur->key, "iss") == 0) { + claims->iss = validate_string_field(cur, "iss"); + if (claims->iss == NULL) goto error; + } else if (strcmp(cur->key, "aud") == 0) { + claims->aud = validate_string_field(cur, "aud"); + if (claims->aud == NULL) goto error; + } else if (strcmp(cur->key, "jti") == 0) { + claims->jti = validate_string_field(cur, "jti"); + if (claims->jti == NULL) goto error; + } else if (strcmp(cur->key, "iat") == 0) { + claims->iat = validate_time_field(cur, "iat"); + if (gpr_time_cmp(claims->iat, gpr_time_0) == 0) goto error; + } else if (strcmp(cur->key, "exp") == 0) { + claims->exp = validate_time_field(cur, "exp"); + if (gpr_time_cmp(claims->exp, gpr_time_0) == 0) goto error; + } else if (strcmp(cur->key, "nbf") == 0) { + claims->nbf = validate_time_field(cur, "nbf"); + if (gpr_time_cmp(claims->nbf, gpr_time_0) == 0) goto error; + } + } + return claims; + +error: + grpc_jwt_claims_destroy(claims); + return NULL; +} + +grpc_jwt_verifier_status grpc_jwt_claims_check(const grpc_jwt_claims *claims, + const char *audience) { + gpr_timespec skewed_now; + int audience_ok; + + GPR_ASSERT(claims != NULL); + + skewed_now = gpr_time_add(gpr_now(), grpc_jwt_verifier_clock_skew); + if (gpr_time_cmp(skewed_now, claims->nbf) < 0) { + gpr_log(GPR_ERROR, "JWT is not valid yet."); + return GRPC_JWT_VERIFIER_TIME_CONSTRAINT_FAILURE; + } + skewed_now = gpr_time_sub(gpr_now(), grpc_jwt_verifier_clock_skew); + if (gpr_time_cmp(skewed_now, claims->exp) > 0) { + gpr_log(GPR_ERROR, "JWT is expired."); + return GRPC_JWT_VERIFIER_TIME_CONSTRAINT_FAILURE; + } + + if (audience == NULL) { + audience_ok = claims->aud == NULL; + } else { + audience_ok = claims->aud != NULL && strcmp(audience, claims->aud) == 0; + } + if (!audience_ok) { + gpr_log(GPR_ERROR, "Audience mismatch: expected %s and found %s.", + audience == NULL ? "NULL" : audience, + claims->aud == NULL ? "NULL" : claims->aud); + return GRPC_JWT_VERIFIER_BAD_AUDIENCE; + } + return GRPC_JWT_VERIFIER_OK; +} + +/* --- verifier_cb_ctx object. --- */ + +typedef struct { + grpc_jwt_verifier *verifier; + grpc_pollset *pollset; + jose_header *header; + grpc_jwt_claims *claims; + char *audience; + gpr_slice signature; + gpr_slice signed_data; + void *user_data; + grpc_jwt_verification_done_cb user_cb; +} verifier_cb_ctx; + +/* Takes ownership of the header, claims and signature. */ +static verifier_cb_ctx *verifier_cb_ctx_create( + grpc_jwt_verifier *verifier, grpc_pollset *pollset, + jose_header * header, grpc_jwt_claims *claims, const char *audience, + gpr_slice signature, const char *signed_jwt, size_t signed_jwt_len, + void *user_data, grpc_jwt_verification_done_cb cb) { + verifier_cb_ctx *ctx = gpr_malloc(sizeof(verifier_cb_ctx)); + memset(ctx, 0, sizeof(verifier_cb_ctx)); + ctx->verifier = verifier; + ctx->pollset = pollset; + ctx->header = header; + ctx->audience = gpr_strdup(audience); + ctx->claims = claims; + ctx->signature = signature; + ctx->signed_data = gpr_slice_from_copied_buffer(signed_jwt, signed_jwt_len); + ctx->user_data = user_data; + ctx->user_cb = cb; + return ctx; +} + +void verifier_cb_ctx_destroy(verifier_cb_ctx *ctx) { + if (ctx->audience != NULL) gpr_free(ctx->audience); + if (ctx->claims != NULL) grpc_jwt_claims_destroy(ctx->claims); + gpr_slice_unref(ctx->signature); + gpr_slice_unref(ctx->signed_data); + jose_header_destroy(ctx->header); + /* TODO: see what to do with claims... */ + gpr_free(ctx); +} + +/* --- grpc_jwt_verifier object. --- */ + +/* Clock skew defaults to one minute. */ +gpr_timespec grpc_jwt_verifier_clock_skew = {60, 0}; + +/* Max delay defaults to one minute. */ +gpr_timespec grpc_jwt_verifier_max_delay = {60, 0}; + +typedef struct { + char *email_domain; + char *key_url_prefix; +} email_key_mapping; + +struct grpc_jwt_verifier { + email_key_mapping *mappings; + size_t num_mappings; /* Should be very few, linear search ok. */ + size_t allocated_mappings; + grpc_httpcli_context http_ctx; +}; + +static grpc_json *json_from_http(const grpc_httpcli_response *response) { + grpc_json *json = NULL; + + if (response == NULL) { + gpr_log(GPR_ERROR, "HTTP response is NULL."); + return NULL; + } + if (response->status != 200) { + gpr_log(GPR_ERROR, "Call to http server failed with error %d.", + response->status); + return NULL; + } + + json = grpc_json_parse_string_with_len(response->body, response->body_length); + if (json == NULL) { + gpr_log(GPR_ERROR, "Invalid JSON found in response."); + } + return json; +} + +static const grpc_json *find_property_by_name(const grpc_json *json, + const char *name) { + const grpc_json *cur; + for (cur = json->child; cur != NULL; cur = cur->next) { + if (strcmp(cur->key, name) == 0) return cur; + } + return NULL; +} + +static EVP_PKEY *extract_pkey_from_x509(const char *x509_str) { + X509 *x509 = NULL; + EVP_PKEY *result = NULL; + BIO *bio = BIO_new(BIO_s_mem()); + BIO_write(bio, x509_str, strlen(x509_str)); + x509 = PEM_read_bio_X509(bio, NULL, NULL, NULL); + if (x509 == NULL) { + gpr_log(GPR_ERROR, "Unable to parse x509 cert."); + goto end; + } + result = X509_get_pubkey(x509); + if (result == NULL) { + gpr_log(GPR_ERROR, "Cannot find public key in X509 cert."); + } + +end: + BIO_free(bio); + if (x509 != NULL) X509_free(x509); + return result; +} + +static BIGNUM *bignum_from_base64(const char *b64) { + BIGNUM *result = NULL; + gpr_slice bin; + + if (b64 == NULL) return NULL; + bin = grpc_base64_decode(b64, 1); + if (GPR_SLICE_IS_EMPTY(bin)) { + gpr_log(GPR_ERROR, "Invalid base64 for big num."); + return NULL; + } + result = BN_bin2bn(GPR_SLICE_START_PTR(bin), GPR_SLICE_LENGTH(bin), NULL); + gpr_slice_unref(bin); + return result; +} + +static EVP_PKEY *pkey_from_jwk(const grpc_json *json, const char *kty) { + const grpc_json *key_prop; + RSA *rsa = NULL; + EVP_PKEY *result = NULL; + + GPR_ASSERT(kty != NULL && json != NULL); + if (strcmp(kty, "RSA") != 0) { + gpr_log(GPR_ERROR, "Unsupported key type %s.", kty); + goto end; + } + rsa = RSA_new(); + if (rsa == NULL) { + gpr_log(GPR_ERROR, "Could not create rsa key."); + goto end; + } + for (key_prop = json->child; key_prop != NULL; key_prop = key_prop->next) { + if (strcmp(key_prop->key, "n") == 0) { + rsa->n = bignum_from_base64(validate_string_field(key_prop, "n")); + if (rsa->n == NULL) goto end; + } else if (strcmp(key_prop->key, "e") == 0) { + rsa->e = bignum_from_base64(validate_string_field(key_prop, "e")); + if (rsa->e == NULL) goto end; + } + } + if (rsa->e == NULL || rsa->n == NULL) { + gpr_log(GPR_ERROR, "Missing RSA public key field."); + goto end; + } + result = EVP_PKEY_new(); + EVP_PKEY_set1_RSA(result, rsa); /* uprefs rsa. */ + +end: + if (rsa != NULL) RSA_free(rsa); + return result; +} + +static EVP_PKEY *find_verification_key(const grpc_json *json, + const char *header_alg, + const char *header_kid) { + const grpc_json *jkey; + const grpc_json *jwk_keys; + /* Try to parse the json as a JWK set: + https://tools.ietf.org/html/rfc7517#section-5. */ + jwk_keys = find_property_by_name(json, "keys"); + if (jwk_keys == NULL) { + /* Use the google proprietary format which is: + { <kid1>: <x5091>, <kid2>: <x5092>, ... } */ + const grpc_json *cur = find_property_by_name(json, header_kid); + if (cur == NULL) return NULL; + return extract_pkey_from_x509(cur->value); + } + + if (jwk_keys->type != GRPC_JSON_ARRAY) { + gpr_log(GPR_ERROR, + "Unexpected value type of keys property in jwks key set."); + return NULL; + } + /* Key format is specified in: + https://tools.ietf.org/html/rfc7518#section-6. */ + for (jkey = jwk_keys->child; jkey != NULL; jkey = jkey->next) { + grpc_json *key_prop; + const char *alg = NULL; + const char *kid = NULL; + const char *kty = NULL; + + if (jkey->type != GRPC_JSON_OBJECT) continue; + for (key_prop = jkey->child; key_prop != NULL; key_prop = key_prop->next) { + if (strcmp(key_prop->key, "alg") == 0 && + key_prop->type == GRPC_JSON_STRING) { + alg = key_prop->value; + } else if (strcmp(key_prop->key, "kid") == 0 && + key_prop->type == GRPC_JSON_STRING) { + kid = key_prop->value; + } else if (strcmp(key_prop->key, "kty") == 0 && + key_prop->type == GRPC_JSON_STRING) { + kty = key_prop->value; + } + } + if (alg != NULL && kid != NULL && kty != NULL && + strcmp(kid, header_kid) == 0 && strcmp(alg, header_alg) == 0) { + return pkey_from_jwk(jkey, kty); + } + } + gpr_log(GPR_ERROR, + "Could not find matching key in key set for kid=%s and alg=%s", + header_kid, header_alg); + return NULL; +} + +static int verify_jwt_signature(EVP_PKEY *key, const char *alg, + gpr_slice signature, gpr_slice signed_data) { + EVP_MD_CTX *md_ctx = EVP_MD_CTX_create(); + const EVP_MD *md = evp_md_from_alg(alg); + int result = 0; + + GPR_ASSERT(md != NULL); /* Checked before. */ + if (md_ctx == NULL) { + gpr_log(GPR_ERROR, "Could not create EVP_MD_CTX."); + goto end; + } + if (EVP_DigestVerifyInit(md_ctx, NULL, md, NULL, key) != 1) { + gpr_log(GPR_ERROR, "EVP_DigestVerifyInit failed."); + goto end; + } + if (EVP_DigestVerifyUpdate(md_ctx, GPR_SLICE_START_PTR(signed_data), + GPR_SLICE_LENGTH(signed_data)) != 1) { + gpr_log(GPR_ERROR, "EVP_DigestVerifyUpdate failed."); + goto end; + } + if (EVP_DigestVerifyFinal(md_ctx, GPR_SLICE_START_PTR(signature), + GPR_SLICE_LENGTH(signature)) != 1) { + gpr_log(GPR_ERROR, "JWT signature verification failed."); + goto end; + } + result = 1; + +end: + if (md_ctx != NULL) EVP_MD_CTX_destroy(md_ctx); + return result; +} + +static void on_keys_retrieved(void *user_data, + const grpc_httpcli_response *response) { + grpc_json *json = json_from_http(response); + verifier_cb_ctx *ctx = (verifier_cb_ctx *)user_data; + EVP_PKEY *verification_key = NULL; + grpc_jwt_verifier_status status = GRPC_JWT_VERIFIER_GENERIC_ERROR; + grpc_jwt_claims *claims = NULL; + + if (json == NULL) { + status = GRPC_JWT_VERIFIER_KEY_RETRIEVAL_ERROR; + goto end; + } + verification_key = + find_verification_key(json, ctx->header->alg, ctx->header->kid); + if (verification_key == NULL) { + gpr_log(GPR_ERROR, "Could not find verification key with kid %s.", + ctx->header->kid); + status = GRPC_JWT_VERIFIER_KEY_RETRIEVAL_ERROR; + goto end; + } + + if (!verify_jwt_signature(verification_key, ctx->header->alg, ctx->signature, + ctx->signed_data)) { + status = GRPC_JWT_VERIFIER_BAD_SIGNATURE; + goto end; + } + + status = grpc_jwt_claims_check(ctx->claims, ctx->audience); + if (status == GRPC_JWT_VERIFIER_OK) { + /* Pass ownership. */ + claims = ctx->claims; + ctx->claims = NULL; + } + +end: + if (json != NULL) grpc_json_destroy(json); + if (verification_key != NULL) EVP_PKEY_free(verification_key); + ctx->user_cb(ctx->user_data, status, claims); + verifier_cb_ctx_destroy(ctx); +} + +static void on_openid_config_retrieved(void *user_data, + const grpc_httpcli_response *response) { + const grpc_json* cur; + grpc_json *json = json_from_http(response); + verifier_cb_ctx *ctx = (verifier_cb_ctx *)user_data; + grpc_httpcli_request req; + const char *jwks_uri; + + /* TODO(jboeuf): Cache the jwks_uri in order to avoid this hop next time.*/ + if (json == NULL) goto error; + cur = find_property_by_name(json, "jwks_uri"); + if (cur == NULL) { + gpr_log(GPR_ERROR, "Could not find jwks_uri in openid config."); + goto error; + } + jwks_uri = validate_string_field(cur, "jwks_uri"); + if (jwks_uri == NULL) goto error; + if (strstr(jwks_uri, "https://") != jwks_uri) { + gpr_log(GPR_ERROR, "Invalid non https jwks_uri: %s.", jwks_uri); + goto error; + } + jwks_uri += 8; + req.use_ssl = 1; + req.host = gpr_strdup(jwks_uri); + req.path = strchr(jwks_uri, '/'); + if (req.path == NULL) { + req.path = ""; + } else { + *(req.host + (req.path - jwks_uri)) = '\0'; + } + grpc_httpcli_get(&ctx->verifier->http_ctx, ctx->pollset, &req, + gpr_time_add(gpr_now(), grpc_jwt_verifier_max_delay), + on_keys_retrieved, ctx); + grpc_json_destroy(json); + gpr_free(req.host); + return; + +error: + if (json != NULL) grpc_json_destroy(json); + ctx->user_cb(ctx->user_data, GRPC_JWT_VERIFIER_KEY_RETRIEVAL_ERROR, NULL); + verifier_cb_ctx_destroy(ctx); +} + +static email_key_mapping *verifier_get_mapping( + grpc_jwt_verifier *v, const char *email_domain) { + size_t i; + if (v->mappings == NULL) return NULL; + for (i = 0; i < v->num_mappings; i++) { + if (strcmp(email_domain, v->mappings[i].email_domain) == 0) { + return &v->mappings[i]; + } + } + return NULL; +} + +static void verifier_put_mapping(grpc_jwt_verifier *v, const char *email_domain, + const char *key_url_prefix) { + email_key_mapping *mapping = verifier_get_mapping(v, email_domain); + GPR_ASSERT(v->num_mappings < v->allocated_mappings); + if (mapping != NULL) { + gpr_free(mapping->key_url_prefix); + mapping->key_url_prefix = gpr_strdup(key_url_prefix); + return; + } + v->mappings[v->num_mappings].email_domain = gpr_strdup(email_domain); + v->mappings[v->num_mappings].key_url_prefix = gpr_strdup(key_url_prefix); + v->num_mappings++; + GPR_ASSERT(v->num_mappings <= v->allocated_mappings); +} + +/* Takes ownership of ctx. */ +static void retrieve_key_and_verify(verifier_cb_ctx *ctx) { + const char *at_sign; + grpc_httpcli_response_cb http_cb; + char *path_prefix = NULL; + const char *iss; + grpc_httpcli_request req; + memset(&req, 0, sizeof(grpc_httpcli_request)); + req.use_ssl = 1; + + GPR_ASSERT(ctx != NULL && ctx->header != NULL && ctx->claims != NULL); + iss = ctx->claims->iss; + if (ctx->header->kid == NULL) { + gpr_log(GPR_ERROR, "Missing kid in jose header."); + goto error; + } + if (iss == NULL) { + gpr_log(GPR_ERROR, "Missing iss in claims."); + goto error; + } + + /* This code relies on: + https://openid.net/specs/openid-connect-discovery-1_0.html + Nobody seems to implement the account/email/webfinger part 2. of the spec + so we will rely instead on email/url mappings if we detect such an issuer. + Part 4, on the other hand is implemented by both google and salesforce. */ + + /* Very non-sophisticated way to detect an email address. Should be good + enough for now... */ + at_sign = strchr(iss, '@'); + if (at_sign != NULL) { + email_key_mapping *mapping; + const char *email_domain = at_sign + 1; + GPR_ASSERT(ctx->verifier != NULL); + mapping = verifier_get_mapping(ctx->verifier, email_domain); + if (mapping == NULL) { + gpr_log(GPR_ERROR, "Missing mapping for issuer email."); + goto error; + } + req.host = gpr_strdup(mapping->key_url_prefix); + path_prefix = strchr(req.host, '/'); + if (path_prefix == NULL) { + gpr_asprintf(&req.path, "/%s", iss); + } else { + *(path_prefix++) = '\0'; + gpr_asprintf(&req.path, "/%s/%s", path_prefix, iss); + } + http_cb = on_keys_retrieved; + } else { + req.host = gpr_strdup(strstr(iss, "https://") == iss ? iss + 8 : iss); + path_prefix = strchr(req.host, '/'); + if (path_prefix == NULL) { + req.path = gpr_strdup(GRPC_OPENID_CONFIG_URL_SUFFIX); + } else { + *(path_prefix++) = 0; + gpr_asprintf(&req.path, "/%s%s", path_prefix, + GRPC_OPENID_CONFIG_URL_SUFFIX); + } + http_cb = on_openid_config_retrieved; + } + + grpc_httpcli_get(&ctx->verifier->http_ctx, ctx->pollset, &req, + gpr_time_add(gpr_now(), grpc_jwt_verifier_max_delay), + http_cb, ctx); + gpr_free(req.host); + gpr_free(req.path); + return; + +error: + ctx->user_cb(ctx->user_data, GRPC_JWT_VERIFIER_KEY_RETRIEVAL_ERROR, NULL); + verifier_cb_ctx_destroy(ctx); +} + +void grpc_jwt_verifier_verify(grpc_jwt_verifier *verifier, + grpc_pollset *pollset, const char *jwt, + const char *audience, + grpc_jwt_verification_done_cb cb, + void *user_data) { + const char *dot = NULL; + grpc_json *json; + jose_header *header = NULL; + grpc_jwt_claims *claims = NULL; + gpr_slice header_buffer; + gpr_slice claims_buffer; + gpr_slice signature; + size_t signed_jwt_len; + const char *cur = jwt; + + GPR_ASSERT(verifier != NULL && jwt != NULL && audience != NULL && cb != NULL); + dot = strchr(cur, '.'); + if (dot == NULL) goto error; + json = parse_json_part_from_jwt(cur, dot - cur, &header_buffer); + if (json == NULL) goto error; + header = jose_header_from_json(json, header_buffer); + if (header == NULL) goto error; + + cur = dot + 1; + dot = strchr(cur, '.'); + if (dot == NULL) goto error; + json = parse_json_part_from_jwt(cur, dot - cur, &claims_buffer); + if (json == NULL) goto error; + claims = grpc_jwt_claims_from_json(json, claims_buffer); + if (claims == NULL) goto error; + + signed_jwt_len = (size_t)(dot - jwt); + cur = dot + 1; + signature = grpc_base64_decode(cur, 1); + if (GPR_SLICE_IS_EMPTY(signature)) goto error; + retrieve_key_and_verify( + verifier_cb_ctx_create(verifier, pollset, header, claims, audience, + signature, jwt, signed_jwt_len, user_data, cb)); + return; + +error: + if (header != NULL) jose_header_destroy(header); + if (claims != NULL) grpc_jwt_claims_destroy(claims); + cb(user_data, GRPC_JWT_VERIFIER_BAD_FORMAT, NULL); +} + +grpc_jwt_verifier *grpc_jwt_verifier_create( + const grpc_jwt_verifier_email_domain_key_url_mapping *mappings, + size_t num_mappings) { + grpc_jwt_verifier *v = gpr_malloc(sizeof(grpc_jwt_verifier)); + memset(v, 0, sizeof(grpc_jwt_verifier)); + grpc_httpcli_context_init(&v->http_ctx); + + /* We know at least of one mapping. */ + v->allocated_mappings = 1 + num_mappings; + v->mappings = gpr_malloc(v->allocated_mappings * sizeof(email_key_mapping)); + verifier_put_mapping(v, GRPC_GOOGLE_SERVICE_ACCOUNTS_EMAIL_DOMAIN, + GRPC_GOOGLE_SERVICE_ACCOUNTS_KEY_URL_PREFIX); + /* User-Provided mappings. */ + if (mappings != NULL) { + size_t i; + for (i = 0; i < num_mappings; i++) { + verifier_put_mapping(v, mappings[i].email_domain, + mappings[i].key_url_prefix); + } + } + return v; +} + +void grpc_jwt_verifier_destroy(grpc_jwt_verifier *v) { + size_t i; + if (v == NULL) return; + grpc_httpcli_context_destroy(&v->http_ctx); + if (v->mappings != NULL) { + for (i = 0; i < v->num_mappings; i++) { + gpr_free(v->mappings[i].email_domain); + gpr_free(v->mappings[i].key_url_prefix); + } + gpr_free(v->mappings); + } + gpr_free(v); +} + diff --git a/src/core/security/jwt_verifier.h b/src/core/security/jwt_verifier.h new file mode 100644 index 0000000000..8077e24883 --- /dev/null +++ b/src/core/security/jwt_verifier.h @@ -0,0 +1,136 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimser. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimser + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_SECURITY_JWT_VERIFIER_H +#define GRPC_INTERNAL_CORE_SECURITY_JWT_VERIFIER_H + +#include "src/core/iomgr/pollset.h" +#include "src/core/json/json.h" + +#include <grpc/support/slice.h> +#include <grpc/support/time.h> + +/* --- Constants. --- */ + +#define GRPC_OPENID_CONFIG_URL_SUFFIX "/.well-known/openid-configuration" +#define GRPC_GOOGLE_SERVICE_ACCOUNTS_EMAIL_DOMAIN \ + "developer.gserviceaccount.com" +#define GRPC_GOOGLE_SERVICE_ACCOUNTS_KEY_URL_PREFIX \ + "www.googleapis.com/robot/v1/metadata/x509" + +/* --- grpc_jwt_verifier_status. --- */ + +typedef enum { + GRPC_JWT_VERIFIER_OK = 0, + GRPC_JWT_VERIFIER_BAD_SIGNATURE, + GRPC_JWT_VERIFIER_BAD_FORMAT, + GRPC_JWT_VERIFIER_BAD_AUDIENCE, + GRPC_JWT_VERIFIER_KEY_RETRIEVAL_ERROR, + GRPC_JWT_VERIFIER_TIME_CONSTRAINT_FAILURE, + GRPC_JWT_VERIFIER_GENERIC_ERROR +} grpc_jwt_verifier_status; + +const char *grpc_jwt_verifier_status_to_string(grpc_jwt_verifier_status status); + +/* --- grpc_jwt_claims. --- */ + +typedef struct grpc_jwt_claims grpc_jwt_claims; + +void grpc_jwt_claims_destroy(grpc_jwt_claims *claims); + +/* Returns the whole JSON tree of the claims. */ +const grpc_json *grpc_jwt_claims_json(const grpc_jwt_claims *claims); + +/* Access to registered claims in https://tools.ietf.org/html/rfc7519#page-9 */ +const char *grpc_jwt_claims_subject(const grpc_jwt_claims *claims); +const char *grpc_jwt_claims_issuer(const grpc_jwt_claims *claims); +const char *grpc_jwt_claims_id(const grpc_jwt_claims *claims); +const char *grpc_jwt_claims_audience(const grpc_jwt_claims *claims); +gpr_timespec grpc_jwt_claims_issued_at(const grpc_jwt_claims *claims); +gpr_timespec grpc_jwt_claims_expires_at(const grpc_jwt_claims *claims); +gpr_timespec grpc_jwt_claims_not_before(const grpc_jwt_claims *claims); + +/* --- grpc_jwt_verifier. --- */ + +typedef struct grpc_jwt_verifier grpc_jwt_verifier; + +typedef struct { + /* The email domain is the part after the @ sign. */ + const char *email_domain; + + /* The key url prefix will be used to get the public key from the issuer: + https://<key_url_prefix>/<issuer_email> + Therefore the key_url_prefix must NOT contain https://. */ + const char *key_url_prefix; +} grpc_jwt_verifier_email_domain_key_url_mapping; + +/* Globals to control the verifier. Not thread-safe. */ +extern gpr_timespec grpc_jwt_verifier_clock_skew; +extern gpr_timespec grpc_jwt_verifier_max_delay; + +/* The verifier can be created with some custom mappings to help with key + discovery in the case where the issuer is an email address. + mappings can be NULL in which case num_mappings MUST be 0. + A verifier object has one built-in mapping (unless overridden): + GRPC_GOOGLE_SERVICE_ACCOUNTS_EMAIL_DOMAIN -> + GRPC_GOOGLE_SERVICE_ACCOUNTS_KEY_URL_PREFIX.*/ +grpc_jwt_verifier *grpc_jwt_verifier_create( + const grpc_jwt_verifier_email_domain_key_url_mapping *mappings, + size_t num_mappings); + +/*The verifier must not be destroyed if there are still outstanding callbacks.*/ +void grpc_jwt_verifier_destroy(grpc_jwt_verifier *verifier); + +/* User provided callback that will be called when the verification of the JWT + is done (maybe in another thread). + It is the responsibility of the callee to call grpc_jwt_claims_destroy on + the claims. */ +typedef void (*grpc_jwt_verification_done_cb)(void *user_data, + grpc_jwt_verifier_status status, + grpc_jwt_claims *claims); + +/* Verifies for the JWT for the given expected audience. */ +void grpc_jwt_verifier_verify(grpc_jwt_verifier *verifier, + grpc_pollset *pollset, const char *jwt, + const char *audience, + grpc_jwt_verification_done_cb cb, + void *user_data); + +/* --- TESTING ONLY exposed functions. --- */ + +grpc_jwt_claims *grpc_jwt_claims_from_json(grpc_json *json, gpr_slice buffer); +grpc_jwt_verifier_status grpc_jwt_claims_check(const grpc_jwt_claims *claims, + const char *audience); + +#endif /* GRPC_INTERNAL_CORE_SECURITY_JWT_VERIFIER_H */ + diff --git a/src/core/surface/call.c b/src/core/surface/call.c index a28a542c8d..fb3b0b1918 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -367,18 +367,18 @@ static void destroy_call(void *call, int ignored_success) { gpr_mu_destroy(&c->mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (c->status[i].details) { - grpc_mdstr_unref(c->status[i].details); + GRPC_MDSTR_UNREF(c->status[i].details); } } for (i = 0; i < c->owned_metadata_count; i++) { - grpc_mdelem_unref(c->owned_metadata[i]); + GRPC_MDELEM_UNREF(c->owned_metadata[i]); } gpr_free(c->owned_metadata); for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) { gpr_free(c->buffered_metadata[i].metadata); } for (i = 0; i < c->send_initial_metadata_count; i++) { - grpc_mdelem_unref(c->send_initial_metadata[i].md); + GRPC_MDELEM_UNREF(c->send_initial_metadata[i].md); } for (i = 0; i < GRPC_CONTEXT_COUNT; i++) { if (c->context[i].destroy) { @@ -435,7 +435,7 @@ static void set_decode_compression_level(grpc_call *call, static void set_status_details(grpc_call *call, status_source source, grpc_mdstr *status) { if (call->status[source].details != NULL) { - grpc_mdstr_unref(call->status[source].details); + GRPC_MDSTR_UNREF(call->status[source].details); } call->status[source].details = status; } @@ -614,7 +614,7 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, case GRPC_IOREQ_SEND_STATUS: if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details != NULL) { - grpc_mdstr_unref( + GRPC_MDSTR_UNREF( call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details); call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = NULL; @@ -943,7 +943,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) { &mdb, &call->details_link, grpc_mdelem_from_metadata_strings( call->metadata_context, - grpc_mdstr_ref( + GRPC_MDSTR_REF( grpc_channel_get_message_string(call->channel)), data.send_status.details)); call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = @@ -1051,7 +1051,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, reqs[i].data.send_status.code); if (reqs[i].data.send_status.details) { set_status_details(call, STATUS_FROM_SERVER_STATUS, - grpc_mdstr_ref(reqs[i].data.send_status.details)); + GRPC_MDSTR_REF(reqs[i].data.send_status.details)); } } have_ops |= 1u << op; @@ -1255,7 +1255,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { if (key == grpc_channel_get_status_string(call->channel)) { set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); } else if (key == grpc_channel_get_message_string(call->channel)) { - set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); + set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(md->value)); } else if (key == grpc_channel_get_compresssion_level_string(call->channel)) { set_decode_compression_level(call, decode_compression(md)); @@ -1291,10 +1291,10 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { grpc_mdctx_lock(mdctx); for (l = md->list.head; l; l = l->next) { - if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + if (l->md) GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md); } for (l = md->garbage.head; l; l = l->next) { - grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md); } grpc_mdctx_unlock(mdctx); } diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index f8151c121c..eeae3b507c 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -104,7 +104,7 @@ grpc_channel *grpc_channel_create_from_filters( char buf[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(i, buf); channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings( - mdctx, grpc_mdstr_ref(channel->grpc_status_string), + mdctx, GRPC_MDSTR_REF(channel->grpc_status_string), grpc_mdstr_from_string(mdctx, buf)); } channel->path_string = grpc_mdstr_from_string(mdctx, ":path"); @@ -157,10 +157,10 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, return grpc_channel_create_call_internal( channel, cq, grpc_mdelem_from_metadata_strings( - channel->metadata_context, grpc_mdstr_ref(channel->path_string), + channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), grpc_mdstr_from_string(channel->metadata_context, method)), grpc_mdelem_from_metadata_strings( - channel->metadata_context, grpc_mdstr_ref(channel->authority_string), + channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string), grpc_mdstr_from_string(channel->metadata_context, host)), deadline); } @@ -169,10 +169,10 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method, const char *host) { registered_call *rc = gpr_malloc(sizeof(registered_call)); rc->path = grpc_mdelem_from_metadata_strings( - channel->metadata_context, grpc_mdstr_ref(channel->path_string), + channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), grpc_mdstr_from_string(channel->metadata_context, method)); rc->authority = grpc_mdelem_from_metadata_strings( - channel->metadata_context, grpc_mdstr_ref(channel->authority_string), + channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string), grpc_mdstr_from_string(channel->metadata_context, host)); gpr_mu_lock(&channel->registered_call_mu); rc->next = channel->registered_calls; @@ -186,8 +186,8 @@ grpc_call *grpc_channel_create_registered_call( void *registered_call_handle, gpr_timespec deadline) { registered_call *rc = registered_call_handle; return grpc_channel_create_call_internal( - channel, completion_queue, grpc_mdelem_ref(rc->path), - grpc_mdelem_ref(rc->authority), deadline); + channel, completion_queue, GRPC_MDELEM_REF(rc->path), + GRPC_MDELEM_REF(rc->authority), deadline); } #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG @@ -205,18 +205,18 @@ static void destroy_channel(void *p, int ok) { size_t i; grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel)); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { - grpc_mdelem_unref(channel->grpc_status_elem[i]); + GRPC_MDELEM_UNREF(channel->grpc_status_elem[i]); } - grpc_mdstr_unref(channel->grpc_status_string); - grpc_mdstr_unref(channel->grpc_compression_level_string); - grpc_mdstr_unref(channel->grpc_message_string); - grpc_mdstr_unref(channel->path_string); - grpc_mdstr_unref(channel->authority_string); + GRPC_MDSTR_UNREF(channel->grpc_status_string); + GRPC_MDSTR_UNREF(channel->grpc_compression_level_string); + GRPC_MDSTR_UNREF(channel->grpc_message_string); + GRPC_MDSTR_UNREF(channel->path_string); + GRPC_MDSTR_UNREF(channel->authority_string); while (channel->registered_calls) { registered_call *rc = channel->registered_calls; channel->registered_calls = rc->next; - grpc_mdelem_unref(rc->path); - grpc_mdelem_unref(rc->authority); + GRPC_MDELEM_UNREF(rc->path); + GRPC_MDELEM_UNREF(rc->authority); gpr_free(rc); } grpc_mdctx_unref(channel->metadata_context); @@ -267,12 +267,12 @@ grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) { grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { - return grpc_mdelem_ref(channel->grpc_status_elem[i]); + return GRPC_MDELEM_REF(channel->grpc_status_elem[i]); } else { char tmp[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(i, tmp); return grpc_mdelem_from_metadata_strings( - channel->metadata_context, grpc_mdstr_ref(channel->grpc_status_string), + channel->metadata_context, GRPC_MDSTR_REF(channel->grpc_status_string), grpc_mdstr_from_string(channel->metadata_context, tmp)); } } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index f29d47c17c..a9d8940631 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -51,7 +51,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> -typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list; +typedef enum { PENDING_START, CALL_LIST_COUNT } call_list; typedef struct listener { void *arg; @@ -114,7 +114,6 @@ typedef struct channel_registered_method { struct channel_data { grpc_server *server; - size_t num_calls; grpc_connectivity_state connectivity_state; grpc_channel *channel; grpc_mdstr *path_key; @@ -183,7 +182,11 @@ typedef enum { struct call_data { grpc_call *call; + /** protects state */ + gpr_mu mu_state; + /** the current state of a call - see call_state */ call_state state; + grpc_mdstr *path; grpc_mdstr *host; gpr_timespec deadline; @@ -204,9 +207,7 @@ struct call_data { typedef struct { grpc_channel **channels; - grpc_channel **disconnects; size_t num_channels; - size_t num_disconnects; } channel_broadcaster; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -225,26 +226,15 @@ static void maybe_finish_shutdown(grpc_server *server); static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { channel_data *c; size_t count = 0; - size_t dc_count = 0; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { count++; - if (c->num_calls == 0) { - dc_count++; - } } cb->num_channels = count; - cb->num_disconnects = dc_count; cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels); - cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects); count = 0; - dc_count = 0; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { cb->channels[count++] = c->channel; GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast"); - if (c->num_calls == 0) { - cb->disconnects[dc_count++] = c->channel; - GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast-disconnect"); - } } } @@ -280,19 +270,14 @@ static void send_shutdown(grpc_channel *channel, int send_goaway, } static void channel_broadcaster_shutdown(channel_broadcaster *cb, - int send_goaway, int send_disconnect) { + int send_goaway, int force_disconnect) { size_t i; for (i = 0; i < cb->num_channels; i++) { - send_shutdown(cb->channels[i], 1, 0); + send_shutdown(cb->channels[i], send_goaway, force_disconnect); GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast"); } - for (i = 0; i < cb->num_disconnects; i++) { - send_shutdown(cb->disconnects[i], 0, 1); - GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect"); - } gpr_free(cb->channels); - gpr_free(cb->disconnects); } /* call list */ @@ -422,19 +407,23 @@ static void destroy_channel(channel_data *chand) { grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure); } -static void finish_start_new_rpc_and_unlock(grpc_server *server, - grpc_call_element *elem, - call_data **pending_root, - requested_call_array *array) { +static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, + call_data **pending_root, + requested_call_array *array) { requested_call rc; call_data *calld = elem->call_data; + gpr_mu_lock(&server->mu_call); if (array->count == 0) { + gpr_mu_lock(&calld->mu_state); calld->state = PENDING; + gpr_mu_unlock(&calld->mu_state); call_list_join(pending_root, calld, PENDING_START); gpr_mu_unlock(&server->mu_call); } else { rc = array->calls[--array->count]; + gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; + gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&server->mu_call); begin_call(server, calld, &rc); } @@ -448,7 +437,6 @@ static void start_new_rpc(grpc_call_element *elem) { gpr_uint32 hash; channel_registered_method *rm; - gpr_mu_lock(&server->mu_call); if (chand->registered_methods && calld->path && calld->host) { /* TODO(ctiller): unify these two searches */ /* check for an exact match with host */ @@ -459,9 +447,8 @@ static void start_new_rpc(grpc_call_element *elem) { if (!rm) break; if (rm->host != calld->host) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc_and_unlock(server, elem, - &rm->server_registered_method->pending, - &rm->server_registered_method->requested); + finish_start_new_rpc(server, elem, &rm->server_registered_method->pending, + &rm->server_registered_method->requested); return; } /* check for a wildcard method definition (no host set) */ @@ -472,14 +459,13 @@ static void start_new_rpc(grpc_call_element *elem) { if (!rm) break; if (rm->host != NULL) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc_and_unlock(server, elem, - &rm->server_registered_method->pending, - &rm->server_registered_method->requested); + finish_start_new_rpc(server, elem, &rm->server_registered_method->pending, + &rm->server_registered_method->requested); return; } } - finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], - &server->requested_calls); + finish_start_new_rpc(server, elem, &server->lists[PENDING_START], + &server->requested_calls); } static void kill_zombie(void *elem, int success) { @@ -501,15 +487,6 @@ static void maybe_finish_shutdown(grpc_server *server) { return; } - gpr_mu_lock(&server->mu_call); - if (server->lists[ALL_CALLS] != NULL) { - gpr_log(GPR_DEBUG, - "Waiting for all calls to finish before destroying server"); - gpr_mu_unlock(&server->mu_call); - return; - } - gpr_mu_unlock(&server->mu_call); - if (server->root_channel_data.next != &server->root_channel_data) { gpr_log(GPR_DEBUG, "Waiting for all channels to close before destroying server"); @@ -532,31 +509,19 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; if (md->key == chand->path_key) { - calld->path = grpc_mdstr_ref(md->value); + calld->path = GRPC_MDSTR_REF(md->value); return NULL; } else if (md->key == chand->authority_key) { - calld->host = grpc_mdstr_ref(md->value); + calld->host = GRPC_MDSTR_REF(md->value); return NULL; } return md; } -static int decrement_call_count(channel_data *chand) { - int disconnect = 0; - chand->num_calls--; - if (0 == chand->num_calls && chand->server->shutdown) { - disconnect = 1; - } - maybe_finish_shutdown(chand->server); - return disconnect; -} - static void server_on_recv(void *ptr, int success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - int remove_res; - int disconnect = 0; if (success && !calld->got_initial_metadata) { size_t i; @@ -581,39 +546,33 @@ static void server_on_recv(void *ptr, int success) { case GRPC_STREAM_SEND_CLOSED: break; case GRPC_STREAM_RECV_CLOSED: - gpr_mu_lock(&chand->server->mu_call); + gpr_mu_lock(&calld->mu_state); if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_add_callback(&calld->kill_zombie_closure); + } else { + gpr_mu_unlock(&calld->mu_state); } - gpr_mu_unlock(&chand->server->mu_call); break; case GRPC_STREAM_CLOSED: - gpr_mu_lock(&chand->server->mu_call); + gpr_mu_lock(&calld->mu_state); if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_add_callback(&calld->kill_zombie_closure); } else if (calld->state == PENDING) { - call_list_remove(calld, PENDING_START); calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); + gpr_mu_lock(&chand->server->mu_call); + call_list_remove(calld, PENDING_START); + gpr_mu_unlock(&chand->server->mu_call); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_add_callback(&calld->kill_zombie_closure); - } - remove_res = call_list_remove(calld, ALL_CALLS); - gpr_mu_unlock(&chand->server->mu_call); - gpr_mu_lock(&chand->server->mu_global); - if (remove_res) { - disconnect = decrement_call_count(chand); - if (disconnect) { - GRPC_CHANNEL_INTERNAL_REF(chand->channel, "send-disconnect"); - } - } - gpr_mu_unlock(&chand->server->mu_global); - if (disconnect) { - send_shutdown(chand->channel, 0, 1); - GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "send-disconnect"); + } else { + gpr_mu_unlock(&calld->mu_state); } break; } @@ -676,17 +635,10 @@ static void init_call_elem(grpc_call_element *elem, memset(calld, 0, sizeof(call_data)); calld->deadline = gpr_inf_future; calld->call = grpc_call_from_top_element(elem); + gpr_mu_init(&calld->mu_state); grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem); - gpr_mu_lock(&chand->server->mu_call); - call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS); - gpr_mu_unlock(&chand->server->mu_call); - - gpr_mu_lock(&chand->server->mu_global); - chand->num_calls++; - gpr_mu_unlock(&chand->server->mu_global); - server_ref(chand->server); if (initial_op) server_mutate_op(elem, initial_op); @@ -695,27 +647,22 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - int removed[CALL_LIST_COUNT]; - size_t i; - gpr_mu_lock(&chand->server->mu_call); - for (i = 0; i < CALL_LIST_COUNT; i++) { - removed[i] = call_list_remove(elem->call_data, i); - } - gpr_mu_unlock(&chand->server->mu_call); - if (removed[ALL_CALLS]) { - gpr_mu_lock(&chand->server->mu_global); - decrement_call_count(chand); - gpr_mu_unlock(&chand->server->mu_global); + if (calld->state == PENDING) { + gpr_mu_lock(&chand->server->mu_call); + call_list_remove(elem->call_data, PENDING_START); + gpr_mu_unlock(&chand->server->mu_call); } if (calld->host) { - grpc_mdstr_unref(calld->host); + GRPC_MDSTR_UNREF(calld->host); } if (calld->path) { - grpc_mdstr_unref(calld->path); + GRPC_MDSTR_UNREF(calld->path); } + gpr_mu_destroy(&calld->mu_state); + server_unref(chand->server); } @@ -727,7 +674,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, GPR_ASSERT(is_first); GPR_ASSERT(!is_last); chand->server = NULL; - chand->num_calls = 0; chand->channel = NULL; chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); @@ -744,10 +690,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) { if (chand->registered_methods) { for (i = 0; i < chand->registered_method_slots; i++) { if (chand->registered_methods[i].method) { - grpc_mdstr_unref(chand->registered_methods[i].method); + GRPC_MDSTR_UNREF(chand->registered_methods[i].method); } if (chand->registered_methods[i].host) { - grpc_mdstr_unref(chand->registered_methods[i].host); + GRPC_MDSTR_UNREF(chand->registered_methods[i].host); } } gpr_free(chand->registered_methods); @@ -759,8 +705,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { chand->next = chand->prev = chand; maybe_finish_shutdown(chand->server); gpr_mu_unlock(&chand->server->mu_global); - grpc_mdstr_unref(chand->path_key); - grpc_mdstr_unref(chand->authority_key); + GRPC_MDSTR_UNREF(chand->path_key); + GRPC_MDSTR_UNREF(chand->authority_key); server_unref(chand->server); } } @@ -1049,47 +995,13 @@ void grpc_server_listener_destroy_done(void *s) { } void grpc_server_cancel_all_calls(grpc_server *server) { - call_data *calld; - grpc_call **calls; - size_t call_count; - size_t call_capacity; - int is_first = 1; - size_t i; - - gpr_mu_lock(&server->mu_call); - - GPR_ASSERT(server->shutdown); - - if (!server->lists[ALL_CALLS]) { - gpr_mu_unlock(&server->mu_call); - return; - } - - call_capacity = 8; - call_count = 0; - calls = gpr_malloc(sizeof(grpc_call *) * call_capacity); - - for (calld = server->lists[ALL_CALLS]; - calld != server->lists[ALL_CALLS] || is_first; - calld = calld->links[ALL_CALLS].next) { - if (call_count == call_capacity) { - call_capacity *= 2; - calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity); - } - calls[call_count++] = calld->call; - GRPC_CALL_INTERNAL_REF(calld->call, "cancel_all"); - is_first = 0; - } - - gpr_mu_unlock(&server->mu_call); + channel_broadcaster broadcaster; - for (i = 0; i < call_count; i++) { - grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE, - "Unavailable"); - GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1); - } + gpr_mu_lock(&server->mu_global); + channel_broadcaster_init(server, &broadcaster); + gpr_mu_unlock(&server->mu_global); - gpr_free(calls); + channel_broadcaster_shutdown(&broadcaster, 0, 1); } void grpc_server_destroy(grpc_server *server) { @@ -1145,10 +1057,12 @@ static grpc_call_error queue_call_request(grpc_server *server, requested_calls = &rc->data.registered.registered_method->requested; break; } - if (calld) { + if (calld != NULL) { + gpr_mu_unlock(&server->mu_call); + gpr_mu_lock(&calld->mu_state); GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; - gpr_mu_unlock(&server->mu_call); + gpr_mu_unlock(&calld->mu_state); begin_call(server, calld, rc); return GRPC_CALL_OK; } else { diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c index c729e0f22f..f8bff42ed6 100644 --- a/src/core/transport/chttp2/hpack_parser.c +++ b/src/core/transport/chttp2/hpack_parser.c @@ -622,7 +622,7 @@ static const gpr_uint8 inverse_base64[256] = { static void on_hdr(grpc_chttp2_hpack_parser *p, grpc_mdelem *md, int add_to_table) { if (add_to_table) { - grpc_mdelem_ref(md); + GRPC_MDELEM_REF(md); grpc_chttp2_hptbl_add(&p->table, md); } p->on_header(p->on_header_user_data, md); @@ -711,7 +711,7 @@ static int parse_stream_dep0(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, static int finish_indexed_field(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index); - grpc_mdelem_ref(md); + GRPC_MDELEM_REF(md); on_hdr(p, md, 0); return parse_begin(p, cur, end); } @@ -740,7 +740,7 @@ static int finish_lithdr_incidx(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index); on_hdr(p, grpc_mdelem_from_metadata_strings(p->table.mdctx, - grpc_mdstr_ref(md->key), + GRPC_MDSTR_REF(md->key), take_string(p, &p->value)), 1); return parse_begin(p, cur, end); @@ -793,7 +793,7 @@ static int finish_lithdr_notidx(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index); on_hdr(p, grpc_mdelem_from_metadata_strings(p->table.mdctx, - grpc_mdstr_ref(md->key), + GRPC_MDSTR_REF(md->key), take_string(p, &p->value)), 0); return parse_begin(p, cur, end); @@ -846,7 +846,7 @@ static int finish_lithdr_nvridx(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index); on_hdr(p, grpc_mdelem_from_metadata_strings(p->table.mdctx, - grpc_mdstr_ref(md->key), + GRPC_MDSTR_REF(md->key), take_string(p, &p->value)), 0); return parse_begin(p, cur, end); @@ -1336,7 +1336,7 @@ static void on_header_not_set(void *user_data, grpc_mdelem *md) { valuehex); gpr_free(keyhex); gpr_free(valuehex); - grpc_mdelem_unref(md); + GRPC_MDELEM_UNREF(md); abort(); } diff --git a/src/core/transport/chttp2/hpack_table.c b/src/core/transport/chttp2/hpack_table.c index 372e71d68f..4fc154380e 100644 --- a/src/core/transport/chttp2/hpack_table.c +++ b/src/core/transport/chttp2/hpack_table.c @@ -122,10 +122,10 @@ void grpc_chttp2_hptbl_init(grpc_chttp2_hptbl *tbl, grpc_mdctx *mdctx) { void grpc_chttp2_hptbl_destroy(grpc_chttp2_hptbl *tbl) { size_t i; for (i = 0; i < GRPC_CHTTP2_LAST_STATIC_ENTRY; i++) { - grpc_mdelem_unref(tbl->static_ents[i]); + GRPC_MDELEM_UNREF(tbl->static_ents[i]); } for (i = 0; i < tbl->num_ents; i++) { - grpc_mdelem_unref( + GRPC_MDELEM_UNREF( tbl->ents[(tbl->first_ent + i) % GRPC_CHTTP2_MAX_TABLE_COUNT]); } } @@ -155,7 +155,7 @@ static void evict1(grpc_chttp2_hptbl *tbl) { GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD; tbl->first_ent = (tbl->first_ent + 1) % GRPC_CHTTP2_MAX_TABLE_COUNT; tbl->num_ents--; - grpc_mdelem_unref(first_ent); + GRPC_MDELEM_UNREF(first_ent); } void grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) { diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c index 68e0912b9c..77162a6864 100644 --- a/src/core/transport/chttp2/incoming_metadata.c +++ b/src/core/transport/chttp2/incoming_metadata.c @@ -49,7 +49,7 @@ void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_chttp2_incoming_metadata_buffer *buffer) { size_t i; for (i = 0; i < buffer->count; i++) { - grpc_mdelem_unref(buffer->elems[i].md); + GRPC_MDELEM_UNREF(buffer->elems[i].md); } gpr_free(buffer->elems); } diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 7f98a5bd71..bdd4b432eb 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -173,6 +173,8 @@ typedef struct { /** have we seen a goaway */ gpr_uint8 seen_goaway; + /** have we sent a goaway */ + gpr_uint8 sent_goaway; /** is this transport a client? */ gpr_uint8 is_client; @@ -557,8 +559,10 @@ void grpc_chttp2_add_incoming_goaway( void grpc_chttp2_register_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); -void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, - grpc_chttp2_stream *s); +/* returns 1 if this is the last stream, 0 otherwise */ +int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT; +int grpc_chttp2_has_streams(grpc_chttp2_transport *t); void grpc_chttp2_for_all_streams( grpc_chttp2_transport_global *transport_global, void *user_data, void (*cb)(grpc_chttp2_transport_global *transport_global, void *user_data, diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 4664a0895c..130167f830 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -463,7 +463,7 @@ static grpc_chttp2_parse_error skip_parser( return GRPC_CHTTP2_PARSE_OK; } -static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); } +static void skip_header(void *tp, grpc_mdelem *md) { GRPC_MDELEM_UNREF(md); } static int init_skip_frame_parser( grpc_chttp2_transport_parsing *transport_parsing, int is_header) { @@ -600,7 +600,7 @@ static void on_header(void *tp, grpc_mdelem *md) { grpc_chttp2_incoming_metadata_buffer_set_deadline( &stream_parsing->incoming_metadata, gpr_time_add(gpr_now(), *cached_timeout)); - grpc_mdelem_unref(md); + GRPC_MDELEM_UNREF(md); } else { grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->incoming_metadata, md); diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index cf78ac50cc..56ab82006a 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -247,19 +247,19 @@ static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c, } else if (c->entries_keys[HASH_FRAGMENT_3(key_hash)] == elem->key) { c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index; } else if (c->entries_keys[HASH_FRAGMENT_2(key_hash)] == NULL) { - c->entries_keys[HASH_FRAGMENT_2(key_hash)] = grpc_mdstr_ref(elem->key); + c->entries_keys[HASH_FRAGMENT_2(key_hash)] = GRPC_MDSTR_REF(elem->key); c->indices_keys[HASH_FRAGMENT_2(key_hash)] = new_index; } else if (c->entries_keys[HASH_FRAGMENT_3(key_hash)] == NULL) { - c->entries_keys[HASH_FRAGMENT_3(key_hash)] = grpc_mdstr_ref(elem->key); + c->entries_keys[HASH_FRAGMENT_3(key_hash)] = GRPC_MDSTR_REF(elem->key); c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index; } else if (c->indices_keys[HASH_FRAGMENT_2(key_hash)] < c->indices_keys[HASH_FRAGMENT_3(key_hash)]) { - grpc_mdstr_unref(c->entries_keys[HASH_FRAGMENT_2(key_hash)]); - c->entries_keys[HASH_FRAGMENT_2(key_hash)] = grpc_mdstr_ref(elem->key); + GRPC_MDSTR_UNREF(c->entries_keys[HASH_FRAGMENT_2(key_hash)]); + c->entries_keys[HASH_FRAGMENT_2(key_hash)] = GRPC_MDSTR_REF(elem->key); c->indices_keys[HASH_FRAGMENT_2(key_hash)] = new_index; } else { - grpc_mdstr_unref(c->entries_keys[HASH_FRAGMENT_3(key_hash)]); - c->entries_keys[HASH_FRAGMENT_3(key_hash)] = grpc_mdstr_ref(elem->key); + GRPC_MDSTR_UNREF(c->entries_keys[HASH_FRAGMENT_3(key_hash)]); + c->entries_keys[HASH_FRAGMENT_3(key_hash)] = GRPC_MDSTR_REF(elem->key); c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index; } @@ -439,10 +439,10 @@ static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline, grpc_mdelem *mdelem; grpc_chttp2_encode_timeout(gpr_time_sub(deadline, gpr_now()), timeout_str); mdelem = grpc_mdelem_from_metadata_strings( - c->mdctx, grpc_mdstr_ref(c->timeout_key_str), + c->mdctx, GRPC_MDSTR_REF(c->timeout_key_str), grpc_mdstr_from_string(c->mdctx, timeout_str)); mdelem = hpack_enc(c, mdelem, st); - if (mdelem) grpc_mdelem_unref(mdelem); + if (mdelem) GRPC_MDELEM_UNREF(mdelem); } gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id) { @@ -461,10 +461,10 @@ void grpc_chttp2_hpack_compressor_init(grpc_chttp2_hpack_compressor *c, void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c) { int i; for (i = 0; i < GRPC_CHTTP2_HPACKC_NUM_VALUES; i++) { - if (c->entries_keys[i]) grpc_mdstr_unref(c->entries_keys[i]); - if (c->entries_elems[i]) grpc_mdelem_unref(c->entries_elems[i]); + if (c->entries_keys[i]) GRPC_MDSTR_UNREF(c->entries_keys[i]); + if (c->entries_elems[i]) GRPC_MDELEM_UNREF(c->entries_elems[i]); } - grpc_mdstr_unref(c->timeout_key_str); + GRPC_MDSTR_UNREF(c->timeout_key_str); } gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, @@ -620,10 +620,10 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, op = &ops[unref_op]; if (op->type != GRPC_OP_METADATA) continue; for (l = op->data.metadata.list.head; l; l = l->next) { - if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + if (l->md) GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md); } for (l = op->data.metadata.garbage.head; l; l = l->next) { - grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md); } } grpc_mdctx_unlock(mdctx); diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 85691b32d2..4fea058c19 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -354,9 +354,14 @@ void grpc_chttp2_register_stream(grpc_chttp2_transport *t, stream_list_add_tail(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); } -void grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, +int grpc_chttp2_unregister_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - stream_list_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); + stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_ALL_STREAMS); + return stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS); +} + +int grpc_chttp2_has_streams(grpc_chttp2_transport *t) { + return !stream_list_empty(t, GRPC_CHTTP2_LIST_ALL_STREAMS); } void grpc_chttp2_for_all_streams( diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 70df146239..322ff39820 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -139,7 +139,7 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser); grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser); - grpc_mdstr_unref(t->parsing.str_grpc_timeout); + GRPC_MDSTR_UNREF(t->parsing.str_grpc_timeout); for (i = 0; i < STREAM_LIST_COUNT; i++) { GPR_ASSERT(t->lists[i].head == NULL); @@ -382,7 +382,9 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || s->global.id == 0); GPR_ASSERT(!s->global.in_stream_map); - grpc_chttp2_unregister_stream(t, s); + if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { + close_transport_locked(t); + } if (!t->parsing_active && s->global.id) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map, s->global.id) == NULL); @@ -680,10 +682,14 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { } if (op->send_goaway) { + t->global.sent_goaway = 1; grpc_chttp2_goaway_append( t->global.last_incoming_stream_id, grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), gpr_slice_ref(*op->goaway_message), &t->global.qbuf); + if (!grpc_chttp2_has_streams(t)) { + close_transport_locked(t); + } } if (op->set_accept_stream != NULL) { @@ -732,6 +738,9 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { t->parsing.incoming_stream = NULL; grpc_chttp2_parsing_become_skip_parser(&t->parsing); } + if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { + close_transport_locked(t); + } new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) + grpc_chttp2_stream_map_size(&t->new_stream_map); diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index c80d67823f..9cbb0952d0 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -48,6 +48,20 @@ #define INITIAL_STRTAB_CAPACITY 4 #define INITIAL_MDTAB_CAPACITY 4 +#ifdef GRPC_METADATA_REFCOUNT_DEBUG +#define DEBUG_ARGS , const char *file, int line +#define FWD_DEBUG_ARGS , file, line +#define INTERNAL_STRING_REF(s) internal_string_ref((s), __FILE__, __LINE__) +#define INTERNAL_STRING_UNREF(s) internal_string_unref((s), __FILE__, __LINE__) +#define REF_MD_LOCKED(s) ref_md_locked((s), __FILE__, __LINE__) +#else +#define DEBUG_ARGS +#define FWD_DEBUG_ARGS +#define INTERNAL_STRING_REF(s) internal_string_ref((s)) +#define INTERNAL_STRING_UNREF(s) internal_string_unref((s)) +#define REF_MD_LOCKED(s) ref_md_locked((s)) +#endif + typedef struct internal_string { /* must be byte compatible with grpc_mdstr */ gpr_slice slice; @@ -96,8 +110,8 @@ struct grpc_mdctx { size_t mdtab_capacity; }; -static void internal_string_ref(internal_string *s); -static void internal_string_unref(internal_string *s); +static void internal_string_ref(internal_string *s DEBUG_ARGS); +static void internal_string_unref(internal_string *s DEBUG_ARGS); static void discard_metadata(grpc_mdctx *ctx); static void gc_mdtab(grpc_mdctx *ctx); static void metadata_context_destroy_locked(grpc_mdctx *ctx); @@ -132,7 +146,15 @@ static void unlock(grpc_mdctx *ctx) { gpr_mu_unlock(&ctx->mu); } -static void ref_md_locked(internal_metadata *md) { +static void ref_md_locked(internal_metadata *md DEBUG_ARGS) { +#ifdef GRPC_METADATA_REFCOUNT_DEBUG + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "ELM REF:%p:%d->%d: '%s' = '%s'", md, + gpr_atm_no_barrier_load(&md->refcnt), + gpr_atm_no_barrier_load(&md->refcnt) + 1, + grpc_mdstr_as_c_string((grpc_mdstr *)md->key), + grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); +#endif if (0 == gpr_atm_no_barrier_fetch_add(&md->refcnt, 1)) { md->context->mdtab_free--; } @@ -173,8 +195,8 @@ static void discard_metadata(grpc_mdctx *ctx) { while (cur) { GPR_ASSERT(gpr_atm_acq_load(&cur->refcnt) == 0); next = cur->bucket_next; - internal_string_unref(cur->key); - internal_string_unref(cur->value); + INTERNAL_STRING_UNREF(cur->key); + INTERNAL_STRING_UNREF(cur->value); if (cur->user_data) { cur->destroy_user_data(cur->user_data); } @@ -248,9 +270,19 @@ static void internal_destroy_string(internal_string *is) { gpr_free(is); } -static void internal_string_ref(internal_string *s) { ++s->refs; } +static void internal_string_ref(internal_string *s DEBUG_ARGS) { +#ifdef GRPC_METADATA_REFCOUNT_DEBUG + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "STR REF:%p:%d->%d: '%s'", s, + s->refs, s->refs + 1, grpc_mdstr_as_c_string((grpc_mdstr *)s)); +#endif + ++s->refs; +} -static void internal_string_unref(internal_string *s) { +static void internal_string_unref(internal_string *s DEBUG_ARGS) { +#ifdef GRPC_METADATA_REFCOUNT_DEBUG + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "STR UNREF:%p:%d->%d: '%s'", s, + s->refs, s->refs - 1, grpc_mdstr_as_c_string((grpc_mdstr *)s)); +#endif GPR_ASSERT(s->refs > 0); if (0 == --s->refs) { internal_destroy_string(s); @@ -262,7 +294,7 @@ static void slice_ref(void *p) { (internal_string *)((char *)p - offsetof(internal_string, refcount)); grpc_mdctx *ctx = is->context; lock(ctx); - internal_string_ref(is); + INTERNAL_STRING_REF(is); unlock(ctx); } @@ -271,7 +303,7 @@ static void slice_unref(void *p) { (internal_string *)((char *)p - offsetof(internal_string, refcount)); grpc_mdctx *ctx = is->context; lock(ctx); - internal_string_unref(is); + INTERNAL_STRING_UNREF(is); unlock(ctx); } @@ -297,7 +329,7 @@ grpc_mdstr *grpc_mdstr_from_buffer(grpc_mdctx *ctx, const gpr_uint8 *buf, for (s = ctx->strtab[hash % ctx->strtab_capacity]; s; s = s->bucket_next) { if (s->hash == hash && GPR_SLICE_LENGTH(s->slice) == length && 0 == memcmp(buf, GPR_SLICE_START_PTR(s->slice), length)) { - internal_string_ref(s); + INTERNAL_STRING_REF(s); unlock(ctx); return (grpc_mdstr *)s; } @@ -353,8 +385,8 @@ static void gc_mdtab(grpc_mdctx *ctx) { for (md = ctx->mdtab[i]; md; md = next) { next = md->bucket_next; if (gpr_atm_acq_load(&md->refcnt) == 0) { - internal_string_unref(md->key); - internal_string_unref(md->value); + INTERNAL_STRING_UNREF(md->key); + INTERNAL_STRING_UNREF(md->value); if (md->user_data) { md->destroy_user_data(md->user_data); } @@ -418,9 +450,9 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx, /* search for an existing pair */ for (md = ctx->mdtab[hash % ctx->mdtab_capacity]; md; md = md->bucket_next) { if (md->key == key && md->value == value) { - ref_md_locked(md); - internal_string_unref(key); - internal_string_unref(value); + REF_MD_LOCKED(md); + INTERNAL_STRING_UNREF(key); + INTERNAL_STRING_UNREF(value); unlock(ctx); return (grpc_mdelem *)md; } @@ -435,6 +467,12 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx, md->user_data = NULL; md->destroy_user_data = NULL; md->bucket_next = ctx->mdtab[hash % ctx->mdtab_capacity]; +#ifdef GRPC_METADATA_REFCOUNT_DEBUG + gpr_log(GPR_DEBUG, "ELM NEW:%p:%d: '%s' = '%s'", md, + gpr_atm_no_barrier_load(&md->refcnt), + grpc_mdstr_as_c_string((grpc_mdstr *)md->key), + grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); +#endif ctx->mdtab[hash % ctx->mdtab_capacity] = md; ctx->mdtab_count++; @@ -469,8 +507,16 @@ grpc_mdelem *grpc_mdelem_from_string_and_buffer(grpc_mdctx *ctx, grpc_mdstr_from_buffer(ctx, value, value_length)); } -grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd) { +grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) { internal_metadata *md = (internal_metadata *)gmd; +#ifdef GRPC_METADATA_REFCOUNT_DEBUG + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "ELM REF:%p:%d->%d: '%s' = '%s'", md, + gpr_atm_no_barrier_load(&md->refcnt), + gpr_atm_no_barrier_load(&md->refcnt) + 1, + grpc_mdstr_as_c_string((grpc_mdstr *)md->key), + grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); +#endif /* we can assume the ref count is >= 1 as the application is calling this function - meaning that no adjustment to mdtab_free is necessary, simplifying the logic here to be just an atomic increment */ @@ -480,10 +526,18 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd) { return gmd; } -void grpc_mdelem_unref(grpc_mdelem *gmd) { +void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) { internal_metadata *md = (internal_metadata *)gmd; grpc_mdctx *ctx = md->context; lock(ctx); +#ifdef GRPC_METADATA_REFCOUNT_DEBUG + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "ELM UNREF:%p:%d->%d: '%s' = '%s'", md, + gpr_atm_no_barrier_load(&md->refcnt), + gpr_atm_no_barrier_load(&md->refcnt) - 1, + grpc_mdstr_as_c_string((grpc_mdstr *)md->key), + grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); +#endif assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1); if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) { ctx->mdtab_free++; @@ -495,20 +549,20 @@ const char *grpc_mdstr_as_c_string(grpc_mdstr *s) { return (const char *)GPR_SLICE_START_PTR(s->slice); } -grpc_mdstr *grpc_mdstr_ref(grpc_mdstr *gs) { +grpc_mdstr *grpc_mdstr_ref(grpc_mdstr *gs DEBUG_ARGS) { internal_string *s = (internal_string *)gs; grpc_mdctx *ctx = s->context; lock(ctx); - internal_string_ref(s); + internal_string_ref(s FWD_DEBUG_ARGS); unlock(ctx); return gs; } -void grpc_mdstr_unref(grpc_mdstr *gs) { +void grpc_mdstr_unref(grpc_mdstr *gs DEBUG_ARGS) { internal_string *s = (internal_string *)gs; grpc_mdctx *ctx = s->context; lock(ctx); - internal_string_unref(s); + internal_string_unref(s FWD_DEBUG_ARGS); unlock(ctx); } @@ -558,10 +612,19 @@ gpr_slice grpc_mdstr_as_base64_encoded_and_huffman_compressed(grpc_mdstr *gs) { void grpc_mdctx_lock(grpc_mdctx *ctx) { lock(ctx); } -void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, grpc_mdelem *gmd) { +void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, + grpc_mdelem *gmd DEBUG_ARGS) { internal_metadata *md = (internal_metadata *)gmd; grpc_mdctx *elem_ctx = md->context; GPR_ASSERT(ctx == elem_ctx); +#ifdef GRPC_METADATA_REFCOUNT_DEBUG + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "ELM UNREF:%p:%d->%d: '%s' = '%s'", md, + gpr_atm_no_barrier_load(&md->refcnt), + gpr_atm_no_barrier_load(&md->refcnt) - 1, + grpc_mdstr_as_c_string((grpc_mdstr *)md->key), + grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); +#endif assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1); if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) { ctx->mdtab_free++; diff --git a/src/core/transport/metadata.h b/src/core/transport/metadata.h index 76e3f3c1f8..99b15322c3 100644 --- a/src/core/transport/metadata.h +++ b/src/core/transport/metadata.h @@ -127,11 +127,25 @@ void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *), void *user_data); /* Reference counting */ +#ifdef GRPC_METADATA_REFCOUNT_DEBUG +#define GRPC_MDSTR_REF(s) grpc_mdstr_ref((s), __FILE__, __LINE__) +#define GRPC_MDSTR_UNREF(s) grpc_mdstr_unref((s), __FILE__, __LINE__) +#define GRPC_MDELEM_REF(s) grpc_mdelem_ref((s), __FILE__, __LINE__) +#define GRPC_MDELEM_UNREF(s) grpc_mdelem_unref((s), __FILE__, __LINE__) +grpc_mdstr *grpc_mdstr_ref(grpc_mdstr *s, const char *file, int line); +void grpc_mdstr_unref(grpc_mdstr *s, const char *file, int line); +grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *md, const char *file, int line); +void grpc_mdelem_unref(grpc_mdelem *md, const char *file, int line); +#else +#define GRPC_MDSTR_REF(s) grpc_mdstr_ref((s)) +#define GRPC_MDSTR_UNREF(s) grpc_mdstr_unref((s)) +#define GRPC_MDELEM_REF(s) grpc_mdelem_ref((s)) +#define GRPC_MDELEM_UNREF(s) grpc_mdelem_unref((s)) grpc_mdstr *grpc_mdstr_ref(grpc_mdstr *s); void grpc_mdstr_unref(grpc_mdstr *s); - grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *md); void grpc_mdelem_unref(grpc_mdelem *md); +#endif /* Recover a char* from a grpc_mdstr. The returned string is null terminated. Does not promise that the returned string has no embedded nulls however. */ @@ -147,8 +161,18 @@ int grpc_mdstr_is_bin_suffixed(grpc_mdstr *s); /* Lock the metadata context: it's only safe to call _locked_ functions against this context from the calling thread until grpc_mdctx_unlock is called */ void grpc_mdctx_lock(grpc_mdctx *ctx); +#ifdef GRPC_METADATA_REFCOUNT_DEBUG +#define GRPC_MDCTX_LOCKED_MDELEM_UNREF(ctx, elem) \ + grpc_mdctx_locked_mdelem_unref((ctx), (elem), __FILE__, __LINE__) +/* Unref a metadata element */ +void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, grpc_mdelem *elem, + const char *file, int line); +#else +#define GRPC_MDCTX_LOCKED_MDELEM_UNREF(ctx, elem) \ + grpc_mdctx_locked_mdelem_unref((ctx), (elem)) /* Unref a metadata element */ void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, grpc_mdelem *elem); +#endif /* Unlock the metadata context */ void grpc_mdctx_unlock(grpc_mdctx *ctx); diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index 81df5455f6..fdb50c6b71 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -211,10 +211,10 @@ void grpc_metadata_batch_init(grpc_metadata_batch *batch) { void grpc_metadata_batch_destroy(grpc_metadata_batch *batch) { grpc_linked_mdelem *l; for (l = batch->list.head; l; l = l->next) { - grpc_mdelem_unref(l->md); + GRPC_MDELEM_UNREF(l->md); } for (l = batch->garbage.head; l; l = l->next) { - grpc_mdelem_unref(l->md); + GRPC_MDELEM_UNREF(l->md); } } @@ -315,7 +315,7 @@ void grpc_metadata_batch_filter(grpc_metadata_batch *batch, assert_valid_list(&batch->list); link_head(&batch->garbage, l); } else if (filt != orig) { - grpc_mdelem_unref(orig); + GRPC_MDELEM_UNREF(orig); l->md = filt; } } diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index fe565944ed..2689e3028a 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -85,6 +85,6 @@ void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, op->cancel_with_status = status; } if (message) { - grpc_mdstr_unref(message); + GRPC_MDSTR_UNREF(message); } } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 579bcc943f..1429737721 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -91,7 +91,9 @@ typedef struct grpc_transport_op { grpc_connectivity_state *connectivity_state; /** should the transport be disconnected */ int disconnect; - /** should we send a goaway? */ + /** should we send a goaway? + after a goaway is sent, once there are no more active calls on + the transport, the transport should disconnect */ int send_goaway; /** what should the goaway contain? */ grpc_status_code goaway_status; diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 5bc6f6fd91..da31d000b3 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -60,7 +60,7 @@ Channel::~Channel() { grpc_channel_destroy(c_channel_); } Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, CompletionQueue* cq) { auto c_call = - method.channel_tag() + method.channel_tag() && context->authority().empty() ? grpc_channel_create_registered_call(c_channel_, cq->cq(), method.channel_tag(), context->raw_deadline()) diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index b5134b3140..4d200908fb 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -117,6 +117,13 @@ std::shared_ptr<Credentials> RefreshTokenCredentials( grpc_refresh_token_credentials_create(json_refresh_token.c_str())); } +// Builds access token credentials. +std::shared_ptr<Credentials> AccessTokenCredentials( + const grpc::string& access_token) { + return WrapCredentials( + grpc_access_token_credentials_create(access_token.c_str())); +} + // Builds IAM credentials. std::shared_ptr<Credentials> IAMCredentials( const grpc::string& authorization_token, diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 033c18490b..e6761d6244 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -207,10 +207,11 @@ Server::~Server() { delete sync_methods_; } -bool Server::RegisterService(RpcService* service) { +bool Server::RegisterService(const grpc::string *host, RpcService* service) { for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod* method = service->GetMethod(i); - void* tag = grpc_server_register_method(server_, method->name(), nullptr); + void* tag = grpc_server_register_method( + server_, method->name(), host ? host->c_str() : nullptr); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); @@ -222,14 +223,14 @@ bool Server::RegisterService(RpcService* service) { return true; } -bool Server::RegisterAsyncService(AsynchronousService* service) { +bool Server::RegisterAsyncService(const grpc::string *host, AsynchronousService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an asynchronous service against one server."); service->server_ = this; service->request_args_ = new void*[service->method_count_]; for (size_t i = 0; i < service->method_count_; ++i) { void* tag = grpc_server_register_method(server_, service->method_names_[i], - nullptr); + host ? host->c_str() : nullptr); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", service->method_names_[i]); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 3ee1d54e76..86c78f05ff 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -51,11 +51,21 @@ std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { } void ServerBuilder::RegisterService(SynchronousService* service) { - services_.push_back(service->service()); + services_.emplace_back(new NamedService<RpcService>(service->service())); } void ServerBuilder::RegisterAsyncService(AsynchronousService* service) { - async_services_.push_back(service); + async_services_.emplace_back(new NamedService<AsynchronousService>(service)); +} + +void ServerBuilder::RegisterService( + const grpc::string& addr, SynchronousService* service) { + services_.emplace_back(new NamedService<RpcService>(addr, service->service())); +} + +void ServerBuilder::RegisterAsyncService( + const grpc::string& addr, AsynchronousService* service) { + async_services_.emplace_back(new NamedService<AsynchronousService>(addr, service)); } void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) { @@ -97,13 +107,13 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { } for (auto service = services_.begin(); service != services_.end(); service++) { - if (!server->RegisterService(*service)) { + if (!server->RegisterService((*service)->host.get(), (*service)->service)) { return nullptr; } } for (auto service = async_services_.begin(); service != async_services_.end(); service++) { - if (!server->RegisterAsyncService(*service)) { + if (!server->RegisterAsyncService((*service)->host.get(), (*service)->service)) { return nullptr; } } diff --git a/src/node/binding.gyp b/src/node/binding.gyp index 83f72fabca..6ba233388a 100644 --- a/src/node/binding.gyp +++ b/src/node/binding.gyp @@ -10,20 +10,54 @@ '-pthread', '-pedantic', '-g', - '-zdefs' + '-zdefs', '-Werror' ], 'ldflags': [ '-g' ], - 'link_settings': { - 'libraries': [ - '-lpthread', - '-lgrpc', - '-lgpr' - ] - }, "conditions": [ + ['OS != "win"', { + 'variables': { + 'pkg_config_grpc': '<!(pkg-config --exists grpc >/dev/null 2>&1 && echo true || echo false)' + }, + 'conditions': [ + ['pkg_config_grpc == "true"', { + 'link_settings': { + 'libraries': [ + '<!@(pkg-config --libs-only-l --static grpc)' + ] + }, + 'cflags': [ + '<!@(pkg-config --cflags grpc)' + ], + 'libraries': [ + '<!@(pkg-config --libs-only-L --static grpc)' + ], + 'ldflags': [ + '<!@(pkg-config --libs-only-other --static grpc)' + ] + }, { + 'link_settings': { + 'libraries': [ + '-lpthread', + '-lgrpc', + '-lgpr' + ], + }, + 'conditions':[ + ['OS != "mac"', { + 'link_settings': { + 'libraries': [ + '-lrt' + ] + } + }] + ] + } + ] + ] + }], ['OS == "mac"', { 'xcode_settings': { 'MACOSX_DEPLOYMENT_TARGET': '10.9', @@ -32,13 +66,6 @@ '-stdlib=libc++' ] } - }], - ['OS != "mac"', { - 'link_settings': { - 'libraries': [ - '-lrt' - ] - } }] ], "target_name": "grpc", diff --git a/src/node/package.json b/src/node/package.json index 7d4a493af4..6b545705e1 100644 --- a/src/node/package.json +++ b/src/node/package.json @@ -27,7 +27,7 @@ "bindings": "^1.2.0", "lodash": "^3.9.3", "nan": "^1.5.0", - "protobufjs": "dcodeIO/ProtoBuf.js" + "protobufjs": "^4.0.0" }, "devDependencies": { "async": "^0.9.0", diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m index 36f4c0aa5e..af4326332f 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannel.m +++ b/src/objective-c/GRPCClient/private/GRPCChannel.m @@ -60,7 +60,7 @@ } - (instancetype)initWithHost:(NSString *)host { - if (![host containsString:@"://"]) { + if (![host rangeOfString:@"://"].length) { // No scheme provided; assume https. host = [@"https://" stringByAppendingString:host]; } diff --git a/src/objective-c/tests/Podfile b/src/objective-c/tests/Podfile index 026868db12..2aa837f764 100644 --- a/src/objective-c/tests/Podfile +++ b/src/objective-c/tests/Podfile @@ -1,6 +1,7 @@ source 'https://github.com/CocoaPods/Specs.git' platform :ios, '8.0' +pod 'Protobuf', :path => "../../../third_party/protobuf" pod 'gRPC', :path => "../../.." pod 'RemoteTest', :path => "../generated_libraries/RemoteTestClient" pod 'RouteGuide', :path => "../generated_libraries/RouteGuideClient" diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb index 6dd0234489..7972272e2d 100644 --- a/src/ruby/ext/grpc/extconf.rb +++ b/src/ruby/ext/grpc/extconf.rb @@ -54,44 +54,55 @@ LIB_DIRS = [ LIBDIR ] -# Check to see if GRPC_ROOT is defined or available -grpc_root = ENV['GRPC_ROOT'] -if grpc_root.nil? - r = File.expand_path(File.join(File.dirname(__FILE__), '../../../..')) - grpc_root = r if File.exist?(File.join(r, 'include/grpc/grpc.h')) -end - -# When grpc_root is available attempt to build the grpc core. -unless grpc_root.nil? - grpc_config = ENV['GRPC_CONFIG'] || 'opt' - if ENV.key?('GRPC_LIB_DIR') - grpc_lib_dir = File.join(grpc_root, ENV['GRPC_LIB_DIR']) - else - grpc_lib_dir = File.join(File.join(grpc_root, 'libs'), grpc_config) - end - unless File.exist?(File.join(grpc_lib_dir, 'libgrpc.a')) - system("make -C #{grpc_root} static_c CONFIG=#{grpc_config}") +def check_grpc_root + grpc_root = ENV['GRPC_ROOT'] + if grpc_root.nil? + r = File.expand_path(File.join(File.dirname(__FILE__), '../../../..')) + grpc_root = r if File.exist?(File.join(r, 'include/grpc/grpc.h')) end - HEADER_DIRS.unshift File.join(grpc_root, 'include') - LIB_DIRS.unshift grpc_lib_dir + grpc_root end -def crash(msg) - print(" extconf failure: #{msg}\n") - exit 1 -end +grpc_pkg_config = system('pkg-config --exists grpc') + +if grpc_pkg_config + $CFLAGS << ' ' + `pkg-config --static --cflags grpc`.strip + ' ' + $LDFLAGS << ' ' + `pkg-config --static --libs grpc`.strip + ' ' +else + dir_config('grpc', HEADER_DIRS, LIB_DIRS) + fail 'libdl not found' unless have_library('dl', 'dlopen') + fail 'zlib not found' unless have_library('z', 'inflate') + begin + fail 'Fail' unless have_library('gpr', 'gpr_now') + fail 'Fail' unless have_library('grpc', 'grpc_channel_destroy') + rescue + # Check to see if GRPC_ROOT is defined or available + grpc_root = check_grpc_root -dir_config('grpc', HEADER_DIRS, LIB_DIRS) + # Stop if there is still no grpc_root + exit 1 if grpc_root.nil? -$CFLAGS << ' -Wno-implicit-function-declaration ' -$CFLAGS << ' -Wno-pointer-sign ' -$CFLAGS << ' -Wno-return-type ' + grpc_config = ENV['GRPC_CONFIG'] || 'opt' + if ENV.key?('GRPC_LIB_DIR') + grpc_lib_dir = File.join(grpc_root, ENV['GRPC_LIB_DIR']) + else + grpc_lib_dir = File.join(File.join(grpc_root, 'libs'), grpc_config) + end + unless File.exist?(File.join(grpc_lib_dir, 'libgrpc.a')) + print "Building internal gRPC\n" + system("make -C #{grpc_root} static_c CONFIG=#{grpc_config}") + end + $CFLAGS << ' -I' + File.join(grpc_root, 'include') + $LDFLAGS << ' -L' + grpc_lib_dir + raise 'gpr not found' unless have_library('gpr', 'gpr_now') + raise 'grpc not found' unless have_library('grpc', 'grpc_channel_destroy') + end +end + +$CFLAGS << ' -std=c99 ' $CFLAGS << ' -Wall ' +$CFLAGS << ' -Wextra ' $CFLAGS << ' -pedantic ' +$CFLAGS << ' -Werror ' -$LDFLAGS << ' -lgrpc -lgpr -lz -ldl' - -crash('need grpc lib') unless have_library('grpc', 'grpc_channel_destroy') -have_library('grpc', 'grpc_channel_destroy') -crash('need gpr lib') unless have_library('gpr', 'gpr_now') create_makefile('grpc/grpc') diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 9c0d24bf8f..bed3b26850 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -68,8 +68,12 @@ static void grpc_rb_server_free(void *p) { /* Deletes the wrapped object if the mark object is Qnil, which indicates that no other object is the actual owner. */ + /* grpc_server_shutdown does not exist. Change this to something that does + or delete it */ if (svr->wrapped != NULL && svr->mark == Qnil) { - grpc_server_shutdown(svr->wrapped); + // grpc_server_shutdown(svr->wrapped); + // Aborting to indicate a bug + abort(); grpc_server_destroy(svr->wrapped); } |