aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/security
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/security')
-rw-r--r--src/core/security/client_auth_filter.c54
-rw-r--r--src/core/security/credentials.c85
-rw-r--r--src/core/security/credentials.h13
-rw-r--r--src/core/security/google_default_credentials.c17
-rw-r--r--src/core/security/jwt_verifier.c20
-rw-r--r--src/core/security/jwt_verifier.h4
-rw-r--r--src/core/security/secure_endpoint.c92
-rw-r--r--src/core/security/secure_transport_setup.c111
-rw-r--r--src/core/security/secure_transport_setup.h5
-rw-r--r--src/core/security/security_connector.c10
-rw-r--r--src/core/security/security_connector.h8
-rw-r--r--src/core/security/server_auth_filter.c27
-rw-r--r--src/core/security/server_secure_chttp2.c36
13 files changed, 257 insertions, 225 deletions
diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c
index f3ecfd0e60..0a2de4f7bb 100644
--- a/src/core/security/client_auth_filter.c
+++ b/src/core/security/client_auth_filter.c
@@ -76,17 +76,18 @@ typedef struct {
} channel_data;
static void bubble_up_error(grpc_call_element *elem, grpc_status_code status,
- const char *error_msg) {
+ const char *error_msg, grpc_call_list *call_list) {
call_data *calld = elem->call_data;
gpr_log(GPR_ERROR, "Client side authentication failure: %s", error_msg);
grpc_transport_stream_op_add_cancellation(&calld->op, status);
- grpc_call_next_op(elem, &calld->op);
+ grpc_call_next_op(elem, &calld->op, call_list);
}
static void on_credentials_metadata(void *user_data,
grpc_credentials_md *md_elems,
size_t num_md,
- grpc_credentials_status status) {
+ grpc_credentials_status status,
+ grpc_call_list *call_list) {
grpc_call_element *elem = (grpc_call_element *)user_data;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
@@ -95,7 +96,7 @@ static void on_credentials_metadata(void *user_data,
size_t i;
if (status != GRPC_CREDENTIALS_OK) {
bubble_up_error(elem, GRPC_STATUS_UNAUTHENTICATED,
- "Credentials failed to get metadata.");
+ "Credentials failed to get metadata.", call_list);
return;
}
GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
@@ -108,7 +109,7 @@ static void on_credentials_metadata(void *user_data,
grpc_mdelem_from_slices(chand->md_ctx, gpr_slice_ref(md_elems[i].key),
gpr_slice_ref(md_elems[i].value)));
}
- grpc_call_next_op(elem, op);
+ grpc_call_next_op(elem, op, call_list);
}
static char *build_service_url(const char *url_scheme, call_data *calld) {
@@ -132,7 +133,8 @@ static char *build_service_url(const char *url_scheme, call_data *calld) {
}
static void send_security_metadata(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op *op,
+ grpc_call_list *call_list) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_client_security_context *ctx =
@@ -148,7 +150,7 @@ static void send_security_metadata(grpc_call_element *elem,
if (!channel_creds_has_md && !call_creds_has_md) {
/* Skip sending metadata altogether. */
- grpc_call_next_op(elem, op);
+ grpc_call_next_op(elem, op, call_list);
return;
}
@@ -157,7 +159,8 @@ static void send_security_metadata(grpc_call_element *elem,
grpc_composite_credentials_create(channel_creds, ctx->creds, NULL);
if (calld->creds == NULL) {
bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT,
- "Incompatible credentials set on channel and call.");
+ "Incompatible credentials set on channel and call.",
+ call_list);
return;
}
} else {
@@ -169,22 +172,24 @@ static void send_security_metadata(grpc_call_element *elem,
build_service_url(chand->security_connector->base.url_scheme, calld);
calld->op = *op; /* Copy op (originates from the caller's stack). */
GPR_ASSERT(calld->pollset);
- grpc_credentials_get_request_metadata(
- calld->creds, calld->pollset, service_url, on_credentials_metadata, elem);
+ grpc_credentials_get_request_metadata(calld->creds, calld->pollset,
+ service_url, on_credentials_metadata,
+ elem, call_list);
gpr_free(service_url);
}
-static void on_host_checked(void *user_data, grpc_security_status status) {
+static void on_host_checked(void *user_data, grpc_security_status status,
+ grpc_call_list *call_list) {
grpc_call_element *elem = (grpc_call_element *)user_data;
call_data *calld = elem->call_data;
if (status == GRPC_SECURITY_OK) {
- send_security_metadata(elem, &calld->op);
+ send_security_metadata(elem, &calld->op, call_list);
} else {
char *error_msg;
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
grpc_mdstr_as_c_string(calld->host));
- bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg);
+ bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg, call_list);
gpr_free(error_msg);
}
}
@@ -195,7 +200,8 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
op contains type and call direction information, in addition to the data
that is being sent or received. */
static void auth_start_transport_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op *op,
+ grpc_call_list *call_list) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
@@ -247,26 +253,28 @@ static void auth_start_transport_op(grpc_call_element *elem,
const char *call_host = grpc_mdstr_as_c_string(calld->host);
calld->op = *op; /* Copy op (originates from the caller's stack). */
status = grpc_channel_security_connector_check_call_host(
- chand->security_connector, call_host, on_host_checked, elem);
+ chand->security_connector, call_host, on_host_checked, elem,
+ call_list);
if (status != GRPC_SECURITY_OK) {
if (status == GRPC_SECURITY_ERROR) {
char *error_msg;
gpr_asprintf(&error_msg,
"Invalid host %s set in :authority metadata.",
call_host);
- bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg);
+ bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg,
+ call_list);
gpr_free(error_msg);
}
return; /* early exit */
}
}
- send_security_metadata(elem, op);
+ send_security_metadata(elem, op, call_list);
return; /* early exit */
}
}
- /* pass control up or down the stack */
- grpc_call_next_op(elem, op);
+ /* pass control down the stack */
+ grpc_call_next_op(elem, op, call_list);
}
/* Constructor for call_data */
@@ -285,7 +293,8 @@ static void init_call_elem(grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem) {
+static void destroy_call_elem(grpc_call_element *elem,
+ grpc_call_list *call_list) {
call_data *calld = elem->call_data;
grpc_credentials_unref(calld->creds);
if (calld->host != NULL) {
@@ -300,7 +309,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
- int is_last) {
+ int is_last, grpc_call_list *call_list) {
grpc_security_connector *sc = grpc_find_security_connector_in_args(args);
/* grab pointers to our data from the channel element */
channel_data *chand = elem->channel_data;
@@ -326,7 +335,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
}
/* Destructor for channel data */
-static void destroy_channel_elem(grpc_channel_element *elem) {
+static void destroy_channel_elem(grpc_channel_element *elem,
+ grpc_call_list *call_list) {
/* grab pointers to our data from the channel element */
channel_data *chand = elem->channel_data;
grpc_channel_security_connector *ctx = chand->security_connector;
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index f151fba959..086c428112 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -108,16 +108,17 @@ void grpc_credentials_get_request_metadata(grpc_credentials *creds,
grpc_pollset *pollset,
const char *service_url,
grpc_credentials_metadata_cb cb,
- void *user_data) {
+ void *user_data,
+ grpc_call_list *call_list) {
if (creds == NULL || !grpc_credentials_has_request_metadata(creds) ||
creds->vtable->get_request_metadata == NULL) {
if (cb != NULL) {
- cb(user_data, NULL, 0, GRPC_CREDENTIALS_OK);
+ cb(user_data, NULL, 0, GRPC_CREDENTIALS_OK, call_list);
}
return;
}
creds->vtable->get_request_metadata(creds, pollset, service_url, cb,
- user_data);
+ user_data, call_list);
}
grpc_security_status grpc_credentials_create_security_connector(
@@ -375,7 +376,8 @@ static void jwt_get_request_metadata(grpc_credentials *creds,
grpc_pollset *pollset,
const char *service_url,
grpc_credentials_metadata_cb cb,
- void *user_data) {
+ void *user_data,
+ grpc_call_list *call_list) {
grpc_service_account_jwt_access_credentials *c =
(grpc_service_account_jwt_access_credentials *)creds;
gpr_timespec refresh_threshold = gpr_time_from_seconds(
@@ -419,10 +421,11 @@ static void jwt_get_request_metadata(grpc_credentials *creds,
}
if (jwt_md != NULL) {
- cb(user_data, jwt_md->entries, jwt_md->num_entries, GRPC_CREDENTIALS_OK);
+ cb(user_data, jwt_md->entries, jwt_md->num_entries, GRPC_CREDENTIALS_OK,
+ call_list);
grpc_credentials_md_store_unref(jwt_md);
} else {
- cb(user_data, NULL, 0, GRPC_CREDENTIALS_ERROR);
+ cb(user_data, NULL, 0, GRPC_CREDENTIALS_ERROR, call_list);
}
}
@@ -568,7 +571,8 @@ end:
}
static void on_oauth2_token_fetcher_http_response(
- void *user_data, const grpc_httpcli_response *response) {
+ void *user_data, const grpc_httpcli_response *response,
+ grpc_call_list *call_list) {
grpc_credentials_metadata_request *r =
(grpc_credentials_metadata_request *)user_data;
grpc_oauth2_token_fetcher_credentials *c =
@@ -583,10 +587,10 @@ static void on_oauth2_token_fetcher_http_response(
c->token_expiration =
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), token_lifetime);
r->cb(r->user_data, c->access_token_md->entries,
- c->access_token_md->num_entries, status);
+ c->access_token_md->num_entries, status, call_list);
} else {
c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME);
- r->cb(r->user_data, NULL, 0, status);
+ r->cb(r->user_data, NULL, 0, status, call_list);
}
gpr_mu_unlock(&c->mu);
grpc_credentials_metadata_request_destroy(r);
@@ -594,7 +598,8 @@ static void on_oauth2_token_fetcher_http_response(
static void oauth2_token_fetcher_get_request_metadata(
grpc_credentials *creds, grpc_pollset *pollset, const char *service_url,
- grpc_credentials_metadata_cb cb, void *user_data) {
+ grpc_credentials_metadata_cb cb, void *user_data,
+ grpc_call_list *call_list) {
grpc_oauth2_token_fetcher_credentials *c =
(grpc_oauth2_token_fetcher_credentials *)creds;
gpr_timespec refresh_threshold = gpr_time_from_seconds(
@@ -613,13 +618,14 @@ static void oauth2_token_fetcher_get_request_metadata(
}
if (cached_access_token_md != NULL) {
cb(user_data, cached_access_token_md->entries,
- cached_access_token_md->num_entries, GRPC_CREDENTIALS_OK);
+ cached_access_token_md->num_entries, GRPC_CREDENTIALS_OK, call_list);
grpc_credentials_md_store_unref(cached_access_token_md);
} else {
c->fetch_func(
grpc_credentials_metadata_request_create(creds, cb, user_data),
&c->httpcli_context, pollset, on_oauth2_token_fetcher_http_response,
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), refresh_threshold));
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), refresh_threshold),
+ call_list);
}
}
@@ -644,7 +650,8 @@ static grpc_credentials_vtable compute_engine_vtable = {
static void compute_engine_fetch_oauth2(
grpc_credentials_metadata_request *metadata_req,
grpc_httpcli_context *httpcli_context, grpc_pollset *pollset,
- grpc_httpcli_response_cb response_cb, gpr_timespec deadline) {
+ grpc_httpcli_response_cb response_cb, gpr_timespec deadline,
+ grpc_call_list *call_list) {
grpc_httpcli_header header = {"Metadata-Flavor", "Google"};
grpc_httpcli_request request;
memset(&request, 0, sizeof(grpc_httpcli_request));
@@ -653,7 +660,7 @@ static void compute_engine_fetch_oauth2(
request.hdr_count = 1;
request.hdrs = &header;
grpc_httpcli_get(httpcli_context, pollset, &request, deadline, response_cb,
- metadata_req);
+ metadata_req, call_list);
}
grpc_credentials *grpc_google_compute_engine_credentials_create(
@@ -683,7 +690,8 @@ static grpc_credentials_vtable refresh_token_vtable = {
static void refresh_token_fetch_oauth2(
grpc_credentials_metadata_request *metadata_req,
grpc_httpcli_context *httpcli_context, grpc_pollset *pollset,
- grpc_httpcli_response_cb response_cb, gpr_timespec deadline) {
+ grpc_httpcli_response_cb response_cb, gpr_timespec deadline,
+ grpc_call_list *call_list) {
grpc_google_refresh_token_credentials *c =
(grpc_google_refresh_token_credentials *)metadata_req->creds;
grpc_httpcli_header header = {"Content-Type",
@@ -700,7 +708,7 @@ static void refresh_token_fetch_oauth2(
request.hdrs = &header;
request.handshaker = &grpc_httpcli_ssl;
grpc_httpcli_post(httpcli_context, pollset, &request, body, strlen(body),
- deadline, response_cb, metadata_req);
+ deadline, response_cb, metadata_req, call_list);
gpr_free(body);
}
@@ -743,20 +751,23 @@ static int md_only_test_has_request_metadata_only(
return 1;
}
-void on_simulated_token_fetch_done(void *user_data) {
+static void on_simulated_token_fetch_done(void *user_data) {
grpc_credentials_metadata_request *r =
(grpc_credentials_metadata_request *)user_data;
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
r->cb(r->user_data, c->md_store->entries, c->md_store->num_entries,
- GRPC_CREDENTIALS_OK);
+ GRPC_CREDENTIALS_OK, &call_list);
grpc_credentials_metadata_request_destroy(r);
+ grpc_call_list_run(&call_list);
}
static void md_only_test_get_request_metadata(grpc_credentials *creds,
grpc_pollset *pollset,
const char *service_url,
grpc_credentials_metadata_cb cb,
- void *user_data) {
+ void *user_data,
+ grpc_call_list *call_list) {
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)creds;
if (c->is_async) {
@@ -765,7 +776,7 @@ static void md_only_test_get_request_metadata(grpc_credentials *creds,
grpc_credentials_metadata_request_create(creds, cb, user_data);
gpr_thd_new(&thd_id, on_simulated_token_fetch_done, cb_arg, NULL);
} else {
- cb(user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK);
+ cb(user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK, call_list);
}
}
@@ -809,9 +820,10 @@ static void access_token_get_request_metadata(grpc_credentials *creds,
grpc_pollset *pollset,
const char *service_url,
grpc_credentials_metadata_cb cb,
- void *user_data) {
+ void *user_data,
+ grpc_call_list *call_list) {
grpc_access_token_credentials *c = (grpc_access_token_credentials *)creds;
- cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK);
+ cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK, call_list);
}
static grpc_credentials_vtable access_token_vtable = {
@@ -958,11 +970,12 @@ static void composite_md_context_destroy(
static void composite_metadata_cb(void *user_data,
grpc_credentials_md *md_elems, size_t num_md,
- grpc_credentials_status status) {
+ grpc_credentials_status status,
+ grpc_call_list *call_list) {
grpc_composite_credentials_metadata_context *ctx =
(grpc_composite_credentials_metadata_context *)user_data;
if (status != GRPC_CREDENTIALS_OK) {
- ctx->cb(ctx->user_data, NULL, 0, status);
+ ctx->cb(ctx->user_data, NULL, 0, status, call_list);
return;
}
@@ -980,28 +993,30 @@ static void composite_metadata_cb(void *user_data,
grpc_credentials *inner_creds =
ctx->composite_creds->inner.creds_array[ctx->creds_index++];
if (grpc_credentials_has_request_metadata(inner_creds)) {
- grpc_credentials_get_request_metadata(inner_creds, ctx->pollset,
- ctx->service_url,
- composite_metadata_cb, ctx);
+ grpc_credentials_get_request_metadata(
+ inner_creds, ctx->pollset, ctx->service_url, composite_metadata_cb,
+ ctx, call_list);
return;
}
}
/* We're done!. */
ctx->cb(ctx->user_data, ctx->md_elems->entries, ctx->md_elems->num_entries,
- GRPC_CREDENTIALS_OK);
+ GRPC_CREDENTIALS_OK, call_list);
composite_md_context_destroy(ctx);
+ grpc_call_list_run(call_list);
}
static void composite_get_request_metadata(grpc_credentials *creds,
grpc_pollset *pollset,
const char *service_url,
grpc_credentials_metadata_cb cb,
- void *user_data) {
+ void *user_data,
+ grpc_call_list *call_list) {
grpc_composite_credentials *c = (grpc_composite_credentials *)creds;
grpc_composite_credentials_metadata_context *ctx;
if (!grpc_credentials_has_request_metadata(creds)) {
- cb(user_data, NULL, 0, GRPC_CREDENTIALS_OK);
+ cb(user_data, NULL, 0, GRPC_CREDENTIALS_OK, call_list);
return;
}
ctx = gpr_malloc(sizeof(grpc_composite_credentials_metadata_context));
@@ -1016,7 +1031,8 @@ static void composite_get_request_metadata(grpc_credentials *creds,
grpc_credentials *inner_creds = c->inner.creds_array[ctx->creds_index++];
if (grpc_credentials_has_request_metadata(inner_creds)) {
grpc_credentials_get_request_metadata(inner_creds, pollset, service_url,
- composite_metadata_cb, ctx);
+ composite_metadata_cb, ctx,
+ call_list);
return;
}
}
@@ -1152,10 +1168,11 @@ static void iam_get_request_metadata(grpc_credentials *creds,
grpc_pollset *pollset,
const char *service_url,
grpc_credentials_metadata_cb cb,
- void *user_data) {
+ void *user_data,
+ grpc_call_list *call_list) {
grpc_google_iam_credentials *c = (grpc_google_iam_credentials *)creds;
- cb(user_data, c->iam_md->entries, c->iam_md->num_entries,
- GRPC_CREDENTIALS_OK);
+ cb(user_data, c->iam_md->entries, c->iam_md->num_entries, GRPC_CREDENTIALS_OK,
+ call_list);
}
static grpc_credentials_vtable iam_vtable = {
diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h
index 8e4fed7615..a93d0ec653 100644
--- a/src/core/security/credentials.h
+++ b/src/core/security/credentials.h
@@ -126,7 +126,8 @@ char *grpc_get_well_known_google_credentials_file_path(void);
typedef void (*grpc_credentials_metadata_cb)(void *user_data,
grpc_credentials_md *md_elems,
size_t num_md,
- grpc_credentials_status status);
+ grpc_credentials_status status,
+ grpc_call_list *call_list);
typedef struct {
void (*destruct)(grpc_credentials *c);
@@ -134,8 +135,8 @@ typedef struct {
int (*has_request_metadata_only)(const grpc_credentials *c);
void (*get_request_metadata)(grpc_credentials *c, grpc_pollset *pollset,
const char *service_url,
- grpc_credentials_metadata_cb cb,
- void *user_data);
+ grpc_credentials_metadata_cb cb, void *user_data,
+ grpc_call_list *call_list);
grpc_security_status (*create_security_connector)(
grpc_credentials *c, const char *target, const grpc_channel_args *args,
grpc_credentials *request_metadata_creds,
@@ -156,7 +157,8 @@ void grpc_credentials_get_request_metadata(grpc_credentials *creds,
grpc_pollset *pollset,
const char *service_url,
grpc_credentials_metadata_cb cb,
- void *user_data);
+ void *user_data,
+ grpc_call_list *call_list);
/* Creates a security connector for the channel. May also create new channel
args for the channel to be used in place of the passed in const args if
@@ -274,7 +276,8 @@ typedef void (*grpc_fetch_oauth2_func)(grpc_credentials_metadata_request *req,
grpc_httpcli_context *http_context,
grpc_pollset *pollset,
grpc_httpcli_response_cb response_cb,
- gpr_timespec deadline);
+ gpr_timespec deadline,
+ grpc_call_list *call_list);
typedef struct {
grpc_credentials base;
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index 874dd59e84..5da29f5ad5 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -63,7 +63,8 @@ typedef struct {
} compute_engine_detector;
static void on_compute_engine_detection_http_response(
- void *user_data, const grpc_httpcli_response *response) {
+ void *user_data, const grpc_httpcli_response *response,
+ grpc_call_list *call_list) {
compute_engine_detector *detector = (compute_engine_detector *)user_data;
if (response != NULL && response->status == 200 && response->hdr_count > 0) {
/* Internet providers can return a generic response to all requests, so
@@ -84,12 +85,16 @@ static void on_compute_engine_detection_http_response(
gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset));
}
-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(void *p, int s, grpc_call_list *call_list) {
+ grpc_pollset_destroy(p);
+}
static int is_stack_running_on_compute_engine(void) {
compute_engine_detector detector;
grpc_httpcli_request request;
grpc_httpcli_context context;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+ grpc_closure destroy_closure;
/* The http call is local. If it takes more than one sec, it is for sure not
on compute engine. */
@@ -108,7 +113,9 @@ static int is_stack_running_on_compute_engine(void) {
grpc_httpcli_get(
&context, &detector.pollset, &request,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
- on_compute_engine_detection_http_response, &detector);
+ on_compute_engine_detection_http_response, &detector, &call_list);
+
+ grpc_call_list_run(&call_list);
/* Block until we get the response. This is not ideal but this should only be
called once for the lifetime of the process by the default credentials. */
@@ -121,7 +128,9 @@ static int is_stack_running_on_compute_engine(void) {
gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
grpc_httpcli_context_destroy(&context);
- grpc_pollset_shutdown(&detector.pollset, destroy_pollset, &detector.pollset);
+ grpc_closure_init(&destroy_closure, destroy_pollset, &detector.pollset);
+ grpc_pollset_shutdown(&detector.pollset, &destroy_closure, &call_list);
+ grpc_call_list_run(&call_list);
return detector.success;
}
diff --git a/src/core/security/jwt_verifier.c b/src/core/security/jwt_verifier.c
index 790f2178db..a6736c922c 100644
--- a/src/core/security/jwt_verifier.c
+++ b/src/core/security/jwt_verifier.c
@@ -570,7 +570,8 @@ end:
}
static void on_keys_retrieved(void *user_data,
- const grpc_httpcli_response *response) {
+ const grpc_httpcli_response *response,
+ grpc_call_list *call_list) {
grpc_json *json = json_from_http(response);
verifier_cb_ctx *ctx = (verifier_cb_ctx *)user_data;
EVP_PKEY *verification_key = NULL;
@@ -611,7 +612,8 @@ end:
}
static void on_openid_config_retrieved(void *user_data,
- const grpc_httpcli_response *response) {
+ const grpc_httpcli_response *response,
+ grpc_call_list *call_list) {
const grpc_json *cur;
grpc_json *json = json_from_http(response);
verifier_cb_ctx *ctx = (verifier_cb_ctx *)user_data;
@@ -643,7 +645,7 @@ static void on_openid_config_retrieved(void *user_data,
grpc_httpcli_get(
&ctx->verifier->http_ctx, ctx->pollset, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
- on_keys_retrieved, ctx);
+ on_keys_retrieved, ctx, call_list);
grpc_json_destroy(json);
gpr_free(req.host);
return;
@@ -682,7 +684,8 @@ static void verifier_put_mapping(grpc_jwt_verifier *v, const char *email_domain,
}
/* Takes ownership of ctx. */
-static void retrieve_key_and_verify(verifier_cb_ctx *ctx) {
+static void retrieve_key_and_verify(verifier_cb_ctx *ctx,
+ grpc_call_list *call_list) {
const char *at_sign;
grpc_httpcli_response_cb http_cb;
char *path_prefix = NULL;
@@ -745,7 +748,7 @@ static void retrieve_key_and_verify(verifier_cb_ctx *ctx) {
grpc_httpcli_get(
&ctx->verifier->http_ctx, ctx->pollset, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
- http_cb, ctx);
+ http_cb, ctx, call_list);
gpr_free(req.host);
gpr_free(req.path);
return;
@@ -758,8 +761,8 @@ error:
void grpc_jwt_verifier_verify(grpc_jwt_verifier *verifier,
grpc_pollset *pollset, const char *jwt,
const char *audience,
- grpc_jwt_verification_done_cb cb,
- void *user_data) {
+ grpc_jwt_verification_done_cb cb, void *user_data,
+ grpc_call_list *call_list) {
const char *dot = NULL;
grpc_json *json;
jose_header *header = NULL;
@@ -792,7 +795,8 @@ void grpc_jwt_verifier_verify(grpc_jwt_verifier *verifier,
if (GPR_SLICE_IS_EMPTY(signature)) goto error;
retrieve_key_and_verify(
verifier_cb_ctx_create(verifier, pollset, header, claims, audience,
- signature, jwt, signed_jwt_len, user_data, cb));
+ signature, jwt, signed_jwt_len, user_data, cb),
+ call_list);
return;
error:
diff --git a/src/core/security/jwt_verifier.h b/src/core/security/jwt_verifier.h
index 7a32debfcb..394935302b 100644
--- a/src/core/security/jwt_verifier.h
+++ b/src/core/security/jwt_verifier.h
@@ -123,8 +123,8 @@ typedef void (*grpc_jwt_verification_done_cb)(void *user_data,
void grpc_jwt_verifier_verify(grpc_jwt_verifier *verifier,
grpc_pollset *pollset, const char *jwt,
const char *audience,
- grpc_jwt_verification_done_cb cb,
- void *user_data);
+ grpc_jwt_verification_done_cb cb, void *user_data,
+ grpc_call_list *call_list);
/* --- TESTING ONLY exposed functions. --- */
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index dcafc0fc0f..a501544341 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -67,9 +67,9 @@ typedef struct {
int grpc_trace_secure_endpoint = 0;
-static void destroy(secure_endpoint *secure_ep) {
+static void destroy(secure_endpoint *secure_ep, grpc_call_list *call_list) {
secure_endpoint *ep = secure_ep;
- grpc_endpoint_destroy(ep->wrapped_ep);
+ grpc_endpoint_destroy(ep->wrapped_ep, call_list);
tsi_frame_protector_destroy(ep->protector);
gpr_slice_buffer_destroy(&ep->leftover_bytes);
gpr_slice_unref(ep->read_staging_buffer);
@@ -102,11 +102,12 @@ static void secure_endpoint_ref(secure_endpoint *ep, const char *reason,
gpr_ref(&ep->ref);
}
#else
-#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
+#define SECURE_ENDPOINT_UNREF(ep, reason, cl) secure_endpoint_unref((ep), (cl))
#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
-static void secure_endpoint_unref(secure_endpoint *ep) {
+static void secure_endpoint_unref(secure_endpoint *ep,
+ grpc_call_list *call_list) {
if (gpr_unref(&ep->ref)) {
- destroy(ep);
+ destroy(ep, call_list);
}
}
@@ -121,7 +122,8 @@ static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
*end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
}
-static void call_read_cb(secure_endpoint *ep, int success) {
+static void call_read_cb(secure_endpoint *ep, int success,
+ grpc_call_list *call_list) {
if (grpc_trace_secure_endpoint) {
size_t i;
for (i = 0; i < ep->read_buffer->count; i++) {
@@ -132,11 +134,11 @@ static void call_read_cb(secure_endpoint *ep, int success) {
}
}
ep->read_buffer = NULL;
- ep->read_cb->cb(ep->read_cb->cb_arg, success);
- SECURE_ENDPOINT_UNREF(ep, "read");
+ grpc_call_list_add(call_list, ep->read_cb, success);
+ SECURE_ENDPOINT_UNREF(ep, "read", call_list);
}
-static int on_read(void *user_data, int success) {
+static void on_read(void *user_data, int success, grpc_call_list *call_list) {
unsigned i;
gpr_uint8 keep_looping = 0;
tsi_result result = TSI_OK;
@@ -146,7 +148,8 @@ static int on_read(void *user_data, int success) {
if (!success) {
gpr_slice_buffer_reset_and_unref(ep->read_buffer);
- return 0;
+ call_read_cb(ep, 0, call_list);
+ return;
}
/* TODO(yangg) check error, maybe bail out early */
@@ -202,21 +205,16 @@ static int on_read(void *user_data, int success) {
if (result != TSI_OK) {
gpr_slice_buffer_reset_and_unref(ep->read_buffer);
- return 0;
+ call_read_cb(ep, 0, call_list);
+ return;
}
- return 1;
-}
-
-static void on_read_cb(void *user_data, int success) {
- call_read_cb(user_data, on_read(user_data, success));
+ call_read_cb(ep, 1, call_list);
}
-static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep,
- gpr_slice_buffer *slices,
- grpc_closure *cb) {
+static void endpoint_read(grpc_endpoint *secure_ep, gpr_slice_buffer *slices,
+ grpc_closure *cb, grpc_call_list *call_list) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- int immediate_read_success = -1;
ep->read_cb = cb;
ep->read_buffer = slices;
gpr_slice_buffer_reset_and_unref(ep->read_buffer);
@@ -224,27 +222,13 @@ static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep,
if (ep->leftover_bytes.count) {
gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
GPR_ASSERT(ep->leftover_bytes.count == 0);
- return on_read(ep, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
+ on_read(ep, 1, call_list);
+ return;
}
SECURE_ENDPOINT_REF(ep, "read");
-
- switch (
- grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read)) {
- case GRPC_ENDPOINT_DONE:
- immediate_read_success = on_read(ep, 1);
- break;
- case GRPC_ENDPOINT_PENDING:
- return GRPC_ENDPOINT_PENDING;
- case GRPC_ENDPOINT_ERROR:
- immediate_read_success = on_read(ep, 0);
- break;
- }
-
- GPR_ASSERT(immediate_read_success != -1);
- SECURE_ENDPOINT_UNREF(ep, "read");
-
- return immediate_read_success ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
+ grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read,
+ call_list);
}
static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
@@ -255,9 +239,8 @@ static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
*end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
}
-static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep,
- gpr_slice_buffer *slices,
- grpc_closure *cb) {
+static void endpoint_write(grpc_endpoint *secure_ep, gpr_slice_buffer *slices,
+ grpc_closure *cb, grpc_call_list *call_list) {
unsigned i;
tsi_result result = TSI_OK;
secure_endpoint *ep = (secure_endpoint *)secure_ep;
@@ -329,32 +312,37 @@ static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep,
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
- return GRPC_ENDPOINT_ERROR;
+ grpc_call_list_add(call_list, cb, 0);
+ return;
}
- return grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb);
+ grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, call_list);
}
-static void endpoint_shutdown(grpc_endpoint *secure_ep) {
+static void endpoint_shutdown(grpc_endpoint *secure_ep,
+ grpc_call_list *call_list) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- grpc_endpoint_shutdown(ep->wrapped_ep);
+ grpc_endpoint_shutdown(ep->wrapped_ep, call_list);
}
-static void endpoint_destroy(grpc_endpoint *secure_ep) {
+static void endpoint_destroy(grpc_endpoint *secure_ep,
+ grpc_call_list *call_list) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- SECURE_ENDPOINT_UNREF(ep, "destroy");
+ SECURE_ENDPOINT_UNREF(ep, "destroy", call_list);
}
static void endpoint_add_to_pollset(grpc_endpoint *secure_ep,
- grpc_pollset *pollset) {
+ grpc_pollset *pollset,
+ grpc_call_list *call_list) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
+ grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset, call_list);
}
static void endpoint_add_to_pollset_set(grpc_endpoint *secure_ep,
- grpc_pollset_set *pollset_set) {
+ grpc_pollset_set *pollset_set,
+ grpc_call_list *call_list) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
+ grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set, call_list);
}
static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
@@ -386,7 +374,7 @@ grpc_endpoint *grpc_secure_endpoint_create(
gpr_slice_buffer_init(&ep->output_buffer);
gpr_slice_buffer_init(&ep->source_buffer);
ep->read_buffer = NULL;
- grpc_closure_init(&ep->on_read, on_read_cb, ep);
+ grpc_closure_init(&ep->on_read, on_read, ep);
gpr_mu_init(&ep->protector_mu);
gpr_ref_init(&ep->ref, 1);
return &ep->base;
diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c
index 181aa74ba9..1412ab21ad 100644
--- a/src/core/security/secure_transport_setup.c
+++ b/src/core/security/secure_transport_setup.c
@@ -58,23 +58,27 @@ typedef struct {
grpc_closure on_handshake_data_received_from_peer;
} grpc_secure_transport_setup;
-static void on_handshake_data_received_from_peer(void *setup, int success);
+static void on_handshake_data_received_from_peer(void *setup, int success,
+ grpc_call_list *call_list);
-static void on_handshake_data_sent_to_peer(void *setup, int success);
+static void on_handshake_data_sent_to_peer(void *setup, int success,
+ grpc_call_list *call_list);
static void secure_transport_setup_done(grpc_secure_transport_setup *s,
- int is_success) {
+ int is_success,
+ grpc_call_list *call_list) {
if (is_success) {
s->cb(s->user_data, GRPC_SECURITY_OK, s->wrapped_endpoint,
- s->secure_endpoint);
+ s->secure_endpoint, call_list);
} else {
if (s->secure_endpoint != NULL) {
- grpc_endpoint_shutdown(s->secure_endpoint);
- grpc_endpoint_destroy(s->secure_endpoint);
+ grpc_endpoint_shutdown(s->secure_endpoint, call_list);
+ grpc_endpoint_destroy(s->secure_endpoint, call_list);
} else {
- grpc_endpoint_destroy(s->wrapped_endpoint);
+ grpc_endpoint_destroy(s->wrapped_endpoint, call_list);
}
- s->cb(s->user_data, GRPC_SECURITY_ERROR, s->wrapped_endpoint, NULL);
+ s->cb(s->user_data, GRPC_SECURITY_ERROR, s->wrapped_endpoint, NULL,
+ call_list);
}
if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker);
if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer);
@@ -85,13 +89,14 @@ static void secure_transport_setup_done(grpc_secure_transport_setup *s,
gpr_free(s);
}
-static void on_peer_checked(void *user_data, grpc_security_status status) {
+static void on_peer_checked(void *user_data, grpc_security_status status,
+ grpc_call_list *call_list) {
grpc_secure_transport_setup *s = user_data;
tsi_frame_protector *protector;
tsi_result result;
if (status != GRPC_SECURITY_OK) {
gpr_log(GPR_ERROR, "Error checking peer.");
- secure_transport_setup_done(s, 0);
+ secure_transport_setup_done(s, 0, call_list);
return;
}
result =
@@ -99,7 +104,7 @@ static void on_peer_checked(void *user_data, grpc_security_status status) {
if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Frame protector creation failed with error %s.",
tsi_result_to_string(result));
- secure_transport_setup_done(s, 0);
+ secure_transport_setup_done(s, 0, call_list);
return;
}
s->secure_endpoint =
@@ -107,11 +112,12 @@ static void on_peer_checked(void *user_data, grpc_security_status status) {
s->left_overs.slices, s->left_overs.count);
s->left_overs.count = 0;
s->left_overs.length = 0;
- secure_transport_setup_done(s, 1);
+ secure_transport_setup_done(s, 1, call_list);
return;
}
-static void check_peer(grpc_secure_transport_setup *s) {
+static void check_peer(grpc_secure_transport_setup *s,
+ grpc_call_list *call_list) {
grpc_security_status peer_status;
tsi_peer peer;
tsi_result result = tsi_handshaker_extract_peer(s->handshaker, &peer);
@@ -119,21 +125,22 @@ static void check_peer(grpc_secure_transport_setup *s) {
if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Peer extraction failed with error %s",
tsi_result_to_string(result));
- secure_transport_setup_done(s, 0);
+ secure_transport_setup_done(s, 0, call_list);
return;
}
peer_status = grpc_security_connector_check_peer(s->connector, peer,
on_peer_checked, s);
if (peer_status == GRPC_SECURITY_ERROR) {
gpr_log(GPR_ERROR, "Peer check failed.");
- secure_transport_setup_done(s, 0);
+ secure_transport_setup_done(s, 0, call_list);
return;
} else if (peer_status == GRPC_SECURITY_OK) {
- on_peer_checked(s, peer_status);
+ on_peer_checked(s, peer_status, call_list);
}
}
-static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
+static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s,
+ grpc_call_list *call_list) {
size_t offset = 0;
tsi_result result = TSI_OK;
gpr_slice to_send;
@@ -153,7 +160,7 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Handshake failed with error %s",
tsi_result_to_string(result));
- secure_transport_setup_done(s, 0);
+ secure_transport_setup_done(s, 0, call_list);
return;
}
@@ -163,21 +170,12 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
gpr_slice_buffer_add(&s->outgoing, to_send);
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- switch (grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing,
- &s->on_handshake_data_sent_to_peer)) {
- case GRPC_ENDPOINT_ERROR:
- gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
- secure_transport_setup_done(s, 0);
- break;
- case GRPC_ENDPOINT_DONE:
- on_handshake_data_sent_to_peer(s, 1);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing,
+ &s->on_handshake_data_sent_to_peer, call_list);
}
-static void on_handshake_data_received_from_peer(void *setup, int success) {
+static void on_handshake_data_received_from_peer(void *setup, int success,
+ grpc_call_list *call_list) {
grpc_secure_transport_setup *s = setup;
size_t consumed_slice_size = 0;
tsi_result result = TSI_OK;
@@ -187,7 +185,7 @@ static void on_handshake_data_received_from_peer(void *setup, int success) {
if (!success) {
gpr_log(GPR_ERROR, "Read failed.");
- secure_transport_setup_done(s, 0);
+ secure_transport_setup_done(s, 0, call_list);
return;
}
@@ -202,20 +200,11 @@ static void on_handshake_data_received_from_peer(void *setup, int success) {
if (tsi_handshaker_is_in_progress(s->handshaker)) {
/* We may need more data. */
if (result == TSI_INCOMPLETE_DATA) {
- switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
- &s->on_handshake_data_received_from_peer)) {
- case GRPC_ENDPOINT_DONE:
- on_handshake_data_received_from_peer(s, 1);
- break;
- case GRPC_ENDPOINT_ERROR:
- on_handshake_data_received_from_peer(s, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
+ &s->on_handshake_data_received_from_peer, call_list);
return;
} else {
- send_handshake_bytes_to_peer(s);
+ send_handshake_bytes_to_peer(s, call_list);
return;
}
}
@@ -223,7 +212,7 @@ static void on_handshake_data_received_from_peer(void *setup, int success) {
if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Handshake failed with error %s",
tsi_result_to_string(result));
- secure_transport_setup_done(s, 0);
+ secure_transport_setup_done(s, 0, call_list);
return;
}
@@ -233,7 +222,7 @@ static void on_handshake_data_received_from_peer(void *setup, int success) {
num_left_overs =
(has_left_overs_in_current_slice ? 1 : 0) + s->incoming.count - i - 1;
if (num_left_overs == 0) {
- check_peer(s);
+ check_peer(s, call_list);
return;
}
/* Put the leftovers in our buffer (ownership transfered). */
@@ -247,51 +236,41 @@ static void on_handshake_data_received_from_peer(void *setup, int success) {
gpr_slice_buffer_addn(
&s->left_overs, &s->incoming.slices[i + 1],
num_left_overs - (size_t)has_left_overs_in_current_slice);
- check_peer(s);
+ check_peer(s, call_list);
}
/* If setup is NULL, the setup is done. */
-static void on_handshake_data_sent_to_peer(void *setup, int success) {
+static void on_handshake_data_sent_to_peer(void *setup, int success,
+ grpc_call_list *call_list) {
grpc_secure_transport_setup *s = setup;
/* Make sure that write is OK. */
if (!success) {
gpr_log(GPR_ERROR, "Write failed.");
- if (setup != NULL) secure_transport_setup_done(s, 0);
+ if (setup != NULL) secure_transport_setup_done(s, 0, call_list);
return;
}
/* We may be done. */
if (tsi_handshaker_is_in_progress(s->handshaker)) {
- /* TODO(klempner,jboeuf): This should probably use the client setup
- deadline */
- switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
- &s->on_handshake_data_received_from_peer)) {
- case GRPC_ENDPOINT_ERROR:
- on_handshake_data_received_from_peer(s, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- case GRPC_ENDPOINT_DONE:
- on_handshake_data_received_from_peer(s, 1);
- break;
- }
+ grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
+ &s->on_handshake_data_received_from_peer, call_list);
} else {
- check_peer(s);
+ check_peer(s, call_list);
}
}
void grpc_setup_secure_transport(grpc_security_connector *connector,
grpc_endpoint *nonsecure_endpoint,
grpc_secure_transport_setup_done_cb cb,
- void *user_data) {
+ void *user_data, grpc_call_list *call_list) {
grpc_security_status result = GRPC_SECURITY_OK;
grpc_secure_transport_setup *s =
gpr_malloc(sizeof(grpc_secure_transport_setup));
memset(s, 0, sizeof(grpc_secure_transport_setup));
result = grpc_security_connector_create_handshaker(connector, &s->handshaker);
if (result != GRPC_SECURITY_OK) {
- secure_transport_setup_done(s, 0);
+ secure_transport_setup_done(s, 0, call_list);
return;
}
s->connector =
@@ -308,5 +287,5 @@ void grpc_setup_secure_transport(grpc_security_connector *connector,
gpr_slice_buffer_init(&s->left_overs);
gpr_slice_buffer_init(&s->outgoing);
gpr_slice_buffer_init(&s->incoming);
- send_handshake_bytes_to_peer(s);
+ send_handshake_bytes_to_peer(s, call_list);
}
diff --git a/src/core/security/secure_transport_setup.h b/src/core/security/secure_transport_setup.h
index d9b802556d..867726ff4c 100644
--- a/src/core/security/secure_transport_setup.h
+++ b/src/core/security/secure_transport_setup.h
@@ -42,12 +42,13 @@
/* Ownership of the secure_endpoint is transfered. */
typedef void (*grpc_secure_transport_setup_done_cb)(
void *user_data, grpc_security_status status,
- grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint);
+ grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint,
+ grpc_call_list *call_list);
/* Calls the callback upon completion. */
void grpc_setup_secure_transport(grpc_security_connector *connector,
grpc_endpoint *nonsecure_endpoint,
grpc_secure_transport_setup_done_cb cb,
- void *user_data);
+ void *user_data, grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURE_TRANSPORT_SETUP_H */
diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c
index ba9ac68c5f..dac6a5a5fc 100644
--- a/src/core/security/security_connector.c
+++ b/src/core/security/security_connector.c
@@ -119,9 +119,9 @@ grpc_security_status grpc_security_connector_check_peer(
grpc_security_status grpc_channel_security_connector_check_call_host(
grpc_channel_security_connector *sc, const char *host,
- grpc_security_check_cb cb, void *user_data) {
+ grpc_security_check_cb cb, void *user_data, grpc_call_list *call_list) {
if (sc == NULL || sc->check_call_host == NULL) return GRPC_SECURITY_ERROR;
- return sc->check_call_host(sc, host, cb, user_data);
+ return sc->check_call_host(sc, host, cb, user_data, call_list);
}
#ifdef GRPC_SECURITY_CONNECTOR_REFCOUNT_DEBUG
@@ -275,11 +275,11 @@ end:
static grpc_security_status fake_channel_check_call_host(
grpc_channel_security_connector *sc, const char *host,
- grpc_security_check_cb cb, void *user_data) {
+ grpc_security_check_cb cb, void *user_data, grpc_call_list *call_list) {
grpc_fake_channel_security_connector *c =
(grpc_fake_channel_security_connector *)sc;
if (c->call_host_check_is_async) {
- cb(user_data, GRPC_SECURITY_OK);
+ cb(user_data, GRPC_SECURITY_OK, call_list);
return GRPC_SECURITY_PENDING;
} else {
return GRPC_SECURITY_OK;
@@ -495,7 +495,7 @@ static grpc_security_status ssl_server_check_peer(grpc_security_connector *sc,
static grpc_security_status ssl_channel_check_call_host(
grpc_channel_security_connector *sc, const char *host,
- grpc_security_check_cb cb, void *user_data) {
+ grpc_security_check_cb cb, void *user_data, grpc_call_list *call_list) {
grpc_ssl_channel_security_connector *c =
(grpc_ssl_channel_security_connector *)sc;
diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h
index 2c9aa1c5a4..d78d6ad69f 100644
--- a/src/core/security/security_connector.h
+++ b/src/core/security/security_connector.h
@@ -61,7 +61,8 @@ typedef struct grpc_security_connector grpc_security_connector;
#define GRPC_SECURITY_CONNECTOR_ARG "grpc.security_connector"
typedef void (*grpc_security_check_cb)(void *user_data,
- grpc_security_status status);
+ grpc_security_status status,
+ grpc_call_list *call_list);
typedef struct {
void (*destroy)(grpc_security_connector *sc);
@@ -138,7 +139,8 @@ struct grpc_channel_security_connector {
grpc_security_status (*check_call_host)(grpc_channel_security_connector *sc,
const char *host,
grpc_security_check_cb cb,
- void *user_data);
+ void *user_data,
+ grpc_call_list *call_list);
};
/* Checks that the host that will be set for a call is acceptable.
@@ -148,7 +150,7 @@ struct grpc_channel_security_connector {
GRPC_SECURITY_PENDING unless an error is detected early on. */
grpc_security_status grpc_channel_security_connector_check_call_host(
grpc_channel_security_connector *sc, const char *host,
- grpc_security_check_cb cb, void *user_data);
+ grpc_security_check_cb cb, void *user_data, grpc_call_list *call_list);
/* --- Creation security connectors. --- */
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index 8468697472..2376cbbeb3 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -109,12 +109,14 @@ static grpc_mdelem *remove_consumed_md(void *user_data, grpc_mdelem *md) {
return md;
}
+/* called from application code */
static void on_md_processing_done(
void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
const grpc_metadata *response_md, size_t num_response_md,
grpc_status_code status, const char *error_details) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
/* TODO(jboeuf): Implement support for response_md. */
if (response_md != NULL && num_response_md > 0) {
@@ -129,7 +131,7 @@ static void on_md_processing_done(
grpc_metadata_batch_filter(&calld->md_op->data.metadata, remove_consumed_md,
elem);
grpc_metadata_array_destroy(&calld->md);
- calld->on_done_recv->cb(calld->on_done_recv->cb_arg, 1);
+ calld->on_done_recv->cb(calld->on_done_recv->cb_arg, 1, &call_list);
} else {
gpr_slice message;
grpc_metadata_array_destroy(&calld->md);
@@ -139,11 +141,14 @@ static void on_md_processing_done(
message = gpr_slice_from_copied_string(error_details);
grpc_sopb_reset(calld->recv_ops);
grpc_transport_stream_op_add_close(&calld->transport_op, status, &message);
- grpc_call_next_op(elem, &calld->transport_op);
+ grpc_call_next_op(elem, &calld->transport_op, &call_list);
}
+
+ grpc_call_list_run(&call_list);
}
-static void auth_on_recv(void *user_data, int success) {
+static void auth_on_recv(void *user_data, int success,
+ grpc_call_list *call_list) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
@@ -164,7 +169,7 @@ static void auth_on_recv(void *user_data, int success) {
return;
}
}
- calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
+ calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success, call_list);
}
static void set_recv_ops_md_callbacks(grpc_call_element *elem,
@@ -186,9 +191,10 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem,
op contains type and call direction information, in addition to the data
that is being sent or received. */
static void auth_start_transport_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op *op,
+ grpc_call_list *call_list) {
set_recv_ops_md_callbacks(elem, op);
- grpc_call_next_op(elem, op);
+ grpc_call_next_op(elem, op, call_list);
}
/* Constructor for call_data */
@@ -227,12 +233,14 @@ static void init_call_elem(grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_call_element *elem,
+ grpc_call_list *call_list) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
+ int is_first, int is_last,
+ grpc_call_list *call_list) {
grpc_security_connector *sc = grpc_find_security_connector_in_args(args);
grpc_auth_metadata_processor *processor =
grpc_find_auth_metadata_processor_in_args(args);
@@ -256,7 +264,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
}
/* Destructor for channel data */
-static void destroy_channel_elem(grpc_channel_element *elem) {
+static void destroy_channel_elem(grpc_channel_element *elem,
+ grpc_call_list *call_list) {
/* grab pointers to our data from the channel element */
channel_data *chand = elem->channel_data;
GRPC_SECURITY_CONNECTOR_UNREF(chand->security_connector,
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index a6f50712f5..e6e2eee658 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -66,6 +66,7 @@ typedef struct grpc_server_secure_state {
int is_shutdown;
gpr_mu mu;
gpr_refcount refcount;
+ grpc_closure destroy_closure;
} grpc_server_secure_state;
static void state_ref(grpc_server_secure_state *state) {
@@ -127,7 +128,8 @@ static int remove_tcp_from_list_locked(grpc_server_secure_state *state,
static void on_secure_transport_setup_done(void *statep,
grpc_security_status status,
grpc_endpoint *wrapped_endpoint,
- grpc_endpoint *secure_endpoint) {
+ grpc_endpoint *secure_endpoint,
+ grpc_call_list *call_list) {
grpc_server_secure_state *state = statep;
grpc_transport *transport;
grpc_mdctx *mdctx;
@@ -137,16 +139,16 @@ static void on_secure_transport_setup_done(void *statep,
remove_tcp_from_list_locked(state, wrapped_endpoint);
if (!state->is_shutdown) {
mdctx = grpc_mdctx_create();
- workqueue = grpc_workqueue_create();
+ workqueue = grpc_workqueue_create(call_list);
transport = grpc_create_chttp2_transport(
grpc_server_get_channel_args(state->server), secure_endpoint, mdctx,
- workqueue, 0);
+ 0);
setup_transport(state, transport, mdctx, workqueue);
grpc_chttp2_transport_start_reading(transport, NULL, 0);
} else {
/* We need to consume this here, because the server may already have gone
* away. */
- grpc_endpoint_destroy(secure_endpoint);
+ grpc_endpoint_destroy(secure_endpoint, call_list);
}
gpr_mu_unlock(&state->mu);
} else {
@@ -158,7 +160,8 @@ static void on_secure_transport_setup_done(void *statep,
state_unref(state);
}
-static void on_accept(void *statep, grpc_endpoint *tcp) {
+static void on_accept(void *statep, grpc_endpoint *tcp,
+ grpc_call_list *call_list) {
grpc_server_secure_state *state = statep;
tcp_endpoint_list *node;
state_ref(state);
@@ -169,22 +172,24 @@ static void on_accept(void *statep, grpc_endpoint *tcp) {
state->handshaking_tcp_endpoints = node;
gpr_mu_unlock(&state->mu);
grpc_setup_secure_transport(state->sc, tcp, on_secure_transport_setup_done,
- state);
+ state, call_list);
}
/* Server callback: start listening on our ports */
static void start(grpc_server *server, void *statep, grpc_pollset **pollsets,
- size_t pollset_count) {
+ size_t pollset_count, grpc_call_list *call_list) {
grpc_server_secure_state *state = statep;
- grpc_tcp_server_start(state->tcp, pollsets, pollset_count, on_accept, state);
+ grpc_tcp_server_start(state->tcp, pollsets, pollset_count, on_accept, state,
+ call_list);
}
-static void destroy_done(void *statep) {
+static void destroy_done(void *statep, int success, grpc_call_list *call_list) {
grpc_server_secure_state *state = statep;
grpc_server_listener_destroy_done(state->server);
gpr_mu_lock(&state->mu);
while (state->handshaking_tcp_endpoints != NULL) {
- grpc_endpoint_shutdown(state->handshaking_tcp_endpoints->tcp_endpoint);
+ grpc_endpoint_shutdown(state->handshaking_tcp_endpoints->tcp_endpoint,
+ call_list);
remove_tcp_from_list_locked(state,
state->handshaking_tcp_endpoints->tcp_endpoint);
}
@@ -194,14 +199,16 @@ static void destroy_done(void *statep) {
/* Server callback: destroy the tcp listener (so we don't generate further
callbacks) */
-static void destroy(grpc_server *server, void *statep) {
+static void destroy(grpc_server *server, void *statep,
+ grpc_call_list *call_list) {
grpc_server_secure_state *state = statep;
grpc_tcp_server *tcp;
gpr_mu_lock(&state->mu);
state->is_shutdown = 1;
tcp = state->tcp;
gpr_mu_unlock(&state->mu);
- grpc_tcp_server_destroy(tcp, destroy_done, state);
+ grpc_closure_init(&state->destroy_closure, destroy_done, state);
+ grpc_tcp_server_destroy(tcp, &state->destroy_closure, call_list);
}
int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
@@ -215,6 +222,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
int port_temp;
grpc_security_status status = GRPC_SECURITY_ERROR;
grpc_security_connector *sc = NULL;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
/* create security context */
if (creds == NULL) goto error;
@@ -277,6 +285,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
/* Register with the server only upon success */
grpc_server_add_listener(server, state, start, destroy);
+ grpc_call_list_run(&call_list);
return port_num;
/* Error path: cleanup and return */
@@ -288,10 +297,11 @@ error:
grpc_resolved_addresses_destroy(resolved);
}
if (tcp) {
- grpc_tcp_server_destroy(tcp, NULL, NULL);
+ grpc_tcp_server_destroy(tcp, NULL, &call_list);
}
if (state) {
gpr_free(state);
}
+ grpc_call_list_run(&call_list);
return 0;
}