aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt2
-rw-r--r--Makefile2
-rw-r--r--build.yaml1
-rw-r--r--include/grpc/impl/codegen/atm.h1
-rw-r--r--src/core/ext/filters/client_channel/http_proxy.c64
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c7
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c286
-rw-r--r--src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c13
-rw-r--r--src/core/lib/iomgr/ev_epoll_thread_pool_linux.c14
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c18
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.c15
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c7
-rw-r--r--src/core/lib/iomgr/ev_posix.c5
-rw-r--r--src/core/lib/iomgr/ev_posix.h4
-rw-r--r--src/core/lib/iomgr/lockfree_event.c14
-rw-r--r--src/core/lib/iomgr/lockfree_event.h5
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c6
-rw-r--r--src/core/lib/iomgr/tcp_posix.c2
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c2
-rw-r--r--src/core/lib/iomgr/udp_server.c2
-rw-r--r--src/core/lib/security/credentials/composite/composite_credentials.c122
-rw-r--r--src/core/lib/security/credentials/credentials.c26
-rw-r--r--src/core/lib/security/credentials/credentials.h74
-rw-r--r--src/core/lib/security/credentials/credentials_metadata.c77
-rw-r--r--src/core/lib/security/credentials/fake/fake_credentials.c47
-rw-r--r--src/core/lib/security/credentials/fake/fake_credentials.h4
-rw-r--r--src/core/lib/security/credentials/iam/iam_credentials.c45
-rw-r--r--src/core/lib/security/credentials/iam/iam_credentials.h2
-rw-r--r--src/core/lib/security/credentials/jwt/jwt_credentials.c51
-rw-r--r--src/core/lib/security/credentials/jwt/jwt_credentials.h2
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.c193
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.h15
-rw-r--r--src/core/lib/security/credentials/plugin/plugin_credentials.c165
-rw-r--r--src/core/lib/security/credentials/plugin/plugin_credentials.h16
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c186
-rw-r--r--src/core/lib/security/transport/security_connector.c57
-rw-r--r--src/core/lib/security/transport/security_connector.h29
-rw-r--r--src/node/test/credentials_test.js8
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb23
-rw-r--r--test/core/end2end/end2end_nosec_tests.c8
-rw-r--r--test/core/end2end/end2end_tests.c8
-rw-r--r--test/core/end2end/fixtures/h2_http_proxy.c19
-rw-r--r--test/core/end2end/fixtures/h2_oauth2.c12
-rw-r--r--test/core/end2end/fixtures/http_proxy_fixture.c52
-rw-r--r--test/core/end2end/fixtures/http_proxy_fixture.h19
-rwxr-xr-xtest/core/end2end/gen_build_yaml.py14
-rwxr-xr-xtest/core/end2end/generate_tests.bzl17
-rw-r--r--test/core/end2end/tests/proxy_auth.c235
-rw-r--r--test/core/iomgr/ev_epollsig_linux_test.c6
-rw-r--r--test/core/iomgr/fd_posix_test.c11
-rw-r--r--test/core/iomgr/pollset_set_test.c3
-rw-r--r--test/core/security/credentials_test.c530
-rw-r--r--test/core/security/oauth2_utils.c43
-rw-r--r--test/core/security/print_google_default_creds_token.c42
-rw-r--r--test/core/surface/completion_queue_threading_test.c3
-rw-r--r--test/cpp/end2end/end2end_test.cc8
-rw-r--r--test/cpp/microbenchmarks/bm_pollset.cc5
-rw-r--r--tools/internal_ci/helper_scripts/prepare_build_interop_rc5
-rw-r--r--tools/run_tests/generated/sources_and_headers.json2
-rw-r--r--tools/run_tests/generated/tests.json51
-rw-r--r--vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj2
-rw-r--r--vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj.filters3
-rw-r--r--vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj2
-rw-r--r--vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj.filters3
64 files changed, 1824 insertions, 891 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0f87beb02b..266f2c0774 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -4354,6 +4354,7 @@ add_library(end2end_tests
test/core/end2end/tests/payload.c
test/core/end2end/tests/ping.c
test/core/end2end/tests/ping_pong_streaming.c
+ test/core/end2end/tests/proxy_auth.c
test/core/end2end/tests/registered_call.c
test/core/end2end/tests/request_with_flags.c
test/core/end2end/tests/request_with_payload.c
@@ -4453,6 +4454,7 @@ add_library(end2end_nosec_tests
test/core/end2end/tests/payload.c
test/core/end2end/tests/ping.c
test/core/end2end/tests/ping_pong_streaming.c
+ test/core/end2end/tests/proxy_auth.c
test/core/end2end/tests/registered_call.c
test/core/end2end/tests/request_with_flags.c
test/core/end2end/tests/request_with_payload.c
diff --git a/Makefile b/Makefile
index 2ae0229716..7b53024b6c 100644
--- a/Makefile
+++ b/Makefile
@@ -7952,6 +7952,7 @@ LIBEND2END_TESTS_SRC = \
test/core/end2end/tests/payload.c \
test/core/end2end/tests/ping.c \
test/core/end2end/tests/ping_pong_streaming.c \
+ test/core/end2end/tests/proxy_auth.c \
test/core/end2end/tests/registered_call.c \
test/core/end2end/tests/request_with_flags.c \
test/core/end2end/tests/request_with_payload.c \
@@ -8046,6 +8047,7 @@ LIBEND2END_NOSEC_TESTS_SRC = \
test/core/end2end/tests/payload.c \
test/core/end2end/tests/ping.c \
test/core/end2end/tests/ping_pong_streaming.c \
+ test/core/end2end/tests/proxy_auth.c \
test/core/end2end/tests/registered_call.c \
test/core/end2end/tests/request_with_flags.c \
test/core/end2end/tests/request_with_payload.c \
diff --git a/build.yaml b/build.yaml
index fb2f6d08be..198467bf5a 100644
--- a/build.yaml
+++ b/build.yaml
@@ -4496,6 +4496,7 @@ targets:
- grpc
- gpr_test_util
- gpr
+ timeout_seconds: 1200
- name: writes_per_rpc_test
gtest: true
cpu_cost: 0.5
diff --git a/include/grpc/impl/codegen/atm.h b/include/grpc/impl/codegen/atm.h
index c3b528be4d..2cfd22ab63 100644
--- a/include/grpc/impl/codegen/atm.h
+++ b/include/grpc/impl/codegen/atm.h
@@ -60,6 +60,7 @@
int gpr_atm_no_barrier_cas(gpr_atm *p, gpr_atm o, gpr_atm n);
int gpr_atm_acq_cas(gpr_atm *p, gpr_atm o, gpr_atm n);
int gpr_atm_rel_cas(gpr_atm *p, gpr_atm o, gpr_atm n);
+ int gpr_atm_full_cas(gpr_atm *p, gpr_atm o, gpr_atm n);
// Atomically, set *p=n and return the old value of *p
gpr_atm gpr_atm_full_xchg(gpr_atm *p, gpr_atm n);
diff --git a/src/core/ext/filters/client_channel/http_proxy.c b/src/core/ext/filters/client_channel/http_proxy.c
index aa3f61c991..ef3512ed83 100644
--- a/src/core/ext/filters/client_channel/http_proxy.c
+++ b/src/core/ext/filters/client_channel/http_proxy.c
@@ -30,15 +30,23 @@
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/slice/b64.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
-static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) {
+/**
+ * Parses the 'http_proxy' env var and returns the proxy hostname to resolve or
+ * NULL on error. Also sets 'user_cred' to user credentials if present in the
+ * 'http_proxy' env var, otherwise leaves it unchanged. It is caller's
+ * responsibility to gpr_free user_cred.
+ */
+static char* get_http_proxy_server(grpc_exec_ctx* exec_ctx, char** user_cred) {
+ GPR_ASSERT(user_cred != NULL);
+ char* proxy_name = NULL;
char* uri_str = gpr_getenv("http_proxy");
if (uri_str == NULL) return NULL;
grpc_uri* uri =
grpc_uri_parse(exec_ctx, uri_str, false /* suppress_errors */);
- char* proxy_name = NULL;
if (uri == NULL || uri->authority == NULL) {
gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var");
goto done;
@@ -47,11 +55,27 @@ static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) {
gpr_log(GPR_ERROR, "'%s' scheme not supported in proxy URI", uri->scheme);
goto done;
}
- if (strchr(uri->authority, '@') != NULL) {
- gpr_log(GPR_ERROR, "userinfo not supported in proxy URI");
- goto done;
+ /* Split on '@' to separate user credentials from host */
+ char** authority_strs = NULL;
+ size_t authority_nstrs;
+ gpr_string_split(uri->authority, "@", &authority_strs, &authority_nstrs);
+ GPR_ASSERT(authority_nstrs != 0); /* should have at least 1 string */
+ if (authority_nstrs == 1) {
+ /* User cred not present in authority */
+ proxy_name = authority_strs[0];
+ } else if (authority_nstrs == 2) {
+ /* User cred found */
+ *user_cred = authority_strs[0];
+ proxy_name = authority_strs[1];
+ gpr_log(GPR_DEBUG, "userinfo found in proxy URI");
+ } else {
+ /* Bad authority */
+ for (size_t i = 0; i < authority_nstrs; i++) {
+ gpr_free(authority_strs[i]);
+ }
+ proxy_name = NULL;
}
- proxy_name = gpr_strdup(uri->authority);
+ gpr_free(authority_strs);
done:
gpr_free(uri_str);
grpc_uri_destroy(uri);
@@ -64,7 +88,8 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx,
const grpc_channel_args* args,
char** name_to_resolve,
grpc_channel_args** new_args) {
- *name_to_resolve = grpc_get_http_proxy_server(exec_ctx);
+ char* user_cred = NULL;
+ *name_to_resolve = get_http_proxy_server(exec_ctx, &user_cred);
if (*name_to_resolve == NULL) return false;
grpc_uri* uri =
grpc_uri_parse(exec_ctx, server_uri, false /* suppress_errors */);
@@ -73,12 +98,16 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx,
"'http_proxy' environment variable set, but cannot "
"parse server URI '%s' -- not using proxy",
server_uri);
- if (uri != NULL) grpc_uri_destroy(uri);
+ if (uri != NULL) {
+ gpr_free(user_cred);
+ grpc_uri_destroy(uri);
+ }
return false;
}
if (strcmp(uri->scheme, "unix") == 0) {
gpr_log(GPR_INFO, "not using proxy for Unix domain socket '%s'",
server_uri);
+ gpr_free(user_cred);
grpc_uri_destroy(uri);
return false;
}
@@ -126,10 +155,25 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx,
}
}
}
- grpc_arg new_arg = grpc_channel_arg_string_create(
+ grpc_arg args_to_add[2];
+ args_to_add[0] = grpc_channel_arg_string_create(
GRPC_ARG_HTTP_CONNECT_SERVER,
uri->path[0] == '/' ? uri->path + 1 : uri->path);
- *new_args = grpc_channel_args_copy_and_add(args, &new_arg, 1);
+ if (user_cred != NULL) {
+ /* Use base64 encoding for user credentials as stated in RFC 7617 */
+ char* encoded_user_cred =
+ grpc_base64_encode(user_cred, strlen(user_cred), 0, 0);
+ char* header;
+ gpr_asprintf(&header, "Proxy-Authorization:Basic %s", encoded_user_cred);
+ gpr_free(encoded_user_cred);
+ args_to_add[1] =
+ grpc_channel_arg_string_create(GRPC_ARG_HTTP_CONNECT_HEADERS, header);
+ *new_args = grpc_channel_args_copy_and_add(args, args_to_add, 2);
+ gpr_free(header);
+ } else {
+ *new_args = grpc_channel_args_copy_and_add(args, args_to_add, 1);
+ }
+ gpr_free(user_cred);
grpc_uri_destroy(uri);
return true;
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
index 1ab8295e9e..b696344eab 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
@@ -103,10 +103,9 @@ static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) {
grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd);
/* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following
- grpc_fd_orphan. To prevent this fd from being closed by grpc_fd_orphan,
- a fd pointer is provided. */
- int fd;
- grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, &fd, "c-ares query finished");
+ grpc_fd_orphan. */
+ grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, true /* already_closed */,
+ "c-ares query finished");
gpr_free(fdn);
}
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 66ba601adb..8db4a92453 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -45,6 +45,7 @@
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
+#include "src/core/lib/support/string.h"
static grpc_wakeup_fd global_wakeup_fd;
static int g_epfd;
@@ -77,8 +78,21 @@ static void fd_global_shutdown(void);
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
+static const char *kick_state_string(kick_state st) {
+ switch (st) {
+ case UNKICKED:
+ return "UNKICKED";
+ case KICKED:
+ return "KICKED";
+ case DESIGNATED_POLLER:
+ return "DESIGNATED_POLLER";
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
struct grpc_pollset_worker {
kick_state kick_state;
+ int kick_state_mutator; // which line of code last changed kick state
bool initialized_cv;
grpc_pollset_worker *next;
grpc_pollset_worker *prev;
@@ -86,6 +100,12 @@ struct grpc_pollset_worker {
grpc_closure_list schedule_on_end_work;
};
+#define SET_KICK_STATE(worker, state) \
+ do { \
+ (worker)->kick_state = (state); \
+ (worker)->kick_state_mutator = __LINE__; \
+ } while (false)
+
#define MAX_NEIGHBOURHOODS 1024
typedef struct pollset_neighbourhood {
@@ -100,10 +120,15 @@ struct grpc_pollset {
bool reassigning_neighbourhood;
grpc_pollset_worker *root_worker;
bool kicked_without_poller;
+
+ /* Set to true if the pollset is observed to have no workers available to
+ * poll */
bool seen_inactive;
- bool shutting_down; /* Is the pollset shutting down ? */
- bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
+ bool shutting_down; /* Is the pollset shutting down ? */
grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
+
+ /* Number of workers who are *about-to* attach themselves to the pollset
+ * worker list */
int begin_refs;
grpc_pollset *next;
@@ -224,7 +249,7 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
+ bool already_closed, const char *reason) {
grpc_error *error = GRPC_ERROR_NONE;
if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
@@ -235,7 +260,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != NULL) {
*release_fd = fd->fd;
- } else {
+ } else if (!already_closed) {
close(fd->fd);
}
@@ -263,29 +288,23 @@ static bool fd_is_shutdown(grpc_fd *fd) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
-
- /* Note, it is possible that fd_become_readable might be called twice with
- different 'notifier's when an fd becomes readable and it is in two epoll
- sets (This can happen briefly during polling island merges). In such cases
- it does not really matter which notifer is set as the read_notifier_pollset
- (They would both point to the same polling island anyway) */
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Use release store to match with acquire load in fd_get_read_notifier */
gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
/*******************************************************************************
@@ -410,18 +429,28 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
if (pollset->root_worker != NULL) {
grpc_pollset_worker *worker = pollset->root_worker;
do {
- if (worker->initialized_cv) {
- worker->kick_state = KICKED;
- gpr_cv_signal(&worker->cv);
- } else {
- worker->kick_state = KICKED;
- append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
- "pollset_shutdown");
+ switch (worker->kick_state) {
+ case KICKED:
+ break;
+ case UNKICKED:
+ SET_KICK_STATE(worker, KICKED);
+ if (worker->initialized_cv) {
+ gpr_cv_signal(&worker->cv);
+ }
+ break;
+ case DESIGNATED_POLLER:
+ SET_KICK_STATE(worker, KICKED);
+ append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
+ "pollset_kick_all");
+ break;
}
worker = worker->next;
} while (worker != pollset->root_worker);
}
+ // TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
+ // in the else case
+
return error;
}
@@ -437,7 +466,9 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(pollset->shutdown_closure == NULL);
+ GPR_ASSERT(!pollset->shutting_down);
pollset->shutdown_closure = closure;
+ pollset->shutting_down = true;
GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
@@ -510,10 +541,14 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline) {
if (worker_hdl != NULL) *worker_hdl = worker;
worker->initialized_cv = false;
- worker->kick_state = UNKICKED;
+ SET_KICK_STATE(worker, UNKICKED);
worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
pollset->begin_refs++;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker);
+ }
+
if (pollset->seen_inactive) {
// pollset has been observed to be inactive, we need to move back to the
// active list
@@ -529,6 +564,11 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
retry_lock_neighbourhood:
gpr_mu_lock(&neighbourhood->mu);
gpr_mu_lock(&pollset->mu);
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
+ pollset, worker, kick_state_string(worker->kick_state),
+ is_reassigning);
+ }
if (pollset->seen_inactive) {
if (neighbourhood != pollset->neighbourhood) {
gpr_mu_unlock(&neighbourhood->mu);
@@ -539,8 +579,14 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
pollset->seen_inactive = false;
if (neighbourhood->active_root == NULL) {
neighbourhood->active_root = pollset->next = pollset->prev = pollset;
- if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
- worker->kick_state = DESIGNATED_POLLER;
+ /* TODO: sreek. Why would this worker state be other than UNKICKED
+ * here ? (since the worker isn't added to the pollset yet, there is no
+ * way it can be "found" by other threads to get kicked). */
+
+ /* If there is no designated poller, make this the designated poller */
+ if (worker->kick_state == UNKICKED &&
+ gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
+ SET_KICK_STATE(worker, DESIGNATED_POLLER);
}
} else {
pollset->next = neighbourhood->active_root;
@@ -554,24 +600,53 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
}
gpr_mu_unlock(&neighbourhood->mu);
}
+
worker_insert(pollset, worker);
pollset->begin_refs--;
- if (worker->kick_state == UNKICKED) {
+ if (worker->kick_state == UNKICKED && !pollset->kicked_without_poller) {
GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
- while (worker->kick_state == UNKICKED &&
- pollset->shutdown_closure == NULL) {
+ while (worker->kick_state == UNKICKED && !pollset->shutting_down) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
+ pollset, worker, kick_state_string(worker->kick_state),
+ pollset->shutting_down);
+ }
+
if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
worker->kick_state == UNKICKED) {
- worker->kick_state = KICKED;
+ /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
+ received a kick */
+ SET_KICK_STATE(worker, KICKED);
}
}
*now = gpr_now(now->clock_type);
}
- return worker->kick_state == DESIGNATED_POLLER &&
- pollset->shutdown_closure == NULL;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR,
+ "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
+ "kicked_without_poller: %d",
+ pollset, worker, kick_state_string(worker->kick_state),
+ pollset->shutting_down, pollset->kicked_without_poller);
+ }
+
+ /* We release pollset lock in this function at a couple of places:
+ * 1. Briefly when assigning pollset to a neighbourhood
+ * 2. When doing gpr_cv_wait()
+ * It is possible that 'kicked_without_poller' was set to true during (1) and
+ * 'shutting_down' is set to true during (1) or (2). If either of them is
+ * true, this worker cannot do polling */
+ /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
+ * case; especially when the worker is the DESIGNATED_POLLER */
+
+ if (pollset->kicked_without_poller) {
+ pollset->kicked_without_poller = false;
+ return false;
+ }
+
+ return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
}
static bool check_neighbourhood_for_available_poller(
@@ -591,10 +666,18 @@ static bool check_neighbourhood_for_available_poller(
case UNKICKED:
if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
(gpr_atm)inspect_worker)) {
- inspect_worker->kick_state = DESIGNATED_POLLER;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. choose next poller to be %p",
+ inspect_worker);
+ }
+ SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
if (inspect_worker->initialized_cv) {
gpr_cv_signal(&inspect_worker->cv);
}
+ } else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. beaten to choose next poller");
+ }
}
// even if we didn't win the cas, there's a worker, we can stop
found_worker = true;
@@ -607,9 +690,12 @@ static bool check_neighbourhood_for_available_poller(
break;
}
inspect_worker = inspect_worker->next;
- } while (inspect_worker != inspect->root_worker);
+ } while (!found_worker && inspect_worker != inspect->root_worker);
}
if (!found_worker) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
+ }
inspect->seen_inactive = true;
if (inspect == neighbourhood->active_root) {
neighbourhood->active_root =
@@ -627,15 +713,22 @@ static bool check_neighbourhood_for_available_poller(
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
+ }
if (worker_hdl != NULL) *worker_hdl = NULL;
- worker->kick_state = KICKED;
+ /* Make sure we appear kicked */
+ SET_KICK_STATE(worker, KICKED);
grpc_closure_list_move(&worker->schedule_on_end_work,
&exec_ctx->closure_list);
if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
if (worker->next != worker && worker->next->kick_state == UNKICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker);
+ }
GPR_ASSERT(worker->next->initialized_cv);
gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
- worker->next->kick_state = DESIGNATED_POLLER;
+ SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
gpr_cv_signal(&worker->next->cv);
if (grpc_exec_ctx_has_work(exec_ctx)) {
gpr_mu_unlock(&pollset->mu);
@@ -644,9 +737,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else {
gpr_atm_no_barrier_store(&g_active_poller, 0);
- gpr_mu_unlock(&pollset->mu);
size_t poller_neighbourhood_idx =
(size_t)(pollset->neighbourhood - g_neighbourhoods);
+ gpr_mu_unlock(&pollset->mu);
bool found_worker = false;
bool scan_state[MAX_NEIGHBOURHOODS];
for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
@@ -682,6 +775,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. remove worker");
+ }
if (EMPTIED == worker_remove(pollset, worker)) {
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
@@ -702,16 +798,18 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->kicked_without_poller = false;
return GRPC_ERROR_NONE;
}
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
- GPR_ASSERT(!pollset->shutdown_closure);
+ GPR_ASSERT(!pollset->shutting_down);
GPR_ASSERT(!pollset->seen_inactive);
gpr_mu_unlock(&pollset->mu);
append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
err_desc);
gpr_mu_lock(&pollset->mu);
gpr_tls_set(&g_current_thread_worker, 0);
+ } else {
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
}
end_worker(exec_ctx, pollset, &worker, worker_hdl);
gpr_tls_set(&g_current_thread_pollset, 0);
@@ -720,46 +818,136 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
static grpc_error *pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_strvec log;
+ gpr_strvec_init(&log);
+ char *tmp;
+ gpr_asprintf(
+ &tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
+ specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
+ (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker);
+ gpr_strvec_add(&log, tmp);
+ if (pollset->root_worker != NULL) {
+ gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
+ kick_state_string(pollset->root_worker->kick_state),
+ pollset->root_worker->next,
+ kick_state_string(pollset->root_worker->next->kick_state));
+ gpr_strvec_add(&log, tmp);
+ }
+ if (specific_worker != NULL) {
+ gpr_asprintf(&tmp, " worker_kick_state=%s",
+ kick_state_string(specific_worker->kick_state));
+ gpr_strvec_add(&log, tmp);
+ }
+ tmp = gpr_strvec_flatten(&log, NULL);
+ gpr_strvec_destroy(&log);
+ gpr_log(GPR_ERROR, "%s", tmp);
+ gpr_free(tmp);
+ }
if (specific_worker == NULL) {
if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
grpc_pollset_worker *root_worker = pollset->root_worker;
if (root_worker == NULL) {
pollset->kicked_without_poller = true;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked_without_poller");
+ }
return GRPC_ERROR_NONE;
}
grpc_pollset_worker *next_worker = root_worker->next;
- if (root_worker == next_worker &&
- root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
- &g_active_poller)) {
- root_worker->kick_state = KICKED;
+ if (root_worker->kick_state == KICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
+ }
+ SET_KICK_STATE(root_worker, KICKED);
+ return GRPC_ERROR_NONE;
+ } else if (next_worker->kick_state == KICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
+ }
+ SET_KICK_STATE(next_worker, KICKED);
+ return GRPC_ERROR_NONE;
+ } else if (root_worker ==
+ next_worker && // only try and wake up a poller if
+ // there is no next worker
+ root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
+ &g_active_poller)) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
+ }
+ SET_KICK_STATE(root_worker, KICKED);
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
} else if (next_worker->kick_state == UNKICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
+ }
GPR_ASSERT(next_worker->initialized_cv);
- next_worker->kick_state = KICKED;
+ SET_KICK_STATE(next_worker, KICKED);
gpr_cv_signal(&next_worker->cv);
return GRPC_ERROR_NONE;
+ } else if (next_worker->kick_state == DESIGNATED_POLLER) {
+ if (root_worker->kick_state != DESIGNATED_POLLER) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(
+ GPR_ERROR,
+ " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
+ root_worker, root_worker->initialized_cv, next_worker);
+ }
+ SET_KICK_STATE(root_worker, KICKED);
+ if (root_worker->initialized_cv) {
+ gpr_cv_signal(&root_worker->cv);
+ }
+ return GRPC_ERROR_NONE;
+ } else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
+ root_worker);
+ }
+ SET_KICK_STATE(next_worker, KICKED);
+ return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+ }
} else {
+ GPR_ASSERT(next_worker->kick_state == KICKED);
+ SET_KICK_STATE(next_worker, KICKED);
return GRPC_ERROR_NONE;
}
} else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked while waking up");
+ }
return GRPC_ERROR_NONE;
}
} else if (specific_worker->kick_state == KICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. specific worker already kicked");
+ }
return GRPC_ERROR_NONE;
} else if (gpr_tls_get(&g_current_thread_worker) ==
(intptr_t)specific_worker) {
- specific_worker->kick_state = KICKED;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
+ }
+ SET_KICK_STATE(specific_worker, KICKED);
return GRPC_ERROR_NONE;
} else if (specific_worker ==
(grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
- specific_worker->kick_state = KICKED;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kick active poller");
+ }
+ SET_KICK_STATE(specific_worker, KICKED);
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
} else if (specific_worker->initialized_cv) {
- specific_worker->kick_state = KICKED;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kick waiting worker");
+ }
+ SET_KICK_STATE(specific_worker, KICKED);
gpr_cv_signal(&specific_worker->cv);
return GRPC_ERROR_NONE;
} else {
- specific_worker->kick_state = KICKED;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kick non-waiting worker");
+ }
+ SET_KICK_STATE(specific_worker, KICKED);
return GRPC_ERROR_NONE;
}
}
@@ -805,6 +993,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
+ close(g_epfd);
}
static const grpc_event_engine_vtable vtable = {
@@ -841,9 +1030,6 @@ static const grpc_event_engine_vtable vtable = {
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
* Create a dummy epoll_fd to make sure epoll support is available */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
- /* TODO(ctiller): temporary, until this stabilizes */
- if (!explicit_request) return NULL;
-
if (!grpc_has_wakeup_fd()) {
return NULL;
}
@@ -862,6 +1048,8 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
return NULL;
}
+ gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
+
return &vtable;
}
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
index 27b4892d1d..f2f3e15704 100644
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
@@ -931,7 +931,7 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
+ bool already_closed, const char *reason) {
grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL;
@@ -952,8 +952,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
before doing this.) */
if (fd->po.pi != NULL) {
polling_island *pi_latest = polling_island_lock(fd->po.pi);
- polling_island_remove_fd_locked(pi_latest, fd, false /* is_fd_closed */,
- &error);
+ polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi;
@@ -1007,12 +1006,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -1223,7 +1222,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -1235,7 +1234,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
index 49be72c03e..07c8eadf4f 100644
--- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
@@ -493,8 +493,8 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
- bool is_fd_closed = false;
+ bool already_closed, const char *reason) {
+ bool is_fd_closed = already_closed;
grpc_error *error = GRPC_ERROR_NONE;
epoll_set *unref_eps = NULL;
@@ -505,7 +505,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != NULL) {
*release_fd = fd->fd;
- } else {
+ } else if (!is_fd_closed) {
close(fd->fd);
is_fd_closed = true;
}
@@ -560,12 +560,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -696,11 +696,11 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
}
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static void pollset_release_epoll_set(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 5690431759..770d1fd0a9 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -380,8 +380,8 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
- bool is_fd_closed = false;
+ bool already_closed, const char *reason) {
+ bool is_fd_closed = already_closed;
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&fd->pollable.po.mu);
@@ -392,7 +392,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != NULL) {
*release_fd = fd->fd;
- } else {
+ } else if (!is_fd_closed) {
close(fd->fd);
is_fd_closed = true;
}
@@ -438,12 +438,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -710,7 +710,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -722,7 +722,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static grpc_error *fd_become_pollable_locked(grpc_fd *fd) {
@@ -1049,8 +1049,8 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
/* Introduce a spurious completion.
If we do not, then it may be that the fd-specific epoll set consumed
a completion without being polled, leading to a missed edge going up. */
- grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure);
- grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read");
+ grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write");
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &pollset->pollable;
if (append_error(&error, pollable_materialize(&pollset->pollable),
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index 0d969dccce..11067e3959 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -854,7 +854,7 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
+ bool already_closed, const char *reason) {
grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL;
@@ -875,8 +875,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
before doing this.) */
if (fd->po.pi != NULL) {
polling_island *pi_latest = polling_island_lock(fd->po.pi);
- polling_island_remove_fd_locked(pi_latest, fd, false /* is_fd_closed */,
- &error);
+ polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi;
@@ -933,12 +932,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -1115,7 +1114,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -1127,7 +1126,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
@@ -1731,7 +1730,7 @@ const grpc_event_engine_vtable *grpc_init_epollsig_linux(
if (!is_grpc_wakeup_signal_initialized) {
/* TODO(ctiller): when other epoll engines are ready, remove the true || to
* force this to be explitly chosen if needed */
- if (true || explicit_request) {
+ if (explicit_request) {
grpc_use_signal(SIGRTMIN + 6);
} else {
return NULL;
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 1f8d7eef26..365aa583bb 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -398,11 +398,14 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
+ bool already_closed, const char *reason) {
fd->on_done_closure = on_done;
fd->released = release_fd != NULL;
- if (fd->released) {
+ if (release_fd != NULL) {
*release_fd = fd->fd;
+ fd->released = true;
+ } else if (already_closed) {
+ fd->released = true;
}
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index cff77e641c..91f8cd5482 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -170,8 +170,9 @@ int grpc_fd_wrapped_fd(grpc_fd *fd) {
}
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
- int *release_fd, const char *reason) {
- g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, reason);
+ int *release_fd, bool already_closed, const char *reason) {
+ g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, already_closed,
+ reason);
}
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 0de7333843..1108e46ef8 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -37,7 +37,7 @@ typedef struct grpc_event_engine_vtable {
grpc_fd *(*fd_create)(int fd, const char *name);
int (*fd_wrapped_fd)(grpc_fd *fd);
void (*fd_orphan)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
- int *release_fd, const char *reason);
+ int *release_fd, bool already_closed, const char *reason);
void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why);
void (*fd_notify_on_read)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
@@ -104,7 +104,7 @@ int grpc_fd_wrapped_fd(grpc_fd *fd);
notify_on_write.
MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
- int *release_fd, const char *reason);
+ int *release_fd, bool already_closed, const char *reason);
/* Has grpc_fd_shutdown been called on an fd? */
bool grpc_fd_is_shutdown(grpc_fd *fd);
diff --git a/src/core/lib/iomgr/lockfree_event.c b/src/core/lib/iomgr/lockfree_event.c
index c2ceecb3c5..f967b22ba9 100644
--- a/src/core/lib/iomgr/lockfree_event.c
+++ b/src/core/lib/iomgr/lockfree_event.c
@@ -79,12 +79,12 @@ bool grpc_lfev_is_shutdown(gpr_atm *state) {
}
void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state,
- grpc_closure *closure) {
+ grpc_closure *closure, const char *variable) {
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "lfev_notify_on: %p curr=%p closure=%p", state,
- (void *)curr, closure);
+ gpr_log(GPR_ERROR, "lfev_notify_on[%s]: %p curr=%p closure=%p", variable,
+ state, (void *)curr, closure);
}
switch (curr) {
case CLOSURE_NOT_READY: {
@@ -149,7 +149,7 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state,
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "lfev_set_shutdown: %p curr=%p err=%s", state,
+ gpr_log(GPR_ERROR, "lfev_set_shutdown: %p curr=%p err=%s", state,
(void *)curr, grpc_error_string(shutdown_err));
}
switch (curr) {
@@ -193,12 +193,14 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state,
GPR_UNREACHABLE_CODE(return false);
}
-void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state) {
+void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state,
+ const char *variable) {
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "lfev_set_ready: %p curr=%p", state, (void *)curr);
+ gpr_log(GPR_ERROR, "lfev_set_ready[%s]: %p curr=%p", variable, state,
+ (void *)curr);
}
switch (curr) {
diff --git a/src/core/lib/iomgr/lockfree_event.h b/src/core/lib/iomgr/lockfree_event.h
index ef3844a07a..6a14a0f3b2 100644
--- a/src/core/lib/iomgr/lockfree_event.h
+++ b/src/core/lib/iomgr/lockfree_event.h
@@ -30,10 +30,11 @@ void grpc_lfev_destroy(gpr_atm *state);
bool grpc_lfev_is_shutdown(gpr_atm *state);
void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state,
- grpc_closure *closure);
+ grpc_closure *closure, const char *variable);
/* Returns true on first successful shutdown */
bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state,
grpc_error *shutdown_err);
-void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state);
+void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state,
+ const char *variable);
#endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 21e320a6e7..a25fba4527 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -209,7 +209,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
finish:
if (fd != NULL) {
grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
- grpc_fd_orphan(exec_ctx, fd, NULL, NULL, "tcp_client_orphan");
+ grpc_fd_orphan(exec_ctx, fd, NULL, NULL, false /* already_closed */,
+ "tcp_client_orphan");
fd = NULL;
}
done = (--ac->refs == 0);
@@ -295,7 +296,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
- grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
+ grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, false /* already_closed */,
+ "tcp_client_connect_error");
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"));
goto done;
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index b6dcd15cb0..2f543fd8a9 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -156,7 +156,7 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
- "tcp_unref_orphan");
+ false /* already_closed */, "tcp_unref_orphan");
grpc_slice_buffer_destroy_internal(exec_ctx, &tcp->last_read_buffer);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
gpr_free(tcp->peer_string);
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index f304642951..0fc5c0fd86 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -166,7 +166,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
- "tcp_listener_shutdown");
+ false /* already_closed */, "tcp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 54e7f417a7..88fa34cb7a 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -214,7 +214,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
sp->server->user_data);
}
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
- "udp_listener_shutdown");
+ false /* already_closed */, "udp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {
diff --git a/src/core/lib/security/credentials/composite/composite_credentials.c b/src/core/lib/security/credentials/composite/composite_credentials.c
index 77d7b04627..09fd60a12c 100644
--- a/src/core/lib/security/credentials/composite/composite_credentials.c
+++ b/src/core/lib/security/credentials/composite/composite_credentials.c
@@ -32,88 +32,98 @@
typedef struct {
grpc_composite_call_credentials *composite_creds;
size_t creds_index;
- grpc_credentials_md_store *md_elems;
- grpc_auth_metadata_context auth_md_context;
- void *user_data;
grpc_polling_entity *pollent;
- grpc_credentials_metadata_cb cb;
+ grpc_auth_metadata_context auth_md_context;
+ grpc_credentials_mdelem_array *md_array;
+ grpc_closure *on_request_metadata;
+ grpc_closure internal_on_request_metadata;
} grpc_composite_call_credentials_metadata_context;
static void composite_call_destruct(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *creds) {
grpc_composite_call_credentials *c = (grpc_composite_call_credentials *)creds;
- size_t i;
- for (i = 0; i < c->inner.num_creds; i++) {
+ for (size_t i = 0; i < c->inner.num_creds; i++) {
grpc_call_credentials_unref(exec_ctx, c->inner.creds_array[i]);
}
gpr_free(c->inner.creds_array);
}
-static void composite_call_md_context_destroy(
- grpc_exec_ctx *exec_ctx,
- grpc_composite_call_credentials_metadata_context *ctx) {
- grpc_credentials_md_store_unref(exec_ctx, ctx->md_elems);
- gpr_free(ctx);
-}
-
-static void composite_call_metadata_cb(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_credentials_md *md_elems,
- size_t num_md,
- grpc_credentials_status status,
- const char *error_details) {
+static void composite_call_metadata_cb(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
grpc_composite_call_credentials_metadata_context *ctx =
- (grpc_composite_call_credentials_metadata_context *)user_data;
- if (status != GRPC_CREDENTIALS_OK) {
- ctx->cb(exec_ctx, ctx->user_data, NULL, 0, status, error_details);
- return;
- }
-
- /* Copy the metadata in the context. */
- if (num_md > 0) {
- size_t i;
- for (i = 0; i < num_md; i++) {
- grpc_credentials_md_store_add(ctx->md_elems, md_elems[i].key,
- md_elems[i].value);
+ (grpc_composite_call_credentials_metadata_context *)arg;
+ if (error == GRPC_ERROR_NONE) {
+ /* See if we need to get some more metadata. */
+ if (ctx->creds_index < ctx->composite_creds->inner.num_creds) {
+ grpc_call_credentials *inner_creds =
+ ctx->composite_creds->inner.creds_array[ctx->creds_index++];
+ if (grpc_call_credentials_get_request_metadata(
+ exec_ctx, inner_creds, ctx->pollent, ctx->auth_md_context,
+ ctx->md_array, &ctx->internal_on_request_metadata, &error)) {
+ // Synchronous response, so call ourselves recursively.
+ composite_call_metadata_cb(exec_ctx, arg, error);
+ GRPC_ERROR_UNREF(error);
+ }
+ return;
}
+ // We're done!
}
-
- /* See if we need to get some more metadata. */
- if (ctx->creds_index < ctx->composite_creds->inner.num_creds) {
- grpc_call_credentials *inner_creds =
- ctx->composite_creds->inner.creds_array[ctx->creds_index++];
- grpc_call_credentials_get_request_metadata(
- exec_ctx, inner_creds, ctx->pollent, ctx->auth_md_context,
- composite_call_metadata_cb, ctx);
- return;
- }
-
- /* We're done!. */
- ctx->cb(exec_ctx, ctx->user_data, ctx->md_elems->entries,
- ctx->md_elems->num_entries, GRPC_CREDENTIALS_OK, NULL);
- composite_call_md_context_destroy(exec_ctx, ctx);
+ GRPC_CLOSURE_SCHED(exec_ctx, ctx->on_request_metadata, GRPC_ERROR_REF(error));
+ gpr_free(ctx);
}
-static void composite_call_get_request_metadata(
+static bool composite_call_get_request_metadata(
grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
grpc_polling_entity *pollent, grpc_auth_metadata_context auth_md_context,
- grpc_credentials_metadata_cb cb, void *user_data) {
+ grpc_credentials_mdelem_array *md_array, grpc_closure *on_request_metadata,
+ grpc_error **error) {
grpc_composite_call_credentials *c = (grpc_composite_call_credentials *)creds;
grpc_composite_call_credentials_metadata_context *ctx;
-
ctx = gpr_zalloc(sizeof(grpc_composite_call_credentials_metadata_context));
- ctx->auth_md_context = auth_md_context;
- ctx->user_data = user_data;
- ctx->cb = cb;
ctx->composite_creds = c;
ctx->pollent = pollent;
- ctx->md_elems = grpc_credentials_md_store_create(c->inner.num_creds);
- grpc_call_credentials_get_request_metadata(
- exec_ctx, c->inner.creds_array[ctx->creds_index++], ctx->pollent,
- auth_md_context, composite_call_metadata_cb, ctx);
+ ctx->auth_md_context = auth_md_context;
+ ctx->md_array = md_array;
+ ctx->on_request_metadata = on_request_metadata;
+ GRPC_CLOSURE_INIT(&ctx->internal_on_request_metadata,
+ composite_call_metadata_cb, ctx, grpc_schedule_on_exec_ctx);
+ while (ctx->creds_index < ctx->composite_creds->inner.num_creds) {
+ grpc_call_credentials *inner_creds =
+ ctx->composite_creds->inner.creds_array[ctx->creds_index++];
+ if (grpc_call_credentials_get_request_metadata(
+ exec_ctx, inner_creds, ctx->pollent, ctx->auth_md_context,
+ ctx->md_array, &ctx->internal_on_request_metadata, error)) {
+ if (*error != GRPC_ERROR_NONE) break;
+ } else {
+ break;
+ }
+ }
+ // If we got through all creds synchronously or we got a synchronous
+ // error on one of them, return synchronously.
+ if (ctx->creds_index == ctx->composite_creds->inner.num_creds ||
+ *error != GRPC_ERROR_NONE) {
+ gpr_free(ctx);
+ return true;
+ }
+ // At least one inner cred is returning asynchronously, so we'll
+ // return asynchronously as well.
+ return false;
+}
+
+static void composite_call_cancel_get_request_metadata(
+ grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
+ grpc_credentials_mdelem_array *md_array, grpc_error *error) {
+ grpc_composite_call_credentials *c = (grpc_composite_call_credentials *)creds;
+ for (size_t i = 0; i < c->inner.num_creds; ++i) {
+ grpc_call_credentials_cancel_get_request_metadata(
+ exec_ctx, c->inner.creds_array[i], md_array, GRPC_ERROR_REF(error));
+ }
+ GRPC_ERROR_UNREF(error);
}
static grpc_call_credentials_vtable composite_call_credentials_vtable = {
- composite_call_destruct, composite_call_get_request_metadata};
+ composite_call_destruct, composite_call_get_request_metadata,
+ composite_call_cancel_get_request_metadata};
static grpc_call_credentials_array get_creds_array(
grpc_call_credentials **creds_addr) {
diff --git a/src/core/lib/security/credentials/credentials.c b/src/core/lib/security/credentials/credentials.c
index b1f1e82076..8a67c9865b 100644
--- a/src/core/lib/security/credentials/credentials.c
+++ b/src/core/lib/security/credentials/credentials.c
@@ -38,13 +38,10 @@
/* -- Common. -- */
grpc_credentials_metadata_request *grpc_credentials_metadata_request_create(
- grpc_call_credentials *creds, grpc_credentials_metadata_cb cb,
- void *user_data) {
+ grpc_call_credentials *creds) {
grpc_credentials_metadata_request *r =
gpr_zalloc(sizeof(grpc_credentials_metadata_request));
r->creds = grpc_call_credentials_ref(creds);
- r->cb = cb;
- r->user_data = user_data;
return r;
}
@@ -104,18 +101,25 @@ void grpc_call_credentials_release(grpc_call_credentials *creds) {
grpc_exec_ctx_finish(&exec_ctx);
}
-void grpc_call_credentials_get_request_metadata(
+bool grpc_call_credentials_get_request_metadata(
grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
grpc_polling_entity *pollent, grpc_auth_metadata_context context,
- grpc_credentials_metadata_cb cb, void *user_data) {
+ grpc_credentials_mdelem_array *md_array, grpc_closure *on_request_metadata,
+ grpc_error **error) {
if (creds == NULL || creds->vtable->get_request_metadata == NULL) {
- if (cb != NULL) {
- cb(exec_ctx, user_data, NULL, 0, GRPC_CREDENTIALS_OK, NULL);
- }
+ return true;
+ }
+ return creds->vtable->get_request_metadata(
+ exec_ctx, creds, pollent, context, md_array, on_request_metadata, error);
+}
+
+void grpc_call_credentials_cancel_get_request_metadata(
+ grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
+ grpc_credentials_mdelem_array *md_array, grpc_error *error) {
+ if (creds == NULL || creds->vtable->cancel_get_request_metadata == NULL) {
return;
}
- creds->vtable->get_request_metadata(exec_ctx, creds, pollent, context, cb,
- user_data);
+ creds->vtable->cancel_get_request_metadata(exec_ctx, creds, md_array, error);
}
grpc_security_status grpc_channel_credentials_create_security_connector(
diff --git a/src/core/lib/security/credentials/credentials.h b/src/core/lib/security/credentials/credentials.h
index c45c2e957c..04a54b0ca8 100644
--- a/src/core/lib/security/credentials/credentials.h
+++ b/src/core/lib/security/credentials/credentials.h
@@ -138,48 +138,39 @@ grpc_channel_credentials *grpc_channel_credentials_from_arg(
grpc_channel_credentials *grpc_channel_credentials_find_in_args(
const grpc_channel_args *args);
-/* --- grpc_credentials_md. --- */
+/* --- grpc_credentials_mdelem_array. --- */
typedef struct {
- grpc_slice key;
- grpc_slice value;
-} grpc_credentials_md;
+ grpc_mdelem *md;
+ size_t size;
+} grpc_credentials_mdelem_array;
-typedef struct {
- grpc_credentials_md *entries;
- size_t num_entries;
- size_t allocated;
- gpr_refcount refcount;
-} grpc_credentials_md_store;
+/// Takes a new ref to \a md.
+void grpc_credentials_mdelem_array_add(grpc_credentials_mdelem_array *list,
+ grpc_mdelem md);
-grpc_credentials_md_store *grpc_credentials_md_store_create(
- size_t initial_capacity);
+/// Appends all elements from \a src to \a dst, taking a new ref to each one.
+void grpc_credentials_mdelem_array_append(grpc_credentials_mdelem_array *dst,
+ grpc_credentials_mdelem_array *src);
-/* Will ref key and value. */
-void grpc_credentials_md_store_add(grpc_credentials_md_store *store,
- grpc_slice key, grpc_slice value);
-void grpc_credentials_md_store_add_cstrings(grpc_credentials_md_store *store,
- const char *key, const char *value);
-grpc_credentials_md_store *grpc_credentials_md_store_ref(
- grpc_credentials_md_store *store);
-void grpc_credentials_md_store_unref(grpc_exec_ctx *exec_ctx,
- grpc_credentials_md_store *store);
+void grpc_credentials_mdelem_array_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_credentials_mdelem_array *list);
/* --- grpc_call_credentials. --- */
-/* error_details must be NULL if status is GRPC_CREDENTIALS_OK. */
-typedef void (*grpc_credentials_metadata_cb)(
- grpc_exec_ctx *exec_ctx, void *user_data, grpc_credentials_md *md_elems,
- size_t num_md, grpc_credentials_status status, const char *error_details);
-
typedef struct {
void (*destruct)(grpc_exec_ctx *exec_ctx, grpc_call_credentials *c);
- void (*get_request_metadata)(grpc_exec_ctx *exec_ctx,
+ bool (*get_request_metadata)(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *c,
grpc_polling_entity *pollent,
grpc_auth_metadata_context context,
- grpc_credentials_metadata_cb cb,
- void *user_data);
+ grpc_credentials_mdelem_array *md_array,
+ grpc_closure *on_request_metadata,
+ grpc_error **error);
+ void (*cancel_get_request_metadata)(grpc_exec_ctx *exec_ctx,
+ grpc_call_credentials *c,
+ grpc_credentials_mdelem_array *md_array,
+ grpc_error *error);
} grpc_call_credentials_vtable;
struct grpc_call_credentials {
@@ -191,15 +182,29 @@ struct grpc_call_credentials {
grpc_call_credentials *grpc_call_credentials_ref(grpc_call_credentials *creds);
void grpc_call_credentials_unref(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *creds);
-void grpc_call_credentials_get_request_metadata(
+
+/// Returns true if completed synchronously, in which case \a error will
+/// be set to indicate the result. Otherwise, \a on_request_metadata will
+/// be invoked asynchronously when complete. \a md_array will be populated
+/// with the resulting metadata once complete.
+bool grpc_call_credentials_get_request_metadata(
grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
grpc_polling_entity *pollent, grpc_auth_metadata_context context,
- grpc_credentials_metadata_cb cb, void *user_data);
+ grpc_credentials_mdelem_array *md_array, grpc_closure *on_request_metadata,
+ grpc_error **error);
+
+/// Cancels a pending asynchronous operation started by
+/// grpc_call_credentials_get_request_metadata() with the corresponding
+/// value of \a md_array.
+void grpc_call_credentials_cancel_get_request_metadata(
+ grpc_exec_ctx *exec_ctx, grpc_call_credentials *c,
+ grpc_credentials_mdelem_array *md_array, grpc_error *error);
/* Metadata-only credentials with the specified key and value where
asynchronicity can be simulated for testing. */
grpc_call_credentials *grpc_md_only_test_credentials_create(
- const char *md_key, const char *md_value, int is_async);
+ grpc_exec_ctx *exec_ctx, const char *md_key, const char *md_value,
+ bool is_async);
/* --- grpc_server_credentials. --- */
@@ -238,14 +243,11 @@ grpc_server_credentials *grpc_find_server_credentials_in_args(
typedef struct {
grpc_call_credentials *creds;
- grpc_credentials_metadata_cb cb;
grpc_http_response response;
- void *user_data;
} grpc_credentials_metadata_request;
grpc_credentials_metadata_request *grpc_credentials_metadata_request_create(
- grpc_call_credentials *creds, grpc_credentials_metadata_cb cb,
- void *user_data);
+ grpc_call_credentials *creds);
void grpc_credentials_metadata_request_destroy(
grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *r);
diff --git a/src/core/lib/security/credentials/credentials_metadata.c b/src/core/lib/security/credentials/credentials_metadata.c
index fcfdd52d20..ccd39e610f 100644
--- a/src/core/lib/security/credentials/credentials_metadata.c
+++ b/src/core/lib/security/credentials/credentials_metadata.c
@@ -24,65 +24,36 @@
#include "src/core/lib/slice/slice_internal.h"
-static void store_ensure_capacity(grpc_credentials_md_store *store) {
- if (store->num_entries == store->allocated) {
- store->allocated = (store->allocated == 0) ? 1 : store->allocated * 2;
- store->entries = gpr_realloc(
- store->entries, store->allocated * sizeof(grpc_credentials_md));
+static void mdelem_list_ensure_capacity(grpc_credentials_mdelem_array *list,
+ size_t additional_space_needed) {
+ size_t target_size = list->size + additional_space_needed;
+ // Find the next power of two greater than the target size (i.e.,
+ // whenever we add more space, we double what we already have).
+ size_t new_size = 2;
+ while (new_size < target_size) {
+ new_size *= 2;
}
+ list->md = gpr_realloc(list->md, sizeof(grpc_mdelem) * new_size);
}
-grpc_credentials_md_store *grpc_credentials_md_store_create(
- size_t initial_capacity) {
- grpc_credentials_md_store *store =
- gpr_zalloc(sizeof(grpc_credentials_md_store));
- if (initial_capacity > 0) {
- store->entries = gpr_malloc(initial_capacity * sizeof(grpc_credentials_md));
- store->allocated = initial_capacity;
- }
- gpr_ref_init(&store->refcount, 1);
- return store;
-}
-
-void grpc_credentials_md_store_add(grpc_credentials_md_store *store,
- grpc_slice key, grpc_slice value) {
- if (store == NULL) return;
- store_ensure_capacity(store);
- store->entries[store->num_entries].key = grpc_slice_ref_internal(key);
- store->entries[store->num_entries].value = grpc_slice_ref_internal(value);
- store->num_entries++;
+void grpc_credentials_mdelem_array_add(grpc_credentials_mdelem_array *list,
+ grpc_mdelem md) {
+ mdelem_list_ensure_capacity(list, 1);
+ list->md[list->size++] = GRPC_MDELEM_REF(md);
}
-void grpc_credentials_md_store_add_cstrings(grpc_credentials_md_store *store,
- const char *key,
- const char *value) {
- if (store == NULL) return;
- store_ensure_capacity(store);
- store->entries[store->num_entries].key = grpc_slice_from_copied_string(key);
- store->entries[store->num_entries].value =
- grpc_slice_from_copied_string(value);
- store->num_entries++;
-}
-
-grpc_credentials_md_store *grpc_credentials_md_store_ref(
- grpc_credentials_md_store *store) {
- if (store == NULL) return NULL;
- gpr_ref(&store->refcount);
- return store;
+void grpc_credentials_mdelem_array_append(grpc_credentials_mdelem_array *dst,
+ grpc_credentials_mdelem_array *src) {
+ mdelem_list_ensure_capacity(dst, src->size);
+ for (size_t i = 0; i < src->size; ++i) {
+ dst->md[dst->size++] = GRPC_MDELEM_REF(src->md[i]);
+ }
}
-void grpc_credentials_md_store_unref(grpc_exec_ctx *exec_ctx,
- grpc_credentials_md_store *store) {
- if (store == NULL) return;
- if (gpr_unref(&store->refcount)) {
- if (store->entries != NULL) {
- size_t i;
- for (i = 0; i < store->num_entries; i++) {
- grpc_slice_unref_internal(exec_ctx, store->entries[i].key);
- grpc_slice_unref_internal(exec_ctx, store->entries[i].value);
- }
- gpr_free(store->entries);
- }
- gpr_free(store);
+void grpc_credentials_mdelem_array_destroy(
+ grpc_exec_ctx *exec_ctx, grpc_credentials_mdelem_array *list) {
+ for (size_t i = 0; i < list->size; ++i) {
+ GRPC_MDELEM_UNREF(exec_ctx, list->md[i]);
}
+ gpr_free(list->md);
}
diff --git a/src/core/lib/security/credentials/fake/fake_credentials.c b/src/core/lib/security/credentials/fake/fake_credentials.c
index 67e74f7b92..ac9017850f 100644
--- a/src/core/lib/security/credentials/fake/fake_credentials.c
+++ b/src/core/lib/security/credentials/fake/fake_credentials.c
@@ -98,49 +98,44 @@ const char *grpc_fake_transport_get_expected_targets(
static void md_only_test_destruct(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *creds) {
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)creds;
- grpc_credentials_md_store_unref(exec_ctx, c->md_store);
+ GRPC_MDELEM_UNREF(exec_ctx, c->md);
}
-static void on_simulated_token_fetch_done(grpc_exec_ctx *exec_ctx,
- void *user_data, grpc_error *error) {
- grpc_credentials_metadata_request *r =
- (grpc_credentials_metadata_request *)user_data;
- grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds;
- r->cb(exec_ctx, r->user_data, c->md_store->entries, c->md_store->num_entries,
- GRPC_CREDENTIALS_OK, NULL);
- grpc_credentials_metadata_request_destroy(exec_ctx, r);
-}
-
-static void md_only_test_get_request_metadata(
+static bool md_only_test_get_request_metadata(
grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
grpc_polling_entity *pollent, grpc_auth_metadata_context context,
- grpc_credentials_metadata_cb cb, void *user_data) {
+ grpc_credentials_mdelem_array *md_array, grpc_closure *on_request_metadata,
+ grpc_error **error) {
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)creds;
-
+ grpc_credentials_mdelem_array_add(md_array, c->md);
if (c->is_async) {
- grpc_credentials_metadata_request *cb_arg =
- grpc_credentials_metadata_request_create(creds, cb, user_data);
- GRPC_CLOSURE_SCHED(exec_ctx,
- GRPC_CLOSURE_CREATE(on_simulated_token_fetch_done,
- cb_arg, grpc_executor_scheduler),
- GRPC_ERROR_NONE);
- } else {
- cb(exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK, NULL);
+ GRPC_CLOSURE_SCHED(exec_ctx, on_request_metadata, GRPC_ERROR_NONE);
+ return false;
}
+ return true;
+}
+
+static void md_only_test_cancel_get_request_metadata(
+ grpc_exec_ctx *exec_ctx, grpc_call_credentials *c,
+ grpc_credentials_mdelem_array *md_array, grpc_error *error) {
+ GRPC_ERROR_UNREF(error);
}
static grpc_call_credentials_vtable md_only_test_vtable = {
- md_only_test_destruct, md_only_test_get_request_metadata};
+ md_only_test_destruct, md_only_test_get_request_metadata,
+ md_only_test_cancel_get_request_metadata};
grpc_call_credentials *grpc_md_only_test_credentials_create(
- const char *md_key, const char *md_value, int is_async) {
+ grpc_exec_ctx *exec_ctx, const char *md_key, const char *md_value,
+ bool is_async) {
grpc_md_only_test_credentials *c =
gpr_zalloc(sizeof(grpc_md_only_test_credentials));
c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2;
c->base.vtable = &md_only_test_vtable;
gpr_ref_init(&c->base.refcount, 1);
- c->md_store = grpc_credentials_md_store_create(1);
- grpc_credentials_md_store_add_cstrings(c->md_store, md_key, md_value);
+ c->md =
+ grpc_mdelem_from_slices(exec_ctx, grpc_slice_from_copied_string(md_key),
+ grpc_slice_from_copied_string(md_value));
c->is_async = is_async;
return &c->base;
}
diff --git a/src/core/lib/security/credentials/fake/fake_credentials.h b/src/core/lib/security/credentials/fake/fake_credentials.h
index fae7a6a9c4..aa0f3b6e20 100644
--- a/src/core/lib/security/credentials/fake/fake_credentials.h
+++ b/src/core/lib/security/credentials/fake/fake_credentials.h
@@ -52,8 +52,8 @@ const char *grpc_fake_transport_get_expected_targets(
typedef struct {
grpc_call_credentials base;
- grpc_credentials_md_store *md_store;
- int is_async;
+ grpc_mdelem md;
+ bool is_async;
} grpc_md_only_test_credentials;
#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_FAKE_FAKE_CREDENTIALS_H */
diff --git a/src/core/lib/security/credentials/iam/iam_credentials.c b/src/core/lib/security/credentials/iam/iam_credentials.c
index 4b32c5a047..3de8319d98 100644
--- a/src/core/lib/security/credentials/iam/iam_credentials.c
+++ b/src/core/lib/security/credentials/iam/iam_credentials.c
@@ -30,26 +30,33 @@
static void iam_destruct(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *creds) {
grpc_google_iam_credentials *c = (grpc_google_iam_credentials *)creds;
- grpc_credentials_md_store_unref(exec_ctx, c->iam_md);
+ grpc_credentials_mdelem_array_destroy(exec_ctx, &c->md_array);
}
-static void iam_get_request_metadata(grpc_exec_ctx *exec_ctx,
+static bool iam_get_request_metadata(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *creds,
grpc_polling_entity *pollent,
grpc_auth_metadata_context context,
- grpc_credentials_metadata_cb cb,
- void *user_data) {
+ grpc_credentials_mdelem_array *md_array,
+ grpc_closure *on_request_metadata,
+ grpc_error **error) {
grpc_google_iam_credentials *c = (grpc_google_iam_credentials *)creds;
- cb(exec_ctx, user_data, c->iam_md->entries, c->iam_md->num_entries,
- GRPC_CREDENTIALS_OK, NULL);
+ grpc_credentials_mdelem_array_append(md_array, &c->md_array);
+ return true;
}
-static grpc_call_credentials_vtable iam_vtable = {iam_destruct,
- iam_get_request_metadata};
+static void iam_cancel_get_request_metadata(
+ grpc_exec_ctx *exec_ctx, grpc_call_credentials *c,
+ grpc_credentials_mdelem_array *md_array, grpc_error *error) {
+ GRPC_ERROR_UNREF(error);
+}
+
+static grpc_call_credentials_vtable iam_vtable = {
+ iam_destruct, iam_get_request_metadata, iam_cancel_get_request_metadata};
grpc_call_credentials *grpc_google_iam_credentials_create(
const char *token, const char *authority_selector, void *reserved) {
- grpc_google_iam_credentials *c;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
"grpc_iam_credentials_create(token=%s, authority_selector=%s, "
"reserved=%p)",
@@ -57,14 +64,22 @@ grpc_call_credentials *grpc_google_iam_credentials_create(
GPR_ASSERT(reserved == NULL);
GPR_ASSERT(token != NULL);
GPR_ASSERT(authority_selector != NULL);
- c = gpr_zalloc(sizeof(grpc_google_iam_credentials));
+ grpc_google_iam_credentials *c = gpr_zalloc(sizeof(*c));
c->base.type = GRPC_CALL_CREDENTIALS_TYPE_IAM;
c->base.vtable = &iam_vtable;
gpr_ref_init(&c->base.refcount, 1);
- c->iam_md = grpc_credentials_md_store_create(2);
- grpc_credentials_md_store_add_cstrings(
- c->iam_md, GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, token);
- grpc_credentials_md_store_add_cstrings(
- c->iam_md, GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, authority_selector);
+ grpc_mdelem md = grpc_mdelem_from_slices(
+ &exec_ctx,
+ grpc_slice_from_static_string(GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY),
+ grpc_slice_from_copied_string(token));
+ grpc_credentials_mdelem_array_add(&c->md_array, md);
+ GRPC_MDELEM_UNREF(&exec_ctx, md);
+ md = grpc_mdelem_from_slices(
+ &exec_ctx,
+ grpc_slice_from_static_string(GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY),
+ grpc_slice_from_copied_string(authority_selector));
+ grpc_credentials_mdelem_array_add(&c->md_array, md);
+ GRPC_MDELEM_UNREF(&exec_ctx, md);
+ grpc_exec_ctx_finish(&exec_ctx);
return &c->base;
}
diff --git a/src/core/lib/security/credentials/iam/iam_credentials.h b/src/core/lib/security/credentials/iam/iam_credentials.h
index fff3c6d2ca..5e3cf65bad 100644
--- a/src/core/lib/security/credentials/iam/iam_credentials.h
+++ b/src/core/lib/security/credentials/iam/iam_credentials.h
@@ -23,7 +23,7 @@
typedef struct {
grpc_call_credentials base;
- grpc_credentials_md_store *iam_md;
+ grpc_credentials_mdelem_array md_array;
} grpc_google_iam_credentials;
#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_IAM_IAM_CREDENTIALS_H */
diff --git a/src/core/lib/security/credentials/jwt/jwt_credentials.c b/src/core/lib/security/credentials/jwt/jwt_credentials.c
index 4357657def..02c82e99ba 100644
--- a/src/core/lib/security/credentials/jwt/jwt_credentials.c
+++ b/src/core/lib/security/credentials/jwt/jwt_credentials.c
@@ -29,10 +29,8 @@
static void jwt_reset_cache(grpc_exec_ctx *exec_ctx,
grpc_service_account_jwt_access_credentials *c) {
- if (c->cached.jwt_md != NULL) {
- grpc_credentials_md_store_unref(exec_ctx, c->cached.jwt_md);
- c->cached.jwt_md = NULL;
- }
+ GRPC_MDELEM_UNREF(exec_ctx, c->cached.jwt_md);
+ c->cached.jwt_md = GRPC_MDNULL;
if (c->cached.service_url != NULL) {
gpr_free(c->cached.service_url);
c->cached.service_url = NULL;
@@ -49,33 +47,34 @@ static void jwt_destruct(grpc_exec_ctx *exec_ctx,
gpr_mu_destroy(&c->cache_mu);
}
-static void jwt_get_request_metadata(grpc_exec_ctx *exec_ctx,
+static bool jwt_get_request_metadata(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *creds,
grpc_polling_entity *pollent,
grpc_auth_metadata_context context,
- grpc_credentials_metadata_cb cb,
- void *user_data) {
+ grpc_credentials_mdelem_array *md_array,
+ grpc_closure *on_request_metadata,
+ grpc_error **error) {
grpc_service_account_jwt_access_credentials *c =
(grpc_service_account_jwt_access_credentials *)creds;
gpr_timespec refresh_threshold = gpr_time_from_seconds(
GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN);
/* See if we can return a cached jwt. */
- grpc_credentials_md_store *jwt_md = NULL;
+ grpc_mdelem jwt_md = GRPC_MDNULL;
{
gpr_mu_lock(&c->cache_mu);
if (c->cached.service_url != NULL &&
strcmp(c->cached.service_url, context.service_url) == 0 &&
- c->cached.jwt_md != NULL &&
+ !GRPC_MDISNULL(c->cached.jwt_md) &&
(gpr_time_cmp(gpr_time_sub(c->cached.jwt_expiration,
gpr_now(GPR_CLOCK_REALTIME)),
refresh_threshold) > 0)) {
- jwt_md = grpc_credentials_md_store_ref(c->cached.jwt_md);
+ jwt_md = GRPC_MDELEM_REF(c->cached.jwt_md);
}
gpr_mu_unlock(&c->cache_mu);
}
- if (jwt_md == NULL) {
+ if (GRPC_MDISNULL(jwt_md)) {
char *jwt = NULL;
/* Generate a new jwt. */
gpr_mu_lock(&c->cache_mu);
@@ -89,27 +88,33 @@ static void jwt_get_request_metadata(grpc_exec_ctx *exec_ctx,
c->cached.jwt_expiration =
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c->jwt_lifetime);
c->cached.service_url = gpr_strdup(context.service_url);
- c->cached.jwt_md = grpc_credentials_md_store_create(1);
- grpc_credentials_md_store_add_cstrings(
- c->cached.jwt_md, GRPC_AUTHORIZATION_METADATA_KEY, md_value);
+ c->cached.jwt_md = grpc_mdelem_from_slices(
+ exec_ctx,
+ grpc_slice_from_static_string(GRPC_AUTHORIZATION_METADATA_KEY),
+ grpc_slice_from_copied_string(md_value));
gpr_free(md_value);
- jwt_md = grpc_credentials_md_store_ref(c->cached.jwt_md);
+ jwt_md = GRPC_MDELEM_REF(c->cached.jwt_md);
}
gpr_mu_unlock(&c->cache_mu);
}
- if (jwt_md != NULL) {
- cb(exec_ctx, user_data, jwt_md->entries, jwt_md->num_entries,
- GRPC_CREDENTIALS_OK, NULL);
- grpc_credentials_md_store_unref(exec_ctx, jwt_md);
+ if (!GRPC_MDISNULL(jwt_md)) {
+ grpc_credentials_mdelem_array_add(md_array, jwt_md);
+ GRPC_MDELEM_UNREF(exec_ctx, jwt_md);
} else {
- cb(exec_ctx, user_data, NULL, 0, GRPC_CREDENTIALS_ERROR,
- "Could not generate JWT.");
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Could not generate JWT.");
}
+ return true;
+}
+
+static void jwt_cancel_get_request_metadata(
+ grpc_exec_ctx *exec_ctx, grpc_call_credentials *c,
+ grpc_credentials_mdelem_array *md_array, grpc_error *error) {
+ GRPC_ERROR_UNREF(error);
}
-static grpc_call_credentials_vtable jwt_vtable = {jwt_destruct,
- jwt_get_request_metadata};
+static grpc_call_credentials_vtable jwt_vtable = {
+ jwt_destruct, jwt_get_request_metadata, jwt_cancel_get_request_metadata};
grpc_call_credentials *
grpc_service_account_jwt_access_credentials_create_from_auth_json_key(
diff --git a/src/core/lib/security/credentials/jwt/jwt_credentials.h b/src/core/lib/security/credentials/jwt/jwt_credentials.h
index 6e461f1715..07f4022669 100644
--- a/src/core/lib/security/credentials/jwt/jwt_credentials.h
+++ b/src/core/lib/security/credentials/jwt/jwt_credentials.h
@@ -29,7 +29,7 @@ typedef struct {
// the service_url for a more sophisticated one.
gpr_mu cache_mu;
struct {
- grpc_credentials_md_store *jwt_md;
+ grpc_mdelem jwt_md;
char *service_url;
gpr_timespec jwt_expiration;
} cached;
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
index 9de561b310..ffa941bb9e 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
@@ -107,7 +107,7 @@ static void oauth2_token_fetcher_destruct(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *creds) {
grpc_oauth2_token_fetcher_credentials *c =
(grpc_oauth2_token_fetcher_credentials *)creds;
- grpc_credentials_md_store_unref(exec_ctx, c->access_token_md);
+ GRPC_MDELEM_UNREF(exec_ctx, c->access_token_md);
gpr_mu_destroy(&c->mu);
grpc_httpcli_context_destroy(exec_ctx, &c->httpcli_context);
}
@@ -115,7 +115,7 @@ static void oauth2_token_fetcher_destruct(grpc_exec_ctx *exec_ctx,
grpc_credentials_status
grpc_oauth2_token_fetcher_credentials_parse_server_response(
grpc_exec_ctx *exec_ctx, const grpc_http_response *response,
- grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime) {
+ grpc_mdelem *token_md, gpr_timespec *token_lifetime) {
char *null_terminated_body = NULL;
char *new_access_token = NULL;
grpc_credentials_status status = GRPC_CREDENTIALS_OK;
@@ -184,17 +184,18 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response(
token_lifetime->tv_sec = strtol(expires_in->value, NULL, 10);
token_lifetime->tv_nsec = 0;
token_lifetime->clock_type = GPR_TIMESPAN;
- if (*token_md != NULL) grpc_credentials_md_store_unref(exec_ctx, *token_md);
- *token_md = grpc_credentials_md_store_create(1);
- grpc_credentials_md_store_add_cstrings(
- *token_md, GRPC_AUTHORIZATION_METADATA_KEY, new_access_token);
+ if (!GRPC_MDISNULL(*token_md)) GRPC_MDELEM_UNREF(exec_ctx, *token_md);
+ *token_md = grpc_mdelem_from_slices(
+ exec_ctx,
+ grpc_slice_from_static_string(GRPC_AUTHORIZATION_METADATA_KEY),
+ grpc_slice_from_copied_string(new_access_token));
status = GRPC_CREDENTIALS_OK;
}
end:
- if (status != GRPC_CREDENTIALS_OK && (*token_md != NULL)) {
- grpc_credentials_md_store_unref(exec_ctx, *token_md);
- *token_md = NULL;
+ if (status != GRPC_CREDENTIALS_OK && !GRPC_MDISNULL(*token_md)) {
+ GRPC_MDELEM_UNREF(exec_ctx, *token_md);
+ *token_md = GRPC_MDNULL;
}
if (null_terminated_body != NULL) gpr_free(null_terminated_body);
if (new_access_token != NULL) gpr_free(new_access_token);
@@ -205,63 +206,124 @@ end:
static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx,
void *user_data,
grpc_error *error) {
+ GRPC_LOG_IF_ERROR("oauth_fetch", GRPC_ERROR_REF(error));
grpc_credentials_metadata_request *r =
(grpc_credentials_metadata_request *)user_data;
grpc_oauth2_token_fetcher_credentials *c =
(grpc_oauth2_token_fetcher_credentials *)r->creds;
+ grpc_mdelem access_token_md = GRPC_MDNULL;
gpr_timespec token_lifetime;
- grpc_credentials_status status;
-
- GRPC_LOG_IF_ERROR("oauth_fetch", GRPC_ERROR_REF(error));
-
+ grpc_credentials_status status =
+ grpc_oauth2_token_fetcher_credentials_parse_server_response(
+ exec_ctx, &r->response, &access_token_md, &token_lifetime);
+ // Update cache and grab list of pending requests.
gpr_mu_lock(&c->mu);
- status = grpc_oauth2_token_fetcher_credentials_parse_server_response(
- exec_ctx, &r->response, &c->access_token_md, &token_lifetime);
- if (status == GRPC_CREDENTIALS_OK) {
- c->token_expiration =
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), token_lifetime);
- r->cb(exec_ctx, r->user_data, c->access_token_md->entries,
- c->access_token_md->num_entries, GRPC_CREDENTIALS_OK, NULL);
- } else {
- c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME);
- r->cb(exec_ctx, r->user_data, NULL, 0, status,
- "Error occured when fetching oauth2 token.");
- }
+ c->token_fetch_pending = false;
+ c->access_token_md = GRPC_MDELEM_REF(access_token_md);
+ c->token_expiration =
+ status == GRPC_CREDENTIALS_OK
+ ? gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), token_lifetime)
+ : gpr_inf_past(GPR_CLOCK_REALTIME);
+ grpc_oauth2_pending_get_request_metadata *pending_request =
+ c->pending_requests;
+ c->pending_requests = NULL;
gpr_mu_unlock(&c->mu);
+ // Invoke callbacks for all pending requests.
+ while (pending_request != NULL) {
+ if (status == GRPC_CREDENTIALS_OK) {
+ grpc_credentials_mdelem_array_add(pending_request->md_array,
+ access_token_md);
+ } else {
+ error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Error occured when fetching oauth2 token.", &error, 1);
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, pending_request->on_request_metadata, error);
+ grpc_oauth2_pending_get_request_metadata *prev = pending_request;
+ pending_request = pending_request->next;
+ gpr_free(prev);
+ }
+ GRPC_MDELEM_UNREF(exec_ctx, access_token_md);
+ grpc_call_credentials_unref(exec_ctx, r->creds);
grpc_credentials_metadata_request_destroy(exec_ctx, r);
}
-static void oauth2_token_fetcher_get_request_metadata(
+static bool oauth2_token_fetcher_get_request_metadata(
grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
grpc_polling_entity *pollent, grpc_auth_metadata_context context,
- grpc_credentials_metadata_cb cb, void *user_data) {
+ grpc_credentials_mdelem_array *md_array, grpc_closure *on_request_metadata,
+ grpc_error **error) {
grpc_oauth2_token_fetcher_credentials *c =
(grpc_oauth2_token_fetcher_credentials *)creds;
+ // Check if we can use the cached token.
gpr_timespec refresh_threshold = gpr_time_from_seconds(
GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN);
- grpc_credentials_md_store *cached_access_token_md = NULL;
- {
- gpr_mu_lock(&c->mu);
- if (c->access_token_md != NULL &&
- (gpr_time_cmp(
- gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_REALTIME)),
- refresh_threshold) > 0)) {
- cached_access_token_md =
- grpc_credentials_md_store_ref(c->access_token_md);
- }
+ grpc_mdelem cached_access_token_md = GRPC_MDNULL;
+ gpr_mu_lock(&c->mu);
+ if (!GRPC_MDISNULL(c->access_token_md) &&
+ (gpr_time_cmp(
+ gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_REALTIME)),
+ refresh_threshold) > 0)) {
+ cached_access_token_md = GRPC_MDELEM_REF(c->access_token_md);
+ }
+ if (!GRPC_MDISNULL(cached_access_token_md)) {
gpr_mu_unlock(&c->mu);
+ grpc_credentials_mdelem_array_add(md_array, cached_access_token_md);
+ GRPC_MDELEM_UNREF(exec_ctx, cached_access_token_md);
+ return true;
}
- if (cached_access_token_md != NULL) {
- cb(exec_ctx, user_data, cached_access_token_md->entries,
- cached_access_token_md->num_entries, GRPC_CREDENTIALS_OK, NULL);
- grpc_credentials_md_store_unref(exec_ctx, cached_access_token_md);
- } else {
- c->fetch_func(
- exec_ctx,
- grpc_credentials_metadata_request_create(creds, cb, user_data),
- &c->httpcli_context, pollent, on_oauth2_token_fetcher_http_response,
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), refresh_threshold));
+ // Couldn't get the token from the cache.
+ // Add request to c->pending_requests and start a new fetch if needed.
+ grpc_oauth2_pending_get_request_metadata *pending_request =
+ (grpc_oauth2_pending_get_request_metadata *)gpr_malloc(
+ sizeof(*pending_request));
+ pending_request->md_array = md_array;
+ pending_request->on_request_metadata = on_request_metadata;
+ pending_request->next = c->pending_requests;
+ c->pending_requests = pending_request;
+ bool start_fetch = false;
+ if (!c->token_fetch_pending) {
+ c->token_fetch_pending = true;
+ start_fetch = true;
+ }
+ gpr_mu_unlock(&c->mu);
+ if (start_fetch) {
+ grpc_call_credentials_ref(creds);
+ c->fetch_func(exec_ctx, grpc_credentials_metadata_request_create(creds),
+ &c->httpcli_context, pollent,
+ on_oauth2_token_fetcher_http_response,
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), refresh_threshold));
}
+ return false;
+}
+
+static void oauth2_token_fetcher_cancel_get_request_metadata(
+ grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
+ grpc_credentials_mdelem_array *md_array, grpc_error *error) {
+ grpc_oauth2_token_fetcher_credentials *c =
+ (grpc_oauth2_token_fetcher_credentials *)creds;
+ gpr_mu_lock(&c->mu);
+ grpc_oauth2_pending_get_request_metadata *prev = NULL;
+ grpc_oauth2_pending_get_request_metadata *pending_request =
+ c->pending_requests;
+ while (pending_request != NULL) {
+ if (pending_request->md_array == md_array) {
+ // Remove matching pending request from the list.
+ if (prev != NULL) {
+ prev->next = pending_request->next;
+ } else {
+ c->pending_requests = pending_request->next;
+ }
+ // Invoke the callback immediately with an error.
+ GRPC_CLOSURE_SCHED(exec_ctx, pending_request->on_request_metadata,
+ GRPC_ERROR_REF(error));
+ gpr_free(pending_request);
+ break;
+ }
+ prev = pending_request;
+ pending_request = pending_request->next;
+ }
+ gpr_mu_unlock(&c->mu);
+ GRPC_ERROR_UNREF(error);
}
static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c,
@@ -280,7 +342,8 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c,
//
static grpc_call_credentials_vtable compute_engine_vtable = {
- oauth2_token_fetcher_destruct, oauth2_token_fetcher_get_request_metadata};
+ oauth2_token_fetcher_destruct, oauth2_token_fetcher_get_request_metadata,
+ oauth2_token_fetcher_cancel_get_request_metadata};
static void compute_engine_fetch_oauth2(
grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req,
@@ -301,7 +364,6 @@ static void compute_engine_fetch_oauth2(
grpc_httpcli_get(
exec_ctx, httpcli_context, pollent, resource_quota, &request, deadline,
GRPC_CLOSURE_CREATE(response_cb, metadata_req, grpc_schedule_on_exec_ctx),
-
&metadata_req->response);
grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
}
@@ -331,7 +393,8 @@ static void refresh_token_destruct(grpc_exec_ctx *exec_ctx,
}
static grpc_call_credentials_vtable refresh_token_vtable = {
- refresh_token_destruct, oauth2_token_fetcher_get_request_metadata};
+ refresh_token_destruct, oauth2_token_fetcher_get_request_metadata,
+ oauth2_token_fetcher_cancel_get_request_metadata};
static void refresh_token_fetch_oauth2(
grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req,
@@ -416,26 +479,33 @@ grpc_call_credentials *grpc_google_refresh_token_credentials_create(
static void access_token_destruct(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *creds) {
grpc_access_token_credentials *c = (grpc_access_token_credentials *)creds;
- grpc_credentials_md_store_unref(exec_ctx, c->access_token_md);
+ GRPC_MDELEM_UNREF(exec_ctx, c->access_token_md);
}
-static void access_token_get_request_metadata(
+static bool access_token_get_request_metadata(
grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
grpc_polling_entity *pollent, grpc_auth_metadata_context context,
- grpc_credentials_metadata_cb cb, void *user_data) {
+ grpc_credentials_mdelem_array *md_array, grpc_closure *on_request_metadata,
+ grpc_error **error) {
grpc_access_token_credentials *c = (grpc_access_token_credentials *)creds;
- cb(exec_ctx, user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK,
- NULL);
+ grpc_credentials_mdelem_array_add(md_array, c->access_token_md);
+ return true;
+}
+
+static void access_token_cancel_get_request_metadata(
+ grpc_exec_ctx *exec_ctx, grpc_call_credentials *c,
+ grpc_credentials_mdelem_array *md_array, grpc_error *error) {
+ GRPC_ERROR_UNREF(error);
}
static grpc_call_credentials_vtable access_token_vtable = {
- access_token_destruct, access_token_get_request_metadata};
+ access_token_destruct, access_token_get_request_metadata,
+ access_token_cancel_get_request_metadata};
grpc_call_credentials *grpc_access_token_credentials_create(
const char *access_token, void *reserved) {
grpc_access_token_credentials *c =
gpr_zalloc(sizeof(grpc_access_token_credentials));
- char *token_md_value;
GRPC_API_TRACE(
"grpc_access_token_credentials_create(access_token=<redacted>, "
"reserved=%p)",
@@ -444,10 +514,13 @@ grpc_call_credentials *grpc_access_token_credentials_create(
c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2;
c->base.vtable = &access_token_vtable;
gpr_ref_init(&c->base.refcount, 1);
- c->access_token_md = grpc_credentials_md_store_create(1);
+ char *token_md_value;
gpr_asprintf(&token_md_value, "Bearer %s", access_token);
- grpc_credentials_md_store_add_cstrings(
- c->access_token_md, GRPC_AUTHORIZATION_METADATA_KEY, token_md_value);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ c->access_token_md = grpc_mdelem_from_slices(
+ &exec_ctx, grpc_slice_from_static_string(GRPC_AUTHORIZATION_METADATA_KEY),
+ grpc_slice_from_copied_string(token_md_value));
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_free(token_md_value);
return &c->base;
}
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
index 72093afe9e..9d041a20ea 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
@@ -58,11 +58,20 @@ typedef void (*grpc_fetch_oauth2_func)(grpc_exec_ctx *exec_ctx,
grpc_polling_entity *pollent,
grpc_iomgr_cb_func cb,
gpr_timespec deadline);
+
+typedef struct grpc_oauth2_pending_get_request_metadata {
+ grpc_credentials_mdelem_array *md_array;
+ grpc_closure *on_request_metadata;
+ struct grpc_oauth2_pending_get_request_metadata *next;
+} grpc_oauth2_pending_get_request_metadata;
+
typedef struct {
grpc_call_credentials base;
gpr_mu mu;
- grpc_credentials_md_store *access_token_md;
+ grpc_mdelem access_token_md;
gpr_timespec token_expiration;
+ bool token_fetch_pending;
+ grpc_oauth2_pending_get_request_metadata *pending_requests;
grpc_httpcli_context httpcli_context;
grpc_fetch_oauth2_func fetch_func;
} grpc_oauth2_token_fetcher_credentials;
@@ -76,7 +85,7 @@ typedef struct {
// Access token credentials.
typedef struct {
grpc_call_credentials base;
- grpc_credentials_md_store *access_token_md;
+ grpc_mdelem access_token_md;
} grpc_access_token_credentials;
// Private constructor for refresh token credentials from an already parsed
@@ -89,6 +98,6 @@ grpc_refresh_token_credentials_create_from_auth_refresh_token(
grpc_credentials_status
grpc_oauth2_token_fetcher_credentials_parse_server_response(
grpc_exec_ctx *exec_ctx, const struct grpc_http_response *response,
- grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime);
+ grpc_mdelem *token_md, gpr_timespec *token_lifetime);
#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_OAUTH2_OAUTH2_CREDENTIALS_H */
diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.c b/src/core/lib/security/credentials/plugin/plugin_credentials.c
index 96ebfb4d0d..73e0c23e0f 100644
--- a/src/core/lib/security/credentials/plugin/plugin_credentials.c
+++ b/src/core/lib/security/credentials/plugin/plugin_credentials.c
@@ -31,19 +31,28 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/validate_metadata.h"
-typedef struct {
- void *user_data;
- grpc_credentials_metadata_cb cb;
-} grpc_metadata_plugin_request;
-
static void plugin_destruct(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *creds) {
grpc_plugin_credentials *c = (grpc_plugin_credentials *)creds;
+ gpr_mu_destroy(&c->mu);
if (c->plugin.state != NULL && c->plugin.destroy != NULL) {
c->plugin.destroy(c->plugin.state);
}
}
+static void pending_request_remove_locked(
+ grpc_plugin_credentials *c,
+ grpc_plugin_credentials_pending_request *pending_request) {
+ if (pending_request->prev == NULL) {
+ c->pending_requests = pending_request->next;
+ } else {
+ pending_request->prev->next = pending_request->next;
+ }
+ if (pending_request->next != NULL) {
+ pending_request->next->prev = pending_request->prev;
+ }
+}
+
static void plugin_md_request_metadata_ready(void *request,
const grpc_metadata *md,
size_t num_md,
@@ -53,76 +62,117 @@ static void plugin_md_request_metadata_ready(void *request,
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(
GRPC_EXEC_CTX_FLAG_IS_FINISHED | GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP,
NULL, NULL);
- grpc_metadata_plugin_request *r = (grpc_metadata_plugin_request *)request;
- if (status != GRPC_STATUS_OK) {
- if (error_details != NULL) {
- gpr_log(GPR_ERROR, "Getting metadata from plugin failed with error: %s",
- error_details);
- }
- r->cb(&exec_ctx, r->user_data, NULL, 0, GRPC_CREDENTIALS_ERROR,
- error_details);
- } else {
- size_t i;
- bool seen_illegal_header = false;
- grpc_credentials_md *md_array = NULL;
- for (i = 0; i < num_md; i++) {
- if (!GRPC_LOG_IF_ERROR("validate_metadata_from_plugin",
- grpc_validate_header_key_is_legal(md[i].key))) {
- seen_illegal_header = true;
- break;
- } else if (!grpc_is_binary_header(md[i].key) &&
- !GRPC_LOG_IF_ERROR(
- "validate_metadata_from_plugin",
- grpc_validate_header_nonbin_value_is_legal(md[i].value))) {
- gpr_log(GPR_ERROR, "Plugin added invalid metadata value.");
- seen_illegal_header = true;
- break;
+ grpc_plugin_credentials_pending_request *r =
+ (grpc_plugin_credentials_pending_request *)request;
+ // Check if the request has been cancelled.
+ // If not, remove it from the pending list, so that it cannot be
+ // cancelled out from under us.
+ gpr_mu_lock(&r->creds->mu);
+ if (!r->cancelled) pending_request_remove_locked(r->creds, r);
+ gpr_mu_unlock(&r->creds->mu);
+ grpc_call_credentials_unref(&exec_ctx, &r->creds->base);
+ // If it has not been cancelled, process it.
+ if (!r->cancelled) {
+ if (status != GRPC_STATUS_OK) {
+ char *msg;
+ gpr_asprintf(&msg, "Getting metadata from plugin failed with error: %s",
+ error_details);
+ GRPC_CLOSURE_SCHED(&exec_ctx, r->on_request_metadata,
+ GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg));
+ gpr_free(msg);
+ } else {
+ bool seen_illegal_header = false;
+ for (size_t i = 0; i < num_md; ++i) {
+ if (!GRPC_LOG_IF_ERROR("validate_metadata_from_plugin",
+ grpc_validate_header_key_is_legal(md[i].key))) {
+ seen_illegal_header = true;
+ break;
+ } else if (!grpc_is_binary_header(md[i].key) &&
+ !GRPC_LOG_IF_ERROR(
+ "validate_metadata_from_plugin",
+ grpc_validate_header_nonbin_value_is_legal(
+ md[i].value))) {
+ gpr_log(GPR_ERROR, "Plugin added invalid metadata value.");
+ seen_illegal_header = true;
+ break;
+ }
}
- }
- if (seen_illegal_header) {
- r->cb(&exec_ctx, r->user_data, NULL, 0, GRPC_CREDENTIALS_ERROR,
- "Illegal metadata");
- } else if (num_md > 0) {
- md_array = gpr_malloc(num_md * sizeof(grpc_credentials_md));
- for (i = 0; i < num_md; i++) {
- md_array[i].key = grpc_slice_ref_internal(md[i].key);
- md_array[i].value = grpc_slice_ref_internal(md[i].value);
+ if (seen_illegal_header) {
+ GRPC_CLOSURE_SCHED(
+ &exec_ctx, r->on_request_metadata,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Illegal metadata"));
+ } else {
+ for (size_t i = 0; i < num_md; ++i) {
+ grpc_mdelem mdelem = grpc_mdelem_from_slices(
+ &exec_ctx, grpc_slice_ref_internal(md[i].key),
+ grpc_slice_ref_internal(md[i].value));
+ grpc_credentials_mdelem_array_add(r->md_array, mdelem);
+ GRPC_MDELEM_UNREF(&exec_ctx, mdelem);
+ }
+ GRPC_CLOSURE_SCHED(&exec_ctx, r->on_request_metadata, GRPC_ERROR_NONE);
}
- r->cb(&exec_ctx, r->user_data, md_array, num_md, GRPC_CREDENTIALS_OK,
- NULL);
- for (i = 0; i < num_md; i++) {
- grpc_slice_unref_internal(&exec_ctx, md_array[i].key);
- grpc_slice_unref_internal(&exec_ctx, md_array[i].value);
- }
- gpr_free(md_array);
- } else if (num_md == 0) {
- r->cb(&exec_ctx, r->user_data, NULL, 0, GRPC_CREDENTIALS_OK, NULL);
}
}
gpr_free(r);
grpc_exec_ctx_finish(&exec_ctx);
}
-static void plugin_get_request_metadata(grpc_exec_ctx *exec_ctx,
+static bool plugin_get_request_metadata(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *creds,
grpc_polling_entity *pollent,
grpc_auth_metadata_context context,
- grpc_credentials_metadata_cb cb,
- void *user_data) {
+ grpc_credentials_mdelem_array *md_array,
+ grpc_closure *on_request_metadata,
+ grpc_error **error) {
grpc_plugin_credentials *c = (grpc_plugin_credentials *)creds;
if (c->plugin.get_metadata != NULL) {
- grpc_metadata_plugin_request *request = gpr_zalloc(sizeof(*request));
- request->user_data = user_data;
- request->cb = cb;
+ // Create pending_request object.
+ grpc_plugin_credentials_pending_request *pending_request =
+ (grpc_plugin_credentials_pending_request *)gpr_zalloc(
+ sizeof(*pending_request));
+ pending_request->creds = c;
+ pending_request->md_array = md_array;
+ pending_request->on_request_metadata = on_request_metadata;
+ // Add it to the pending list.
+ gpr_mu_lock(&c->mu);
+ if (c->pending_requests != NULL) {
+ c->pending_requests->prev = pending_request;
+ }
+ pending_request->next = c->pending_requests;
+ c->pending_requests = pending_request;
+ gpr_mu_unlock(&c->mu);
+ // Invoke the plugin. The callback holds a ref to us.
+ grpc_call_credentials_ref(creds);
c->plugin.get_metadata(c->plugin.state, context,
- plugin_md_request_metadata_ready, request);
- } else {
- cb(exec_ctx, user_data, NULL, 0, GRPC_CREDENTIALS_OK, NULL);
+ plugin_md_request_metadata_ready, pending_request);
+ return false;
+ }
+ return true;
+}
+
+static void plugin_cancel_get_request_metadata(
+ grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
+ grpc_credentials_mdelem_array *md_array, grpc_error *error) {
+ grpc_plugin_credentials *c = (grpc_plugin_credentials *)creds;
+ gpr_mu_lock(&c->mu);
+ for (grpc_plugin_credentials_pending_request *pending_request =
+ c->pending_requests;
+ pending_request != NULL; pending_request = pending_request->next) {
+ if (pending_request->md_array == md_array) {
+ pending_request->cancelled = true;
+ GRPC_CLOSURE_SCHED(exec_ctx, pending_request->on_request_metadata,
+ GRPC_ERROR_REF(error));
+ pending_request_remove_locked(c, pending_request);
+ break;
+ }
}
+ gpr_mu_unlock(&c->mu);
+ GRPC_ERROR_UNREF(error);
}
static grpc_call_credentials_vtable plugin_vtable = {
- plugin_destruct, plugin_get_request_metadata};
+ plugin_destruct, plugin_get_request_metadata,
+ plugin_cancel_get_request_metadata};
grpc_call_credentials *grpc_metadata_credentials_create_from_plugin(
grpc_metadata_credentials_plugin plugin, void *reserved) {
@@ -134,5 +184,6 @@ grpc_call_credentials *grpc_metadata_credentials_create_from_plugin(
c->base.vtable = &plugin_vtable;
gpr_ref_init(&c->base.refcount, 1);
c->plugin = plugin;
+ gpr_mu_init(&c->mu);
return &c->base;
}
diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.h b/src/core/lib/security/credentials/plugin/plugin_credentials.h
index ba3dd76859..57266d589a 100644
--- a/src/core/lib/security/credentials/plugin/plugin_credentials.h
+++ b/src/core/lib/security/credentials/plugin/plugin_credentials.h
@@ -21,10 +21,22 @@
#include "src/core/lib/security/credentials/credentials.h"
-typedef struct {
+struct grpc_plugin_credentials;
+
+typedef struct grpc_plugin_credentials_pending_request {
+ bool cancelled;
+ struct grpc_plugin_credentials *creds;
+ grpc_credentials_mdelem_array *md_array;
+ grpc_closure *on_request_metadata;
+ struct grpc_plugin_credentials_pending_request *prev;
+ struct grpc_plugin_credentials_pending_request *next;
+} grpc_plugin_credentials_pending_request;
+
+typedef struct grpc_plugin_credentials {
grpc_call_credentials base;
grpc_metadata_credentials_plugin plugin;
- grpc_credentials_md_store *plugin_md;
+ gpr_mu mu;
+ grpc_plugin_credentials_pending_request *pending_requests;
} grpc_plugin_credentials;
#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_PLUGIN_PLUGIN_CREDENTIALS_H */
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index 50a51b31cd..531a88434f 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -51,8 +51,15 @@ typedef struct {
grpc_polling_entity *pollent;
gpr_atm security_context_set;
gpr_mu security_context_mu;
+ grpc_credentials_mdelem_array md_array;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
grpc_auth_metadata_context auth_md_context;
+ grpc_closure closure;
+ // Either 0 (no cancellation and no async operation in flight),
+ // a grpc_closure* (if the lowest bit is 0),
+ // or a grpc_error* (if the lowest bit is 1).
+ gpr_atm cancellation_state;
+ grpc_closure cancel_closure;
} call_data;
/* We can have a per-channel credentials. */
@@ -61,6 +68,43 @@ typedef struct {
grpc_auth_context *auth_context;
} channel_data;
+static void decode_cancel_state(gpr_atm cancel_state, grpc_closure **func,
+ grpc_error **error) {
+ // If the lowest bit is 1, the value is a grpc_error*.
+ // Otherwise, if non-zdero, the value is a grpc_closure*.
+ if (cancel_state & 1) {
+ *error = (grpc_error *)(cancel_state & ~(gpr_atm)1);
+ } else if (cancel_state != 0) {
+ *func = (grpc_closure *)cancel_state;
+ }
+}
+
+static gpr_atm encode_cancel_state_error(grpc_error *error) {
+ // Set the lowest bit to 1 to indicate that it's an error.
+ return (gpr_atm)1 | (gpr_atm)error;
+}
+
+// Returns an error if the call has been cancelled. Otherwise, sets the
+// cancellation function to be called upon cancellation.
+static grpc_error *set_cancel_func(grpc_call_element *elem,
+ grpc_iomgr_cb_func func) {
+ call_data *calld = (call_data *)elem->call_data;
+ // Decode original state.
+ gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state);
+ grpc_error *original_error = GRPC_ERROR_NONE;
+ grpc_closure *original_func = NULL;
+ decode_cancel_state(original_state, &original_func, &original_error);
+ // If error is set, return it.
+ if (original_error != GRPC_ERROR_NONE) return GRPC_ERROR_REF(original_error);
+ // Otherwise, store func.
+ GRPC_CLOSURE_INIT(&calld->cancel_closure, func, elem,
+ grpc_schedule_on_exec_ctx);
+ GPR_ASSERT(((gpr_atm)&calld->cancel_closure & (gpr_atm)1) == 0);
+ gpr_atm_rel_store(&calld->cancellation_state,
+ (gpr_atm)&calld->cancel_closure);
+ return GRPC_ERROR_NONE;
+}
+
static void reset_auth_metadata_context(
grpc_auth_metadata_context *auth_md_context) {
if (auth_md_context->service_url != NULL) {
@@ -86,41 +130,29 @@ static void add_error(grpc_error **combined, grpc_error *error) {
*combined = grpc_error_add_child(*combined, error);
}
-static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_credentials_md *md_elems,
- size_t num_md,
- grpc_credentials_status status,
- const char *error_details) {
- grpc_transport_stream_op_batch *batch =
- (grpc_transport_stream_op_batch *)user_data;
+static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *input_error) {
+ grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg;
grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
reset_auth_metadata_context(&calld->auth_md_context);
- grpc_error *error = GRPC_ERROR_NONE;
- if (status != GRPC_CREDENTIALS_OK) {
- error = grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_COPIED_STRING(
- error_details != NULL && strlen(error_details) > 0
- ? error_details
- : "Credentials failed to get metadata."),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED);
- } else {
- GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
+ grpc_error *error = GRPC_ERROR_REF(input_error);
+ if (error == GRPC_ERROR_NONE) {
+ GPR_ASSERT(calld->md_array.size <= MAX_CREDENTIALS_METADATA_COUNT);
GPR_ASSERT(batch->send_initial_metadata);
grpc_metadata_batch *mdb =
batch->payload->send_initial_metadata.send_initial_metadata;
- for (size_t i = 0; i < num_md; i++) {
- add_error(&error,
- grpc_metadata_batch_add_tail(
- exec_ctx, mdb, &calld->md_links[i],
- grpc_mdelem_from_slices(
- exec_ctx, grpc_slice_ref_internal(md_elems[i].key),
- grpc_slice_ref_internal(md_elems[i].value))));
+ for (size_t i = 0; i < calld->md_array.size; ++i) {
+ add_error(&error, grpc_metadata_batch_add_tail(
+ exec_ctx, mdb, &calld->md_links[i],
+ GRPC_MDELEM_REF(calld->md_array.md[i])));
}
}
if (error == GRPC_ERROR_NONE) {
grpc_call_next_op(exec_ctx, elem, batch);
} else {
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAUTHENTICATED);
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error);
}
}
@@ -155,6 +187,14 @@ void build_auth_metadata_context(grpc_security_connector *sc,
gpr_free(host);
}
+static void cancel_get_request_metadata(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
+ grpc_call_credentials_cancel_get_request_metadata(
+ exec_ctx, calld->creds, &calld->md_array, GRPC_ERROR_REF(error));
+}
+
static void send_security_metadata(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op_batch *batch) {
@@ -193,20 +233,33 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
build_auth_metadata_context(&chand->security_connector->base,
chand->auth_context, calld);
+
+ grpc_error *cancel_error = set_cancel_func(elem, cancel_get_request_metadata);
+ if (cancel_error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
+ cancel_error);
+ return;
+ }
GPR_ASSERT(calld->pollent != NULL);
- grpc_call_credentials_get_request_metadata(
- exec_ctx, calld->creds, calld->pollent, calld->auth_md_context,
- on_credentials_metadata, batch);
+ GRPC_CLOSURE_INIT(&calld->closure, on_credentials_metadata, batch,
+ grpc_schedule_on_exec_ctx);
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_call_credentials_get_request_metadata(
+ exec_ctx, calld->creds, calld->pollent, calld->auth_md_context,
+ &calld->md_array, &calld->closure, &error)) {
+ // Synchronous return; invoke on_credentials_metadata() directly.
+ on_credentials_metadata(exec_ctx, batch, error);
+ GRPC_ERROR_UNREF(error);
+ }
}
-static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_security_status status) {
- grpc_transport_stream_op_batch *batch =
- (grpc_transport_stream_op_batch *)user_data;
+static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg;
grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
- if (status == GRPC_SECURITY_OK) {
+ if (error == GRPC_ERROR_NONE) {
send_security_metadata(exec_ctx, elem, batch);
} else {
char *error_msg;
@@ -223,6 +276,16 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data,
}
}
+static void cancel_check_call_host(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
+ channel_data *chand = (channel_data *)elem->channel_data;
+ grpc_channel_security_connector_cancel_check_call_host(
+ exec_ctx, chand->security_connector, &calld->closure,
+ GRPC_ERROR_REF(error));
+}
+
static void auth_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *batch) {
@@ -232,7 +295,32 @@ static void auth_start_transport_stream_op_batch(
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (!batch->cancel_stream) {
+ if (batch->cancel_stream) {
+ while (true) {
+ // Decode the original cancellation state.
+ gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state);
+ grpc_error *cancel_error = GRPC_ERROR_NONE;
+ grpc_closure *func = NULL;
+ decode_cancel_state(original_state, &func, &cancel_error);
+ // If we had already set a cancellation error, there's nothing
+ // more to do.
+ if (cancel_error != GRPC_ERROR_NONE) break;
+ // If there's a cancel func, call it.
+ // Note that even if the cancel func has been changed by some
+ // other thread between when we decoded it and now, it will just
+ // be a no-op.
+ cancel_error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
+ if (func != NULL) {
+ GRPC_CLOSURE_SCHED(exec_ctx, func, GRPC_ERROR_REF(cancel_error));
+ }
+ // Encode the new error into cancellation state.
+ if (gpr_atm_full_cas(&calld->cancellation_state, original_state,
+ encode_cancel_state_error(cancel_error))) {
+ break; // Success.
+ }
+ // The cas failed, so try again.
+ }
+ } else {
/* double checked lock over security context to ensure it's set once */
if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
gpr_mu_lock(&calld->security_context_mu);
@@ -277,12 +365,26 @@ static void auth_start_transport_stream_op_batch(
}
}
if (calld->have_host) {
- char *call_host = grpc_slice_to_c_string(calld->host);
- batch->handler_private.extra_arg = elem;
- grpc_channel_security_connector_check_call_host(
- exec_ctx, chand->security_connector, call_host, chand->auth_context,
- on_host_checked, batch);
- gpr_free(call_host);
+ grpc_error *cancel_error = set_cancel_func(elem, cancel_check_call_host);
+ if (cancel_error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
+ cancel_error);
+ } else {
+ char *call_host = grpc_slice_to_c_string(calld->host);
+ batch->handler_private.extra_arg = elem;
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_channel_security_connector_check_call_host(
+ exec_ctx, chand->security_connector, call_host,
+ chand->auth_context,
+ GRPC_CLOSURE_INIT(&calld->closure, on_host_checked, batch,
+ grpc_schedule_on_exec_ctx),
+ &error)) {
+ // Synchronous return; invoke on_host_checked() directly.
+ on_host_checked(exec_ctx, batch, error);
+ GRPC_ERROR_UNREF(error);
+ }
+ gpr_free(call_host);
+ }
GPR_TIMER_END("auth_start_transport_stream_op_batch", 0);
return; /* early exit */
}
@@ -315,6 +417,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
grpc_closure *ignored) {
call_data *calld = elem->call_data;
+ grpc_credentials_mdelem_array_destroy(exec_ctx, &calld->md_array);
grpc_call_credentials_unref(exec_ctx, calld->creds);
if (calld->have_host) {
grpc_slice_unref_internal(exec_ctx, calld->host);
@@ -324,6 +427,11 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
reset_auth_metadata_context(&calld->auth_md_context);
gpr_mu_destroy(&calld->security_context_mu);
+ gpr_atm cancel_state = gpr_atm_acq_load(&calld->cancellation_state);
+ grpc_error *cancel_error = GRPC_ERROR_NONE;
+ grpc_closure *cancel_func = NULL;
+ decode_cancel_state(cancel_state, &cancel_func, &cancel_error);
+ GRPC_ERROR_UNREF(cancel_error);
}
/* Constructor for channel_data */
diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c
index 6788126769..a7568b995f 100644
--- a/src/core/lib/security/transport/security_connector.c
+++ b/src/core/lib/security/transport/security_connector.c
@@ -136,15 +136,27 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
}
}
-void grpc_channel_security_connector_check_call_host(
+bool grpc_channel_security_connector_check_call_host(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
const char *host, grpc_auth_context *auth_context,
- grpc_security_call_host_check_cb cb, void *user_data) {
+ grpc_closure *on_call_host_checked, grpc_error **error) {
if (sc == NULL || sc->check_call_host == NULL) {
- cb(exec_ctx, user_data, GRPC_SECURITY_ERROR);
- } else {
- sc->check_call_host(exec_ctx, sc, host, auth_context, cb, user_data);
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "cannot check call host -- no security connector");
+ return true;
}
+ return sc->check_call_host(exec_ctx, sc, host, auth_context,
+ on_call_host_checked, error);
+}
+
+void grpc_channel_security_connector_cancel_check_call_host(
+ grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
+ grpc_closure *on_call_host_checked, grpc_error *error) {
+ if (sc == NULL || sc->cancel_check_call_host == NULL) {
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
+ sc->cancel_check_call_host(exec_ctx, sc, on_call_host_checked, error);
}
#ifndef NDEBUG
@@ -368,13 +380,19 @@ static void fake_server_check_peer(grpc_exec_ctx *exec_ctx,
fake_check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked);
}
-static void fake_channel_check_call_host(grpc_exec_ctx *exec_ctx,
+static bool fake_channel_check_call_host(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc,
const char *host,
grpc_auth_context *auth_context,
- grpc_security_call_host_check_cb cb,
- void *user_data) {
- cb(exec_ctx, user_data, GRPC_SECURITY_OK);
+ grpc_closure *on_call_host_checked,
+ grpc_error **error) {
+ return true;
+}
+
+static void fake_channel_cancel_check_call_host(
+ grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
+ grpc_closure *on_call_host_checked, grpc_error *error) {
+ GRPC_ERROR_UNREF(error);
}
static void fake_channel_add_handshakers(
@@ -413,6 +431,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
c->base.request_metadata_creds =
grpc_call_credentials_ref(request_metadata_creds);
c->base.check_call_host = fake_channel_check_call_host;
+ c->base.cancel_check_call_host = fake_channel_cancel_check_call_host;
c->base.add_handshakers = fake_channel_add_handshakers;
c->target = gpr_strdup(target);
const char *expected_targets = grpc_fake_transport_get_expected_targets(args);
@@ -663,26 +682,35 @@ void tsi_shallow_peer_destruct(tsi_peer *peer) {
if (peer->properties != NULL) gpr_free(peer->properties);
}
-static void ssl_channel_check_call_host(grpc_exec_ctx *exec_ctx,
+static bool ssl_channel_check_call_host(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc,
const char *host,
grpc_auth_context *auth_context,
- grpc_security_call_host_check_cb cb,
- void *user_data) {
+ grpc_closure *on_call_host_checked,
+ grpc_error **error) {
grpc_ssl_channel_security_connector *c =
(grpc_ssl_channel_security_connector *)sc;
grpc_security_status status = GRPC_SECURITY_ERROR;
tsi_peer peer = tsi_shallow_peer_from_ssl_auth_context(auth_context);
if (ssl_host_matches_name(&peer, host)) status = GRPC_SECURITY_OK;
-
/* If the target name was overridden, then the original target_name was
'checked' transitively during the previous peer check at the end of the
handshake. */
if (c->overridden_target_name != NULL && strcmp(host, c->target_name) == 0) {
status = GRPC_SECURITY_OK;
}
- cb(exec_ctx, user_data, status);
+ if (status != GRPC_SECURITY_OK) {
+ *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "call host does not match SSL server name");
+ }
tsi_shallow_peer_destruct(&peer);
+ return true;
+}
+
+static void ssl_channel_cancel_check_call_host(
+ grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
+ grpc_closure *on_call_host_checked, grpc_error *error) {
+ GRPC_ERROR_UNREF(error);
}
static grpc_security_connector_vtable ssl_channel_vtable = {
@@ -811,6 +839,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
c->base.request_metadata_creds =
grpc_call_credentials_ref(request_metadata_creds);
c->base.check_call_host = ssl_channel_check_call_host;
+ c->base.cancel_check_call_host = ssl_channel_cancel_check_call_host;
c->base.add_handshakers = ssl_channel_add_handshakers;
gpr_split_host_port(target_name, &c->target_name, &port);
gpr_free(port);
diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h
index 1c0fe40045..4f9b63ad20 100644
--- a/src/core/lib/security/transport/security_connector.h
+++ b/src/core/lib/security/transport/security_connector.h
@@ -117,27 +117,38 @@ grpc_security_connector *grpc_security_connector_find_in_args(
typedef struct grpc_channel_security_connector grpc_channel_security_connector;
-typedef void (*grpc_security_call_host_check_cb)(grpc_exec_ctx *exec_ctx,
- void *user_data,
- grpc_security_status status);
-
struct grpc_channel_security_connector {
grpc_security_connector base;
grpc_call_credentials *request_metadata_creds;
- void (*check_call_host)(grpc_exec_ctx *exec_ctx,
+ bool (*check_call_host)(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc, const char *host,
grpc_auth_context *auth_context,
- grpc_security_call_host_check_cb cb, void *user_data);
+ grpc_closure *on_call_host_checked,
+ grpc_error **error);
+ void (*cancel_check_call_host)(grpc_exec_ctx *exec_ctx,
+ grpc_channel_security_connector *sc,
+ grpc_closure *on_call_host_checked,
+ grpc_error *error);
void (*add_handshakers)(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc,
grpc_handshake_manager *handshake_mgr);
};
-/* Checks that the host that will be set for a call is acceptable. */
-void grpc_channel_security_connector_check_call_host(
+/// Checks that the host that will be set for a call is acceptable.
+/// Returns true if completed synchronously, in which case \a error will
+/// be set to indicate the result. Otherwise, \a on_call_host_checked
+/// will be invoked when complete.
+bool grpc_channel_security_connector_check_call_host(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
const char *host, grpc_auth_context *auth_context,
- grpc_security_call_host_check_cb cb, void *user_data);
+ grpc_closure *on_call_host_checked, grpc_error **error);
+
+/// Cancels a pending asychronous call to
+/// grpc_channel_security_connector_check_call_host() with
+/// \a on_call_host_checked as its callback.
+void grpc_channel_security_connector_cancel_check_call_host(
+ grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
+ grpc_closure *on_call_host_checked, grpc_error *error);
/* Registers handshakers with \a handshake_mgr. */
void grpc_channel_security_connector_add_handshakers(
diff --git a/src/node/test/credentials_test.js b/src/node/test/credentials_test.js
index 3688f03512..0ff838e7d0 100644
--- a/src/node/test/credentials_test.js
+++ b/src/node/test/credentials_test.js
@@ -319,7 +319,9 @@ describe('client credentials', function() {
client_options);
client.unary({}, function(err, data) {
assert(err);
- assert.strictEqual(err.message, 'Authentication error');
+ assert.strictEqual(err.message,
+ 'Getting metadata from plugin failed with error: ' +
+ 'Authentication error');
assert.strictEqual(err.code, grpc.status.UNAUTHENTICATED);
done();
});
@@ -367,7 +369,9 @@ describe('client credentials', function() {
client_options);
client.unary({}, function(err, data) {
assert(err);
- assert.strictEqual(err.message, 'Authentication failure');
+ assert.strictEqual(err.message,
+ 'Getting metadata from plugin failed with error: ' +
+ 'Authentication failure');
done();
});
});
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index c2543dc388..42cff4041b 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -477,10 +477,31 @@ describe 'ClientStub' do
/Header values must be of type string or array/)
end
+ def run_server_streamer_against_client_with_unmarshal_error(
+ expected_input, replys)
+ wakey_thread do |notifier|
+ c = expect_server_to_be_invoked(notifier)
+ expect(c.remote_read).to eq(expected_input)
+ begin
+ replys.each { |r| c.remote_send(r) }
+ rescue GRPC::Core::CallError
+ # An attempt to write to the client might fail. This is ok
+ # because the client call is expected to fail when
+ # unmarshalling the first response, and to cancel the call,
+ # and there is a race as for when the server-side call will
+ # start to fail.
+ p 'remote_send failed (allowed because call expected to cancel)'
+ ensure
+ c.send_status(OK, 'OK', true)
+ end
+ end
+ end
+
it 'the call terminates when there is an unmarshalling error' do
server_port = create_test_server
host = "localhost:#{server_port}"
- th = run_server_streamer(@sent_msg, @replys, @pass)
+ th = run_server_streamer_against_client_with_unmarshal_error(
+ @sent_msg, @replys)
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }
diff --git a/test/core/end2end/end2end_nosec_tests.c b/test/core/end2end/end2end_nosec_tests.c
index ae1db54f1a..6a061a4e2d 100644
--- a/test/core/end2end/end2end_nosec_tests.c
+++ b/test/core/end2end/end2end_nosec_tests.c
@@ -106,6 +106,8 @@ extern void ping(grpc_end2end_test_config config);
extern void ping_pre_init(void);
extern void ping_pong_streaming(grpc_end2end_test_config config);
extern void ping_pong_streaming_pre_init(void);
+extern void proxy_auth(grpc_end2end_test_config config);
+extern void proxy_auth_pre_init(void);
extern void registered_call(grpc_end2end_test_config config);
extern void registered_call_pre_init(void);
extern void request_with_flags(grpc_end2end_test_config config);
@@ -181,6 +183,7 @@ void grpc_end2end_tests_pre_init(void) {
payload_pre_init();
ping_pre_init();
ping_pong_streaming_pre_init();
+ proxy_auth_pre_init();
registered_call_pre_init();
request_with_flags_pre_init();
request_with_payload_pre_init();
@@ -244,6 +247,7 @@ void grpc_end2end_tests(int argc, char **argv,
payload(config);
ping(config);
ping_pong_streaming(config);
+ proxy_auth(config);
registered_call(config);
request_with_flags(config);
request_with_payload(config);
@@ -416,6 +420,10 @@ void grpc_end2end_tests(int argc, char **argv,
ping_pong_streaming(config);
continue;
}
+ if (0 == strcmp("proxy_auth", argv[i])) {
+ proxy_auth(config);
+ continue;
+ }
if (0 == strcmp("registered_call", argv[i])) {
registered_call(config);
continue;
diff --git a/test/core/end2end/end2end_tests.c b/test/core/end2end/end2end_tests.c
index d18dd9c7b6..3fc7c3fb6c 100644
--- a/test/core/end2end/end2end_tests.c
+++ b/test/core/end2end/end2end_tests.c
@@ -108,6 +108,8 @@ extern void ping(grpc_end2end_test_config config);
extern void ping_pre_init(void);
extern void ping_pong_streaming(grpc_end2end_test_config config);
extern void ping_pong_streaming_pre_init(void);
+extern void proxy_auth(grpc_end2end_test_config config);
+extern void proxy_auth_pre_init(void);
extern void registered_call(grpc_end2end_test_config config);
extern void registered_call_pre_init(void);
extern void request_with_flags(grpc_end2end_test_config config);
@@ -184,6 +186,7 @@ void grpc_end2end_tests_pre_init(void) {
payload_pre_init();
ping_pre_init();
ping_pong_streaming_pre_init();
+ proxy_auth_pre_init();
registered_call_pre_init();
request_with_flags_pre_init();
request_with_payload_pre_init();
@@ -248,6 +251,7 @@ void grpc_end2end_tests(int argc, char **argv,
payload(config);
ping(config);
ping_pong_streaming(config);
+ proxy_auth(config);
registered_call(config);
request_with_flags(config);
request_with_payload(config);
@@ -424,6 +428,10 @@ void grpc_end2end_tests(int argc, char **argv,
ping_pong_streaming(config);
continue;
}
+ if (0 == strcmp("proxy_auth", argv[i])) {
+ proxy_auth(config);
+ continue;
+ }
if (0 == strcmp("registered_call", argv[i])) {
registered_call(config);
continue;
diff --git a/test/core/end2end/fixtures/h2_http_proxy.c b/test/core/end2end/fixtures/h2_http_proxy.c
index f8c88e5953..6145892365 100644
--- a/test/core/end2end/fixtures/h2_http_proxy.c
+++ b/test/core/end2end/fixtures/h2_http_proxy.c
@@ -47,11 +47,13 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack(
grpc_channel_args *client_args, grpc_channel_args *server_args) {
grpc_end2end_test_fixture f;
memset(&f, 0, sizeof(f));
-
fullstack_fixture_data *ffd = gpr_malloc(sizeof(fullstack_fixture_data));
const int server_port = grpc_pick_unused_port_or_die();
gpr_join_host_port(&ffd->server_addr, "localhost", server_port);
- ffd->proxy = grpc_end2end_http_proxy_create();
+
+ /* Passing client_args to proxy_create for the case of checking for proxy auth
+ */
+ ffd->proxy = grpc_end2end_http_proxy_create(client_args);
f.fixture_data = ffd;
f.cq = grpc_completion_queue_create_for_next(NULL);
@@ -64,8 +66,17 @@ void chttp2_init_client_fullstack(grpc_end2end_test_fixture *f,
grpc_channel_args *client_args) {
fullstack_fixture_data *ffd = f->fixture_data;
char *proxy_uri;
- gpr_asprintf(&proxy_uri, "http://%s",
- grpc_end2end_http_proxy_get_proxy_name(ffd->proxy));
+
+ /* If testing for proxy auth, add credentials to proxy uri */
+ const grpc_arg *proxy_auth_arg =
+ grpc_channel_args_find(client_args, GRPC_ARG_HTTP_PROXY_AUTH_CREDS);
+ if (proxy_auth_arg == NULL || proxy_auth_arg->type != GRPC_ARG_STRING) {
+ gpr_asprintf(&proxy_uri, "http://%s",
+ grpc_end2end_http_proxy_get_proxy_name(ffd->proxy));
+ } else {
+ gpr_asprintf(&proxy_uri, "http://%s@%s", proxy_auth_arg->value.string,
+ grpc_end2end_http_proxy_get_proxy_name(ffd->proxy));
+ }
gpr_setenv("http_proxy", proxy_uri);
gpr_free(proxy_uri);
f->client = grpc_insecure_channel_create(ffd->server_addr, client_args, NULL);
diff --git a/test/core/end2end/fixtures/h2_oauth2.c b/test/core/end2end/fixtures/h2_oauth2.c
index 9cbaaa0811..ee1d0b1416 100644
--- a/test/core/end2end/fixtures/h2_oauth2.c
+++ b/test/core/end2end/fixtures/h2_oauth2.c
@@ -137,10 +137,11 @@ void chttp2_tear_down_secure_fullstack(grpc_end2end_test_fixture *f) {
static void chttp2_init_client_simple_ssl_with_oauth2_secure_fullstack(
grpc_end2end_test_fixture *f, grpc_channel_args *client_args) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_channel_credentials *ssl_creds =
grpc_ssl_credentials_create(test_root_cert, NULL, NULL);
- grpc_call_credentials *oauth2_creds =
- grpc_md_only_test_credentials_create("authorization", oauth2_md, 1);
+ grpc_call_credentials *oauth2_creds = grpc_md_only_test_credentials_create(
+ &exec_ctx, "authorization", oauth2_md, true /* is_async */);
grpc_channel_credentials *ssl_oauth2_creds =
grpc_composite_channel_credentials_create(ssl_creds, oauth2_creds, NULL);
grpc_arg ssl_name_override = {GRPC_ARG_STRING,
@@ -149,13 +150,10 @@ static void chttp2_init_client_simple_ssl_with_oauth2_secure_fullstack(
grpc_channel_args *new_client_args =
grpc_channel_args_copy_and_add(client_args, &ssl_name_override, 1);
chttp2_init_client_secure_fullstack(f, new_client_args, ssl_oauth2_creds);
- {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_channel_args_destroy(&exec_ctx, new_client_args);
- grpc_exec_ctx_finish(&exec_ctx);
- }
+ grpc_channel_args_destroy(&exec_ctx, new_client_args);
grpc_channel_credentials_release(ssl_creds);
grpc_call_credentials_release(oauth2_creds);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static int fail_server_auth_check(grpc_channel_args *server_args) {
diff --git a/test/core/end2end/fixtures/http_proxy_fixture.c b/test/core/end2end/fixtures/http_proxy_fixture.c
index 54693c4900..a4cfc77bcb 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.c
+++ b/test/core/end2end/fixtures/http_proxy_fixture.c
@@ -22,6 +22,7 @@
#include <string.h>
+#include <grpc/grpc.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
@@ -46,7 +47,9 @@
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/slice/b64.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/support/string.h"
#include "test/core/util/port.h"
struct grpc_end2end_http_proxy {
@@ -304,6 +307,28 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg,
&conn->on_write_response_done);
}
+/**
+ * Parses the proxy auth header value to check if it matches :-
+ * Basic <base64_encoded_expected_cred>
+ * Returns true if it matches, false otherwise
+ */
+static bool proxy_auth_header_matches(grpc_exec_ctx* exec_ctx,
+ char* proxy_auth_header_val,
+ char* expected_cred) {
+ GPR_ASSERT(proxy_auth_header_val != NULL);
+ GPR_ASSERT(expected_cred != NULL);
+ if (strncmp(proxy_auth_header_val, "Basic ", 6) != 0) {
+ return false;
+ }
+ proxy_auth_header_val += 6;
+ grpc_slice decoded_slice =
+ grpc_base64_decode(exec_ctx, proxy_auth_header_val, 0);
+ const bool header_matches =
+ grpc_slice_str_cmp(decoded_slice, expected_cred) == 0;
+ grpc_slice_unref_internal(exec_ctx, decoded_slice);
+ return header_matches;
+}
+
// Callback to read the HTTP CONNECT request.
// TODO(roth): Technically, for any of the failure modes handled by this
// function, we should handle the error by returning an HTTP response to
@@ -352,6 +377,28 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
GRPC_ERROR_UNREF(error);
return;
}
+ // If proxy auth is being used, check if the header is present and as expected
+ const grpc_arg* proxy_auth_arg = grpc_channel_args_find(
+ conn->proxy->channel_args, GRPC_ARG_HTTP_PROXY_AUTH_CREDS);
+ if (proxy_auth_arg != NULL && proxy_auth_arg->type == GRPC_ARG_STRING) {
+ bool client_authenticated = false;
+ for (size_t i = 0; i < conn->http_request.hdr_count; i++) {
+ if (strcmp(conn->http_request.hdrs[i].key, "Proxy-Authorization") == 0) {
+ client_authenticated = proxy_auth_header_matches(
+ exec_ctx, conn->http_request.hdrs[i].value,
+ proxy_auth_arg->value.string);
+ break;
+ }
+ }
+ if (!client_authenticated) {
+ const char* msg = "HTTP Connect could not verify authentication";
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg);
+ proxy_connection_failed(exec_ctx, conn, true /* is_client */,
+ "HTTP proxy read request", error);
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
+ }
// Resolve address.
grpc_resolved_addresses* resolved_addresses = NULL;
error = grpc_blocking_resolve_address(conn->http_request.path, "80",
@@ -436,7 +483,8 @@ static void thread_main(void* arg) {
grpc_exec_ctx_finish(&exec_ctx);
}
-grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(void) {
+grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
+ grpc_channel_args* args) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_end2end_http_proxy* proxy =
(grpc_end2end_http_proxy*)gpr_malloc(sizeof(*proxy));
@@ -448,7 +496,7 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(void) {
gpr_join_host_port(&proxy->proxy_name, "localhost", proxy_port);
gpr_log(GPR_INFO, "Proxy address: %s", proxy->proxy_name);
// Create TCP server.
- proxy->channel_args = grpc_channel_args_copy(NULL);
+ proxy->channel_args = grpc_channel_args_copy(args);
grpc_error* error = grpc_tcp_server_create(
&exec_ctx, NULL, proxy->channel_args, &proxy->server);
GPR_ASSERT(error == GRPC_ERROR_NONE);
diff --git a/test/core/end2end/fixtures/http_proxy_fixture.h b/test/core/end2end/fixtures/http_proxy_fixture.h
index a72162e846..103bd08196 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.h
+++ b/test/core/end2end/fixtures/http_proxy_fixture.h
@@ -16,11 +16,28 @@
*
*/
+#ifndef GRPC_TEST_CORE_END2END_FIXTURES_HTTP_PROXY_FIXTURE_H
+#define GRPC_TEST_CORE_END2END_FIXTURES_HTTP_PROXY_FIXTURE_H
+
+#include <grpc/grpc.h>
+
+/* The test credentials being used for HTTP Proxy Authorization */
+#define GRPC_TEST_HTTP_PROXY_AUTH_CREDS "aladdin:opensesame"
+
+/* A channel arg key used to indicate that the channel uses proxy authorization.
+ * The value (string) should be the proxy auth credentials that should be
+ * checked.
+ */
+#define GRPC_ARG_HTTP_PROXY_AUTH_CREDS "grpc.test.proxy_auth"
+
typedef struct grpc_end2end_http_proxy grpc_end2end_http_proxy;
-grpc_end2end_http_proxy* grpc_end2end_http_proxy_create();
+grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
+ grpc_channel_args* args);
void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy);
const char* grpc_end2end_http_proxy_get_proxy_name(
grpc_end2end_http_proxy* proxy);
+
+#endif /* GRPC_TEST_CORE_END2END_FIXTURES_HTTP_PROXY_FIXTURE_H */
diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py
index 6878964c4f..18bae63a8a 100755
--- a/test/core/end2end/gen_build_yaml.py
+++ b/test/core/end2end/gen_build_yaml.py
@@ -24,9 +24,9 @@ import hashlib
FixtureOptions = collections.namedtuple(
'FixtureOptions',
- 'fullstack includes_proxy dns_resolver name_resolution secure platforms ci_mac tracing exclude_configs exclude_iomgrs large_writes enables_compression supports_compression is_inproc is_http2')
+ 'fullstack includes_proxy dns_resolver name_resolution secure platforms ci_mac tracing exclude_configs exclude_iomgrs large_writes enables_compression supports_compression is_inproc is_http2 supports_proxy_auth')
default_unsecure_fixture_options = FixtureOptions(
- True, False, True, True, False, ['windows', 'linux', 'mac', 'posix'], True, False, [], [], True, False, True, False, True)
+ True, False, True, True, False, ['windows', 'linux', 'mac', 'posix'], True, False, [], [], True, False, True, False, True, False)
socketpair_unsecure_fixture_options = default_unsecure_fixture_options._replace(fullstack=False, dns_resolver=False)
default_secure_fixture_options = default_unsecure_fixture_options._replace(secure=True)
uds_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=False, platforms=['linux', 'mac', 'posix'], exclude_iomgrs=['uv'])
@@ -47,7 +47,7 @@ END2END_FIXTURES = {
'h2_full+trace': default_unsecure_fixture_options._replace(tracing=True),
'h2_full+workarounds': default_unsecure_fixture_options,
'h2_http_proxy': default_unsecure_fixture_options._replace(
- ci_mac=False, exclude_iomgrs=['uv']),
+ ci_mac=False, exclude_iomgrs=['uv'], supports_proxy_auth=True),
'h2_oauth2': default_secure_fixture_options._replace(
ci_mac=False, exclude_iomgrs=['uv']),
'h2_proxy': default_unsecure_fixture_options._replace(
@@ -69,8 +69,8 @@ END2END_FIXTURES = {
TestOptions = collections.namedtuple(
'TestOptions',
- 'needs_fullstack needs_dns needs_names proxyable secure traceable cpu_cost exclude_iomgrs large_writes flaky allows_compression needs_compression exclude_inproc needs_http2')
-default_test_options = TestOptions(False, False, False, True, False, True, 1.0, [], False, False, True, False, False, False)
+ 'needs_fullstack needs_dns needs_names proxyable secure traceable cpu_cost exclude_iomgrs large_writes flaky allows_compression needs_compression exclude_inproc needs_http2 needs_proxy_auth')
+default_test_options = TestOptions(False, False, False, True, False, True, 1.0, [], False, False, True, False, False, False, False)
connectivity_test_options = default_test_options._replace(needs_fullstack=True)
LOWCPU = 0.1
@@ -128,6 +128,7 @@ END2END_TESTS = {
'load_reporting_hook': default_test_options,
'ping_pong_streaming': default_test_options._replace(cpu_cost=LOWCPU),
'ping': connectivity_test_options._replace(proxyable=False, cpu_cost=LOWCPU),
+ 'proxy_auth': default_test_options._replace(needs_proxy_auth=True),
'registered_call': default_test_options,
'request_with_flags': default_test_options._replace(
proxyable=False, cpu_cost=LOWCPU),
@@ -178,6 +179,9 @@ def compatible(f, t):
if END2END_TESTS[t].needs_http2:
if not END2END_FIXTURES[f].is_http2:
return False
+ if END2END_TESTS[t].needs_proxy_auth:
+ if not END2END_FIXTURES[f].supports_proxy_auth:
+ return False
return True
diff --git a/test/core/end2end/generate_tests.bzl b/test/core/end2end/generate_tests.bzl
index ea9ad03513..6d1917c0ff 100755
--- a/test/core/end2end/generate_tests.bzl
+++ b/test/core/end2end/generate_tests.bzl
@@ -21,7 +21,7 @@ load("//bazel:grpc_build_system.bzl", "grpc_sh_test", "grpc_cc_binary", "grpc_cc
def fixture_options(fullstack=True, includes_proxy=False, dns_resolver=True,
name_resolution=True, secure=True, tracing=False,
platforms=['windows', 'linux', 'mac', 'posix'],
- is_inproc=False, is_http2=True):
+ is_inproc=False, is_http2=True, supports_proxy_auth=False):
return struct(
fullstack=fullstack,
includes_proxy=includes_proxy,
@@ -30,7 +30,8 @@ def fixture_options(fullstack=True, includes_proxy=False, dns_resolver=True,
secure=secure,
tracing=tracing,
is_inproc=is_inproc,
- is_http2=is_http2
+ is_http2=is_http2,
+ supports_proxy_auth=supports_proxy_auth
#platforms=platforms
)
@@ -47,7 +48,7 @@ END2END_FIXTURES = {
'h2_full+pipe': fixture_options(platforms=['linux']),
'h2_full+trace': fixture_options(tracing=True),
'h2_full+workarounds': fixture_options(),
- 'h2_http_proxy': fixture_options(),
+ 'h2_http_proxy': fixture_options(supports_proxy_auth=True),
'h2_oauth2': fixture_options(),
'h2_proxy': fixture_options(includes_proxy=True),
'h2_sockpair_1byte': fixture_options(fullstack=False, dns_resolver=False),
@@ -67,7 +68,8 @@ END2END_FIXTURES = {
def test_options(needs_fullstack=False, needs_dns=False, needs_names=False,
proxyable=True, secure=False, traceable=False,
- exclude_inproc=False, needs_http2=False):
+ exclude_inproc=False, needs_http2=False,
+ needs_proxy_auth=False):
return struct(
needs_fullstack=needs_fullstack,
needs_dns=needs_dns,
@@ -76,7 +78,8 @@ def test_options(needs_fullstack=False, needs_dns=False, needs_names=False,
secure=secure,
traceable=traceable,
exclude_inproc=exclude_inproc,
- needs_http2=needs_http2
+ needs_http2=needs_http2,
+ needs_proxy_auth=needs_proxy_auth
)
@@ -123,6 +126,7 @@ END2END_TESTS = {
'load_reporting_hook': test_options(),
'ping_pong_streaming': test_options(),
'ping': test_options(needs_fullstack=True, proxyable=False),
+ 'proxy_auth': test_options(needs_proxy_auth=True),
'registered_call': test_options(),
'request_with_flags': test_options(proxyable=False),
'request_with_payload': test_options(),
@@ -165,6 +169,9 @@ def compatible(fopt, topt):
if topt.needs_http2:
if not fopt.is_http2:
return False
+ if topt.needs_proxy_auth:
+ if not fopt.supports_proxy_auth:
+ return False
return True
diff --git a/test/core/end2end/tests/proxy_auth.c b/test/core/end2end/tests/proxy_auth.c
new file mode 100644
index 0000000000..d922049bcb
--- /dev/null
+++ b/test/core/end2end/tests/proxy_auth.c
@@ -0,0 +1,235 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * This test is for checking whether proxy authentication is working with HTTP
+ * Connect.
+ */
+#include "test/core/end2end/end2end_tests.h"
+#include "test/core/end2end/fixtures/http_proxy_fixture.h"
+
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/byte_buffer.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include "src/core/lib/support/string.h"
+#include "test/core/end2end/cq_verifier.h"
+
+static void *tag(intptr_t t) { return (void *)t; }
+
+static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
+ const char *test_name,
+ grpc_channel_args *client_args,
+ grpc_channel_args *server_args) {
+ grpc_end2end_test_fixture f;
+ gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
+ f = config.create_fixture(client_args, server_args);
+ config.init_server(&f, server_args);
+ config.init_client(&f, client_args);
+ return f;
+}
+
+static gpr_timespec n_seconds_from_now(int n) {
+ return grpc_timeout_seconds_to_deadline(n);
+}
+
+static gpr_timespec five_seconds_from_now(void) {
+ return n_seconds_from_now(5);
+}
+
+static void drain_cq(grpc_completion_queue *cq) {
+ grpc_event ev;
+ do {
+ ev = grpc_completion_queue_next(cq, five_seconds_from_now(), NULL);
+ } while (ev.type != GRPC_QUEUE_SHUTDOWN);
+}
+
+static void shutdown_server(grpc_end2end_test_fixture *f) {
+ if (!f->server) return;
+ grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
+ GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
+ grpc_timeout_seconds_to_deadline(5),
+ NULL)
+ .type == GRPC_OP_COMPLETE);
+ grpc_server_destroy(f->server);
+ f->server = NULL;
+}
+
+static void shutdown_client(grpc_end2end_test_fixture *f) {
+ if (!f->client) return;
+ grpc_channel_destroy(f->client);
+ f->client = NULL;
+}
+
+static void end_test(grpc_end2end_test_fixture *f) {
+ shutdown_server(f);
+ shutdown_client(f);
+
+ grpc_completion_queue_shutdown(f->cq);
+ drain_cq(f->cq);
+ grpc_completion_queue_destroy(f->cq);
+ grpc_completion_queue_destroy(f->shutdown_cq);
+}
+
+static void simple_request_body(grpc_end2end_test_config config,
+ grpc_end2end_test_fixture f) {
+ grpc_call *c;
+ grpc_call *s;
+ cq_verifier *cqv = cq_verifier_create(f.cq);
+ grpc_op ops[6];
+ grpc_op *op;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_metadata_array request_metadata_recv;
+ grpc_call_details call_details;
+ grpc_status_code status;
+ grpc_call_error error;
+ grpc_slice details;
+ int was_cancelled = 2;
+ char *peer;
+
+ gpr_timespec deadline = five_seconds_from_now();
+ c = grpc_channel_create_call(
+ f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
+ grpc_slice_from_static_string("/foo"),
+ get_host_override_slice("foo.test.google.fr:1234", config), deadline,
+ NULL);
+ GPR_ASSERT(c);
+
+ peer = grpc_call_get_peer(c);
+ GPR_ASSERT(peer != NULL);
+ gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer);
+ gpr_free(peer);
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ error =
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.cq, f.cq, tag(101));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
+ cq_verify(cqv);
+
+ peer = grpc_call_get_peer(s);
+ GPR_ASSERT(peer != NULL);
+ gpr_log(GPR_DEBUG, "server_peer=%s", peer);
+ gpr_free(peer);
+ peer = grpc_call_get_peer(c);
+ GPR_ASSERT(peer != NULL);
+ gpr_log(GPR_DEBUG, "client_peer=%s", peer);
+ gpr_free(peer);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.trailing_metadata_count = 0;
+ op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
+ grpc_slice status_details = grpc_slice_from_static_string("xyz");
+ op->data.send_status_from_server.status_details = &status_details;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
+ CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
+ cq_verify(cqv);
+
+ GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
+ GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
+ GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
+ validate_host_override_string("foo.test.google.fr:1234", call_details.host,
+ config);
+ GPR_ASSERT(0 == call_details.flags);
+ GPR_ASSERT(was_cancelled == 1);
+
+ grpc_slice_unref(details);
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+
+ grpc_call_unref(c);
+ grpc_call_unref(s);
+
+ cq_verifier_destroy(cqv);
+}
+
+static void test_invoke_proxy_auth(grpc_end2end_test_config config) {
+ /* Indicate that the proxy requires user auth */
+ grpc_arg client_arg = {.type = GRPC_ARG_STRING,
+ .key = GRPC_ARG_HTTP_PROXY_AUTH_CREDS,
+ .value.string = GRPC_TEST_HTTP_PROXY_AUTH_CREDS};
+ grpc_channel_args client_args = {.num_args = 1, .args = &client_arg};
+ grpc_end2end_test_fixture f =
+ begin_test(config, "test_invoke_proxy_auth", &client_args, NULL);
+ simple_request_body(config, f);
+ end_test(&f);
+ config.tear_down_data(&f);
+}
+
+void proxy_auth(grpc_end2end_test_config config) {
+ test_invoke_proxy_auth(config);
+}
+
+void proxy_auth_pre_init(void) {}
diff --git a/test/core/iomgr/ev_epollsig_linux_test.c b/test/core/iomgr/ev_epollsig_linux_test.c
index 1d272fa406..c702065d4d 100644
--- a/test/core/iomgr/ev_epollsig_linux_test.c
+++ b/test/core/iomgr/ev_epollsig_linux_test.c
@@ -79,7 +79,8 @@ static void test_fd_cleanup(grpc_exec_ctx *exec_ctx, test_fd *tfds,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("test_fd_cleanup"));
grpc_exec_ctx_flush(exec_ctx);
- grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, "test_fd_cleanup");
+ grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd,
+ false /* already_closed */, "test_fd_cleanup");
grpc_exec_ctx_flush(exec_ctx);
GPR_ASSERT(release_fd == tfds[i].inner_fd);
@@ -294,7 +295,8 @@ static void test_threading(void) {
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_fd_shutdown(&exec_ctx, shared.wakeup_desc, GRPC_ERROR_CANCELLED);
- grpc_fd_orphan(&exec_ctx, shared.wakeup_desc, NULL, NULL, "done");
+ grpc_fd_orphan(&exec_ctx, shared.wakeup_desc, NULL, NULL,
+ false /* already_closed */, "done");
grpc_pollset_shutdown(&exec_ctx, shared.pollset,
GRPC_CLOSURE_CREATE(destroy_pollset, shared.pollset,
grpc_schedule_on_exec_ctx));
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index 02596450d2..85d5d9c07f 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -114,7 +114,8 @@ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
bool success) {
session *se = arg;
server *sv = se->sv;
- grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, "a");
+ grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, false /* already_closed */,
+ "a");
gpr_free(se);
/* Start to shutdown listen fd. */
grpc_fd_shutdown(exec_ctx, sv->em_fd,
@@ -171,7 +172,8 @@ static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */,
int success) {
server *sv = arg;
- grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, NULL, "b");
+ grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, NULL, false /* already_closed */,
+ "b");
gpr_mu_lock(g_mu);
sv->done = 1;
@@ -291,7 +293,8 @@ static void client_init(client *cl) {
static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx,
void *arg /*client */, int success) {
client *cl = arg;
- grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, "c");
+ grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, false /* already_closed */,
+ "c");
cl->done = 1;
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
@@ -511,7 +514,7 @@ static void test_grpc_fd_change(void) {
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(g_mu);
- grpc_fd_orphan(&exec_ctx, em_fd, NULL, NULL, "d");
+ grpc_fd_orphan(&exec_ctx, em_fd, NULL, NULL, false /* already_closed */, "d");
grpc_exec_ctx_finish(&exec_ctx);
destroy_change_data(&a);
destroy_change_data(&b);
diff --git a/test/core/iomgr/pollset_set_test.c b/test/core/iomgr/pollset_set_test.c
index 6aedaf1081..5750ac0f4b 100644
--- a/test/core/iomgr/pollset_set_test.c
+++ b/test/core/iomgr/pollset_set_test.c
@@ -137,7 +137,8 @@ static void cleanup_test_fds(grpc_exec_ctx *exec_ctx, test_fd *tfds,
* grpc_wakeup_fd and we would like to destroy it ourselves (by calling
* grpc_wakeup_fd_destroy). To prevent grpc_fd from calling close() on the
* underlying fd, call it with a non-NULL 'release_fd' parameter */
- grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, "test_fd_cleanup");
+ grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd,
+ false /* already_closed */, "test_fd_cleanup");
grpc_exec_ctx_flush(exec_ctx);
grpc_wakeup_fd_destroy(&tfds[i].wakeup_fd);
diff --git a/test/core/security/credentials_test.c b/test/core/security/credentials_test.c
index a76cb0499d..e60e398767 100644
--- a/test/core/security/credentials_test.c
+++ b/test/core/security/credentials_test.c
@@ -105,8 +105,6 @@ static const char valid_oauth2_json_response[] =
" \"expires_in\":3599, "
" \"token_type\":\"Bearer\"}";
-static const char test_user_data[] = "user data";
-
static const char test_scope[] = "perm1 perm2";
static const char test_signed_jwt[] =
@@ -134,11 +132,6 @@ static char *test_json_key_str(void) {
return result;
}
-typedef struct {
- const char *key;
- const char *value;
-} expected_md;
-
static grpc_httpcli_response http_response(int status, const char *body) {
grpc_httpcli_response response;
memset(&response, 0, sizeof(grpc_httpcli_response));
@@ -150,89 +143,57 @@ static grpc_httpcli_response http_response(int status, const char *body) {
/* -- Tests. -- */
-static void test_empty_md_store(void) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_credentials_md_store *store = grpc_credentials_md_store_create(0);
- GPR_ASSERT(store->num_entries == 0);
- GPR_ASSERT(store->allocated == 0);
- grpc_credentials_md_store_unref(&exec_ctx, store);
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-static void test_ref_unref_empty_md_store(void) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_credentials_md_store *store = grpc_credentials_md_store_create(0);
- grpc_credentials_md_store_ref(store);
- grpc_credentials_md_store_ref(store);
- GPR_ASSERT(store->num_entries == 0);
- GPR_ASSERT(store->allocated == 0);
- grpc_credentials_md_store_unref(&exec_ctx, store);
- grpc_credentials_md_store_unref(&exec_ctx, store);
- grpc_credentials_md_store_unref(&exec_ctx, store);
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-static void test_add_to_empty_md_store(void) {
+static void test_empty_md_array(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_credentials_md_store *store = grpc_credentials_md_store_create(0);
- const char *key_str = "hello";
- const char *value_str = "there blah blah blah blah blah blah blah";
- grpc_slice key = grpc_slice_from_copied_string(key_str);
- grpc_slice value = grpc_slice_from_copied_string(value_str);
- grpc_credentials_md_store_add(store, key, value);
- GPR_ASSERT(store->num_entries == 1);
- GPR_ASSERT(grpc_slice_eq(key, store->entries[0].key));
- GPR_ASSERT(grpc_slice_eq(value, store->entries[0].value));
- grpc_slice_unref(key);
- grpc_slice_unref(value);
- grpc_credentials_md_store_unref(&exec_ctx, store);
+ grpc_credentials_mdelem_array md_array;
+ memset(&md_array, 0, sizeof(md_array));
+ GPR_ASSERT(md_array.md == NULL);
+ GPR_ASSERT(md_array.size == 0);
+ grpc_credentials_mdelem_array_destroy(&exec_ctx, &md_array);
grpc_exec_ctx_finish(&exec_ctx);
}
-static void test_add_cstrings_to_empty_md_store(void) {
+static void test_add_to_empty_md_array(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_credentials_md_store *store = grpc_credentials_md_store_create(0);
- const char *key_str = "hello";
- const char *value_str = "there blah blah blah blah blah blah blah";
- grpc_credentials_md_store_add_cstrings(store, key_str, value_str);
- GPR_ASSERT(store->num_entries == 1);
- GPR_ASSERT(grpc_slice_str_cmp(store->entries[0].key, key_str) == 0);
- GPR_ASSERT(grpc_slice_str_cmp(store->entries[0].value, value_str) == 0);
- grpc_credentials_md_store_unref(&exec_ctx, store);
+ grpc_credentials_mdelem_array md_array;
+ memset(&md_array, 0, sizeof(md_array));
+ const char *key = "hello";
+ const char *value = "there blah blah blah blah blah blah blah";
+ grpc_mdelem md =
+ grpc_mdelem_from_slices(&exec_ctx, grpc_slice_from_copied_string(key),
+ grpc_slice_from_copied_string(value));
+ grpc_credentials_mdelem_array_add(&md_array, md);
+ GPR_ASSERT(md_array.size == 1);
+ GPR_ASSERT(grpc_mdelem_eq(md, md_array.md[0]));
+ GRPC_MDELEM_UNREF(&exec_ctx, md);
+ grpc_credentials_mdelem_array_destroy(&exec_ctx, &md_array);
grpc_exec_ctx_finish(&exec_ctx);
}
-static void test_empty_preallocated_md_store(void) {
+static void test_add_abunch_to_md_array(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_credentials_md_store *store = grpc_credentials_md_store_create(4);
- GPR_ASSERT(store->num_entries == 0);
- GPR_ASSERT(store->allocated == 4);
- GPR_ASSERT(store->entries != NULL);
- grpc_credentials_md_store_unref(&exec_ctx, store);
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-static void test_add_abunch_to_md_store(void) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_credentials_md_store *store = grpc_credentials_md_store_create(4);
+ grpc_credentials_mdelem_array md_array;
+ memset(&md_array, 0, sizeof(md_array));
+ const char *key = "hello";
+ const char *value = "there blah blah blah blah blah blah blah";
+ grpc_mdelem md =
+ grpc_mdelem_from_slices(&exec_ctx, grpc_slice_from_copied_string(key),
+ grpc_slice_from_copied_string(value));
size_t num_entries = 1000;
- const char *key_str = "hello";
- const char *value_str = "there blah blah blah blah blah blah blah";
- size_t i;
- for (i = 0; i < num_entries; i++) {
- grpc_credentials_md_store_add_cstrings(store, key_str, value_str);
+ for (size_t i = 0; i < num_entries; ++i) {
+ grpc_credentials_mdelem_array_add(&md_array, md);
}
- for (i = 0; i < num_entries; i++) {
- GPR_ASSERT(grpc_slice_str_cmp(store->entries[i].key, key_str) == 0);
- GPR_ASSERT(grpc_slice_str_cmp(store->entries[i].value, value_str) == 0);
+ for (size_t i = 0; i < num_entries; ++i) {
+ GPR_ASSERT(grpc_mdelem_eq(md_array.md[i], md));
}
- grpc_credentials_md_store_unref(&exec_ctx, store);
+ GRPC_MDELEM_UNREF(&exec_ctx, md);
+ grpc_credentials_mdelem_array_destroy(&exec_ctx, &md_array);
grpc_exec_ctx_finish(&exec_ctx);
}
static void test_oauth2_token_fetcher_creds_parsing_ok(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_credentials_md_store *token_md = NULL;
+ grpc_mdelem token_md = GRPC_MDNULL;
gpr_timespec token_lifetime;
grpc_httpcli_response response =
http_response(200, valid_oauth2_json_response);
@@ -241,20 +202,18 @@ static void test_oauth2_token_fetcher_creds_parsing_ok(void) {
GRPC_CREDENTIALS_OK);
GPR_ASSERT(token_lifetime.tv_sec == 3599);
GPR_ASSERT(token_lifetime.tv_nsec == 0);
- GPR_ASSERT(token_md->num_entries == 1);
- GPR_ASSERT(grpc_slice_str_cmp(token_md->entries[0].key, "authorization") ==
- 0);
- GPR_ASSERT(grpc_slice_str_cmp(token_md->entries[0].value,
+ GPR_ASSERT(grpc_slice_str_cmp(GRPC_MDKEY(token_md), "authorization") == 0);
+ GPR_ASSERT(grpc_slice_str_cmp(GRPC_MDVALUE(token_md),
"Bearer ya29.AHES6ZRN3-HlhAPya30GnW_bHSb_") ==
0);
- grpc_credentials_md_store_unref(&exec_ctx, token_md);
+ GRPC_MDELEM_UNREF(&exec_ctx, token_md);
grpc_http_response_destroy(&response);
grpc_exec_ctx_finish(&exec_ctx);
}
static void test_oauth2_token_fetcher_creds_parsing_bad_http_status(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_credentials_md_store *token_md = NULL;
+ grpc_mdelem token_md = GRPC_MDNULL;
gpr_timespec token_lifetime;
grpc_httpcli_response response =
http_response(401, valid_oauth2_json_response);
@@ -267,7 +226,7 @@ static void test_oauth2_token_fetcher_creds_parsing_bad_http_status(void) {
static void test_oauth2_token_fetcher_creds_parsing_empty_http_body(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_credentials_md_store *token_md = NULL;
+ grpc_mdelem token_md = GRPC_MDNULL;
gpr_timespec token_lifetime;
grpc_httpcli_response response = http_response(200, "");
GPR_ASSERT(grpc_oauth2_token_fetcher_credentials_parse_server_response(
@@ -279,7 +238,7 @@ static void test_oauth2_token_fetcher_creds_parsing_empty_http_body(void) {
static void test_oauth2_token_fetcher_creds_parsing_invalid_json(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_credentials_md_store *token_md = NULL;
+ grpc_mdelem token_md = GRPC_MDNULL;
gpr_timespec token_lifetime;
grpc_httpcli_response response =
http_response(200,
@@ -295,7 +254,7 @@ static void test_oauth2_token_fetcher_creds_parsing_invalid_json(void) {
static void test_oauth2_token_fetcher_creds_parsing_missing_token(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_credentials_md_store *token_md = NULL;
+ grpc_mdelem token_md = GRPC_MDNULL;
gpr_timespec token_lifetime;
grpc_httpcli_response response = http_response(200,
"{"
@@ -310,7 +269,7 @@ static void test_oauth2_token_fetcher_creds_parsing_missing_token(void) {
static void test_oauth2_token_fetcher_creds_parsing_missing_token_type(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_credentials_md_store *token_md = NULL;
+ grpc_mdelem token_md = GRPC_MDNULL;
gpr_timespec token_lifetime;
grpc_httpcli_response response =
http_response(200,
@@ -327,7 +286,7 @@ static void test_oauth2_token_fetcher_creds_parsing_missing_token_type(void) {
static void test_oauth2_token_fetcher_creds_parsing_missing_token_lifetime(
void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_credentials_md_store *token_md = NULL;
+ grpc_mdelem token_md = GRPC_MDNULL;
gpr_timespec token_lifetime;
grpc_httpcli_response response =
http_response(200,
@@ -340,75 +299,121 @@ static void test_oauth2_token_fetcher_creds_parsing_missing_token_lifetime(
grpc_exec_ctx_finish(&exec_ctx);
}
-static void check_metadata(expected_md *expected, grpc_credentials_md *md_elems,
- size_t num_md) {
- size_t i;
- for (i = 0; i < num_md; i++) {
+typedef struct {
+ const char *key;
+ const char *value;
+} expected_md;
+
+typedef struct {
+ grpc_error *expected_error;
+ const expected_md *expected;
+ size_t expected_size;
+ grpc_credentials_mdelem_array md_array;
+ grpc_closure on_request_metadata;
+ grpc_call_credentials *creds;
+} request_metadata_state;
+
+static void check_metadata(const expected_md *expected,
+ grpc_credentials_mdelem_array *md_array) {
+ for (size_t i = 0; i < md_array->size; ++i) {
size_t j;
- for (j = 0; j < num_md; j++) {
- if (0 == grpc_slice_str_cmp(md_elems[j].key, expected[i].key)) {
- GPR_ASSERT(grpc_slice_str_cmp(md_elems[j].value, expected[i].value) ==
- 0);
+ for (j = 0; j < md_array->size; ++j) {
+ if (0 ==
+ grpc_slice_str_cmp(GRPC_MDKEY(md_array->md[j]), expected[i].key)) {
+ GPR_ASSERT(grpc_slice_str_cmp(GRPC_MDVALUE(md_array->md[j]),
+ expected[i].value) == 0);
break;
}
}
- if (j == num_md) {
+ if (j == md_array->size) {
gpr_log(GPR_ERROR, "key %s not found", expected[i].key);
GPR_ASSERT(0);
}
}
}
-static void check_google_iam_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_credentials_md *md_elems,
- size_t num_md,
- grpc_credentials_status status,
- const char *error_details) {
- grpc_call_credentials *c = (grpc_call_credentials *)user_data;
- expected_md emd[] = {{GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
- test_google_iam_authorization_token},
- {GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
- test_google_iam_authority_selector}};
- GPR_ASSERT(status == GRPC_CREDENTIALS_OK);
- GPR_ASSERT(error_details == NULL);
- GPR_ASSERT(num_md == 2);
- check_metadata(emd, md_elems, num_md);
- grpc_call_credentials_unref(exec_ctx, c);
+static void check_request_metadata(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ request_metadata_state *state = (request_metadata_state *)arg;
+ gpr_log(GPR_INFO, "expected_error: %s",
+ grpc_error_string(state->expected_error));
+ gpr_log(GPR_INFO, "actual_error: %s", grpc_error_string(error));
+ if (state->expected_error == GRPC_ERROR_NONE) {
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ } else {
+ grpc_slice expected_error;
+ GPR_ASSERT(grpc_error_get_str(state->expected_error,
+ GRPC_ERROR_STR_DESCRIPTION, &expected_error));
+ grpc_slice actual_error;
+ GPR_ASSERT(
+ grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION, &actual_error));
+ GPR_ASSERT(grpc_slice_cmp(expected_error, actual_error) == 0);
+ GRPC_ERROR_UNREF(state->expected_error);
+ }
+ gpr_log(GPR_INFO, "expected_size=%" PRIdPTR " actual_size=%" PRIdPTR,
+ state->expected_size, state->md_array.size);
+ GPR_ASSERT(state->md_array.size == state->expected_size);
+ check_metadata(state->expected, &state->md_array);
+ grpc_credentials_mdelem_array_destroy(exec_ctx, &state->md_array);
+ gpr_free(state);
+}
+
+static request_metadata_state *make_request_metadata_state(
+ grpc_error *expected_error, const expected_md *expected,
+ size_t expected_size) {
+ request_metadata_state *state = gpr_zalloc(sizeof(*state));
+ state->expected_error = expected_error;
+ state->expected = expected;
+ state->expected_size = expected_size;
+ GRPC_CLOSURE_INIT(&state->on_request_metadata, check_request_metadata, state,
+ grpc_schedule_on_exec_ctx);
+ return state;
+}
+
+static void run_request_metadata_test(grpc_exec_ctx *exec_ctx,
+ grpc_call_credentials *creds,
+ grpc_auth_metadata_context auth_md_ctx,
+ request_metadata_state *state) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_call_credentials_get_request_metadata(
+ exec_ctx, creds, NULL, auth_md_ctx, &state->md_array,
+ &state->on_request_metadata, &error)) {
+ // Synchronous result. Invoke the callback directly.
+ check_request_metadata(exec_ctx, state, error);
+ GRPC_ERROR_UNREF(error);
+ }
}
static void test_google_iam_creds(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ expected_md emd[] = {{GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
+ test_google_iam_authorization_token},
+ {GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
+ test_google_iam_authority_selector}};
+ request_metadata_state *state =
+ make_request_metadata_state(GRPC_ERROR_NONE, emd, GPR_ARRAY_SIZE(emd));
grpc_call_credentials *creds = grpc_google_iam_credentials_create(
test_google_iam_authorization_token, test_google_iam_authority_selector,
NULL);
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method, NULL,
NULL};
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, creds, NULL, auth_md_ctx, check_google_iam_metadata, creds);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, state);
+ grpc_call_credentials_unref(&exec_ctx, creds);
grpc_exec_ctx_finish(&exec_ctx);
}
-static void check_access_token_metadata(
- grpc_exec_ctx *exec_ctx, void *user_data, grpc_credentials_md *md_elems,
- size_t num_md, grpc_credentials_status status, const char *error_details) {
- grpc_call_credentials *c = (grpc_call_credentials *)user_data;
- expected_md emd[] = {{GRPC_AUTHORIZATION_METADATA_KEY, "Bearer blah"}};
- GPR_ASSERT(status == GRPC_CREDENTIALS_OK);
- GPR_ASSERT(error_details == NULL);
- GPR_ASSERT(num_md == 1);
- check_metadata(emd, md_elems, num_md);
- grpc_call_credentials_unref(exec_ctx, c);
-}
-
static void test_access_token_creds(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ expected_md emd[] = {{GRPC_AUTHORIZATION_METADATA_KEY, "Bearer blah"}};
+ request_metadata_state *state =
+ make_request_metadata_state(GRPC_ERROR_NONE, emd, GPR_ARRAY_SIZE(emd));
grpc_call_credentials *creds =
grpc_access_token_credentials_create("blah", NULL);
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method, NULL,
NULL};
GPR_ASSERT(strcmp(creds->type, GRPC_CALL_CREDENTIALS_TYPE_OAUTH2) == 0);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, creds, NULL, auth_md_ctx, check_access_token_metadata, creds);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, state);
+ grpc_call_credentials_unref(&exec_ctx, creds);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -444,30 +449,20 @@ static void test_channel_oauth2_composite_creds(void) {
grpc_exec_ctx_finish(&exec_ctx);
}
-static void check_oauth2_google_iam_composite_metadata(
- grpc_exec_ctx *exec_ctx, void *user_data, grpc_credentials_md *md_elems,
- size_t num_md, grpc_credentials_status status, const char *error_details) {
- grpc_call_credentials *c = (grpc_call_credentials *)user_data;
+static void test_oauth2_google_iam_composite_creds(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
expected_md emd[] = {
{GRPC_AUTHORIZATION_METADATA_KEY, test_oauth2_bearer_token},
{GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
test_google_iam_authorization_token},
{GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
test_google_iam_authority_selector}};
- GPR_ASSERT(status == GRPC_CREDENTIALS_OK);
- GPR_ASSERT(error_details == NULL);
- GPR_ASSERT(num_md == 3);
- check_metadata(emd, md_elems, num_md);
- grpc_call_credentials_unref(exec_ctx, c);
-}
-
-static void test_oauth2_google_iam_composite_creds(void) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- const grpc_call_credentials_array *creds_array;
+ request_metadata_state *state =
+ make_request_metadata_state(GRPC_ERROR_NONE, emd, GPR_ARRAY_SIZE(emd));
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method, NULL,
NULL};
grpc_call_credentials *oauth2_creds = grpc_md_only_test_credentials_create(
- "authorization", test_oauth2_bearer_token, 0);
+ &exec_ctx, "authorization", test_oauth2_bearer_token, 0);
grpc_call_credentials *google_iam_creds = grpc_google_iam_credentials_create(
test_google_iam_authorization_token, test_google_iam_authority_selector,
NULL);
@@ -478,16 +473,15 @@ static void test_oauth2_google_iam_composite_creds(void) {
grpc_call_credentials_unref(&exec_ctx, google_iam_creds);
GPR_ASSERT(
strcmp(composite_creds->type, GRPC_CALL_CREDENTIALS_TYPE_COMPOSITE) == 0);
- creds_array =
+ const grpc_call_credentials_array *creds_array =
grpc_composite_call_credentials_get_credentials(composite_creds);
GPR_ASSERT(creds_array->num_creds == 2);
GPR_ASSERT(strcmp(creds_array->creds_array[0]->type,
GRPC_CALL_CREDENTIALS_TYPE_OAUTH2) == 0);
GPR_ASSERT(strcmp(creds_array->creds_array[1]->type,
GRPC_CALL_CREDENTIALS_TYPE_IAM) == 0);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, composite_creds, NULL, auth_md_ctx,
- check_oauth2_google_iam_composite_metadata, composite_creds);
+ run_request_metadata_test(&exec_ctx, composite_creds, auth_md_ctx, state);
+ grpc_call_credentials_unref(&exec_ctx, composite_creds);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -541,29 +535,6 @@ static void test_channel_oauth2_google_iam_composite_creds(void) {
grpc_exec_ctx_finish(&exec_ctx);
}
-static void on_oauth2_creds_get_metadata_success(
- grpc_exec_ctx *exec_ctx, void *user_data, grpc_credentials_md *md_elems,
- size_t num_md, grpc_credentials_status status, const char *error_details) {
- GPR_ASSERT(status == GRPC_CREDENTIALS_OK);
- GPR_ASSERT(error_details == NULL);
- GPR_ASSERT(num_md == 1);
- GPR_ASSERT(grpc_slice_str_cmp(md_elems[0].key, "authorization") == 0);
- GPR_ASSERT(grpc_slice_str_cmp(md_elems[0].value,
- "Bearer ya29.AHES6ZRN3-HlhAPya30GnW_bHSb_") ==
- 0);
- GPR_ASSERT(user_data != NULL);
- GPR_ASSERT(strcmp((const char *)user_data, test_user_data) == 0);
-}
-
-static void on_oauth2_creds_get_metadata_failure(
- grpc_exec_ctx *exec_ctx, void *user_data, grpc_credentials_md *md_elems,
- size_t num_md, grpc_credentials_status status, const char *error_details) {
- GPR_ASSERT(status == GRPC_CREDENTIALS_ERROR);
- GPR_ASSERT(num_md == 0);
- GPR_ASSERT(user_data != NULL);
- GPR_ASSERT(strcmp((const char *)user_data, test_user_data) == 0);
-}
-
static void validate_compute_engine_http_request(
const grpc_httpcli_request *request) {
GPR_ASSERT(request->handshaker != &grpc_httpcli_ssl);
@@ -616,43 +587,48 @@ static int httpcli_get_should_not_be_called(grpc_exec_ctx *exec_ctx,
static void test_compute_engine_creds_success(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_call_credentials *compute_engine_creds =
+ expected_md emd[] = {
+ {"authorization", "Bearer ya29.AHES6ZRN3-HlhAPya30GnW_bHSb_"}};
+ grpc_call_credentials *creds =
grpc_google_compute_engine_credentials_create(NULL);
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method, NULL,
NULL};
/* First request: http get should be called. */
+ request_metadata_state *state =
+ make_request_metadata_state(GRPC_ERROR_NONE, emd, GPR_ARRAY_SIZE(emd));
grpc_httpcli_set_override(compute_engine_httpcli_get_success_override,
httpcli_post_should_not_be_called);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, compute_engine_creds, NULL, auth_md_ctx,
- on_oauth2_creds_get_metadata_success, (void *)test_user_data);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, state);
grpc_exec_ctx_flush(&exec_ctx);
/* Second request: the cached token should be served directly. */
+ state =
+ make_request_metadata_state(GRPC_ERROR_NONE, emd, GPR_ARRAY_SIZE(emd));
grpc_httpcli_set_override(httpcli_get_should_not_be_called,
httpcli_post_should_not_be_called);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, compute_engine_creds, NULL, auth_md_ctx,
- on_oauth2_creds_get_metadata_success, (void *)test_user_data);
- grpc_exec_ctx_finish(&exec_ctx);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, state);
+ grpc_exec_ctx_flush(&exec_ctx);
- grpc_call_credentials_unref(&exec_ctx, compute_engine_creds);
+ grpc_call_credentials_unref(&exec_ctx, creds);
grpc_httpcli_set_override(NULL, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_compute_engine_creds_failure(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ request_metadata_state *state = make_request_metadata_state(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Error occured when fetching oauth2 token."),
+ NULL, 0);
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method, NULL,
NULL};
- grpc_call_credentials *compute_engine_creds =
+ grpc_call_credentials *creds =
grpc_google_compute_engine_credentials_create(NULL);
grpc_httpcli_set_override(compute_engine_httpcli_get_failure_override,
httpcli_post_should_not_be_called);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, compute_engine_creds, NULL, auth_md_ctx,
- on_oauth2_creds_get_metadata_failure, (void *)test_user_data);
- grpc_call_credentials_unref(&exec_ctx, compute_engine_creds);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, state);
+ grpc_call_credentials_unref(&exec_ctx, creds);
grpc_httpcli_set_override(NULL, NULL);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -702,46 +678,48 @@ static int refresh_token_httpcli_post_failure(
static void test_refresh_token_creds_success(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ expected_md emd[] = {
+ {"authorization", "Bearer ya29.AHES6ZRN3-HlhAPya30GnW_bHSb_"}};
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method, NULL,
NULL};
- grpc_call_credentials *refresh_token_creds =
- grpc_google_refresh_token_credentials_create(test_refresh_token_str,
- NULL);
+ grpc_call_credentials *creds = grpc_google_refresh_token_credentials_create(
+ test_refresh_token_str, NULL);
/* First request: http get should be called. */
+ request_metadata_state *state =
+ make_request_metadata_state(GRPC_ERROR_NONE, emd, GPR_ARRAY_SIZE(emd));
grpc_httpcli_set_override(httpcli_get_should_not_be_called,
refresh_token_httpcli_post_success);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, refresh_token_creds, NULL, auth_md_ctx,
- on_oauth2_creds_get_metadata_success, (void *)test_user_data);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, state);
grpc_exec_ctx_flush(&exec_ctx);
/* Second request: the cached token should be served directly. */
+ state =
+ make_request_metadata_state(GRPC_ERROR_NONE, emd, GPR_ARRAY_SIZE(emd));
grpc_httpcli_set_override(httpcli_get_should_not_be_called,
httpcli_post_should_not_be_called);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, refresh_token_creds, NULL, auth_md_ctx,
- on_oauth2_creds_get_metadata_success, (void *)test_user_data);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, state);
grpc_exec_ctx_flush(&exec_ctx);
- grpc_call_credentials_unref(&exec_ctx, refresh_token_creds);
+ grpc_call_credentials_unref(&exec_ctx, creds);
grpc_httpcli_set_override(NULL, NULL);
grpc_exec_ctx_finish(&exec_ctx);
}
static void test_refresh_token_creds_failure(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ request_metadata_state *state = make_request_metadata_state(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Error occured when fetching oauth2 token."),
+ NULL, 0);
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method, NULL,
NULL};
- grpc_call_credentials *refresh_token_creds =
- grpc_google_refresh_token_credentials_create(test_refresh_token_str,
- NULL);
+ grpc_call_credentials *creds = grpc_google_refresh_token_credentials_create(
+ test_refresh_token_str, NULL);
grpc_httpcli_set_override(httpcli_get_should_not_be_called,
refresh_token_httpcli_post_failure);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, refresh_token_creds, NULL, auth_md_ctx,
- on_oauth2_creds_get_metadata_failure, (void *)test_user_data);
- grpc_call_credentials_unref(&exec_ctx, refresh_token_creds);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, state);
+ grpc_call_credentials_unref(&exec_ctx, creds);
grpc_httpcli_set_override(NULL, NULL);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -792,30 +770,6 @@ static char *encode_and_sign_jwt_should_not_be_called(
return NULL;
}
-static void on_jwt_creds_get_metadata_success(
- grpc_exec_ctx *exec_ctx, void *user_data, grpc_credentials_md *md_elems,
- size_t num_md, grpc_credentials_status status, const char *error_details) {
- char *expected_md_value;
- gpr_asprintf(&expected_md_value, "Bearer %s", test_signed_jwt);
- GPR_ASSERT(status == GRPC_CREDENTIALS_OK);
- GPR_ASSERT(error_details == NULL);
- GPR_ASSERT(num_md == 1);
- GPR_ASSERT(grpc_slice_str_cmp(md_elems[0].key, "authorization") == 0);
- GPR_ASSERT(grpc_slice_str_cmp(md_elems[0].value, expected_md_value) == 0);
- GPR_ASSERT(user_data != NULL);
- GPR_ASSERT(strcmp((const char *)user_data, test_user_data) == 0);
- gpr_free(expected_md_value);
-}
-
-static void on_jwt_creds_get_metadata_failure(
- grpc_exec_ctx *exec_ctx, void *user_data, grpc_credentials_md *md_elems,
- size_t num_md, grpc_credentials_status status, const char *error_details) {
- GPR_ASSERT(status == GRPC_CREDENTIALS_ERROR);
- GPR_ASSERT(num_md == 0);
- GPR_ASSERT(user_data != NULL);
- GPR_ASSERT(strcmp((const char *)user_data, test_user_data) == 0);
-}
-
static grpc_service_account_jwt_access_credentials *creds_as_jwt(
grpc_call_credentials *creds) {
GPR_ASSERT(creds != NULL);
@@ -860,37 +814,42 @@ static void test_jwt_creds_success(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method, NULL,
NULL};
- grpc_call_credentials *jwt_creds =
+ char *expected_md_value;
+ gpr_asprintf(&expected_md_value, "Bearer %s", test_signed_jwt);
+ expected_md emd[] = {{"authorization", expected_md_value}};
+ grpc_call_credentials *creds =
grpc_service_account_jwt_access_credentials_create(
json_key_string, grpc_max_auth_token_lifetime(), NULL);
/* First request: jwt_encode_and_sign should be called. */
+ request_metadata_state *state =
+ make_request_metadata_state(GRPC_ERROR_NONE, emd, GPR_ARRAY_SIZE(emd));
grpc_jwt_encode_and_sign_set_override(encode_and_sign_jwt_success);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, jwt_creds, NULL, auth_md_ctx,
- on_jwt_creds_get_metadata_success, (void *)test_user_data);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, state);
grpc_exec_ctx_flush(&exec_ctx);
/* Second request: the cached token should be served directly. */
+ state =
+ make_request_metadata_state(GRPC_ERROR_NONE, emd, GPR_ARRAY_SIZE(emd));
grpc_jwt_encode_and_sign_set_override(
encode_and_sign_jwt_should_not_be_called);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, jwt_creds, NULL, auth_md_ctx,
- on_jwt_creds_get_metadata_success, (void *)test_user_data);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, state);
grpc_exec_ctx_flush(&exec_ctx);
/* Third request: Different service url so jwt_encode_and_sign should be
called again (no caching). */
+ state =
+ make_request_metadata_state(GRPC_ERROR_NONE, emd, GPR_ARRAY_SIZE(emd));
auth_md_ctx.service_url = other_test_service_url;
grpc_jwt_encode_and_sign_set_override(encode_and_sign_jwt_success);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, jwt_creds, NULL, auth_md_ctx,
- on_jwt_creds_get_metadata_success, (void *)test_user_data);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, state);
grpc_exec_ctx_flush(&exec_ctx);
+ grpc_call_credentials_unref(&exec_ctx, creds);
gpr_free(json_key_string);
- grpc_call_credentials_unref(&exec_ctx, jwt_creds);
+ gpr_free(expected_md_value);
grpc_jwt_encode_and_sign_set_override(NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_jwt_creds_signing_failure(void) {
@@ -898,17 +857,17 @@ static void test_jwt_creds_signing_failure(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method, NULL,
NULL};
- grpc_call_credentials *jwt_creds =
+ request_metadata_state *state = make_request_metadata_state(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Could not generate JWT."), NULL, 0);
+ grpc_call_credentials *creds =
grpc_service_account_jwt_access_credentials_create(
json_key_string, grpc_max_auth_token_lifetime(), NULL);
grpc_jwt_encode_and_sign_set_override(encode_and_sign_jwt_failure);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, jwt_creds, NULL, auth_md_ctx,
- on_jwt_creds_get_metadata_failure, (void *)test_user_data);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, state);
gpr_free(json_key_string);
- grpc_call_credentials_unref(&exec_ctx, jwt_creds);
+ grpc_call_credentials_unref(&exec_ctx, creds);
grpc_jwt_encode_and_sign_set_override(NULL);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -986,8 +945,10 @@ static char *null_well_known_creds_path_getter(void) { return NULL; }
static void test_google_default_creds_gce(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_composite_channel_credentials *creds;
- grpc_channel_credentials *cached_creds;
+ expected_md emd[] = {
+ {"authorization", "Bearer ya29.AHES6ZRN3-HlhAPya30GnW_bHSb_"}};
+ request_metadata_state *state =
+ make_request_metadata_state(GRPC_ERROR_NONE, emd, GPR_ARRAY_SIZE(emd));
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method, NULL,
NULL};
grpc_flush_cached_google_default_credentials();
@@ -999,33 +960,33 @@ static void test_google_default_creds_gce(void) {
grpc_httpcli_set_override(
default_creds_gce_detection_httpcli_get_success_override,
httpcli_post_should_not_be_called);
- creds = (grpc_composite_channel_credentials *)
- grpc_google_default_credentials_create();
+ grpc_composite_channel_credentials *creds =
+ (grpc_composite_channel_credentials *)
+ grpc_google_default_credentials_create();
/* Verify that the default creds actually embeds a GCE creds. */
GPR_ASSERT(creds != NULL);
GPR_ASSERT(creds->call_creds != NULL);
grpc_httpcli_set_override(compute_engine_httpcli_get_success_override,
httpcli_post_should_not_be_called);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, creds->call_creds, NULL, auth_md_ctx,
- on_oauth2_creds_get_metadata_success, (void *)test_user_data);
+ run_request_metadata_test(&exec_ctx, creds->call_creds, auth_md_ctx, state);
grpc_exec_ctx_flush(&exec_ctx);
- grpc_exec_ctx_finish(&exec_ctx);
/* Check that we get a cached creds if we call
grpc_google_default_credentials_create again.
GCE detection should not occur anymore either. */
grpc_httpcli_set_override(httpcli_get_should_not_be_called,
httpcli_post_should_not_be_called);
- cached_creds = grpc_google_default_credentials_create();
+ grpc_channel_credentials *cached_creds =
+ grpc_google_default_credentials_create();
GPR_ASSERT(cached_creds == &creds->base);
/* Cleanup. */
- grpc_channel_credentials_release(cached_creds);
- grpc_channel_credentials_release(&creds->base);
+ grpc_channel_credentials_unref(&exec_ctx, cached_creds);
+ grpc_channel_credentials_unref(&exec_ctx, &creds->base);
grpc_httpcli_set_override(NULL, NULL);
grpc_override_well_known_credentials_path_getter(NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static int default_creds_gce_detection_httpcli_get_failure_override(
@@ -1068,12 +1029,7 @@ typedef enum {
PLUGIN_DESTROY_CALLED_STATE
} plugin_state;
-typedef struct {
- const char *key;
- const char *value;
-} plugin_metadata;
-
-static const plugin_metadata plugin_md[] = {{"foo", "bar"}, {"hi", "there"}};
+static const expected_md plugin_md[] = {{"foo", "bar"}, {"hi", "there"}};
static void plugin_get_metadata_success(void *state,
grpc_auth_metadata_context context,
@@ -1110,79 +1066,60 @@ static void plugin_get_metadata_failure(void *state,
cb(user_data, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, plugin_error_details);
}
-static void on_plugin_metadata_received_success(
- grpc_exec_ctx *exec_ctx, void *user_data, grpc_credentials_md *md_elems,
- size_t num_md, grpc_credentials_status status, const char *error_details) {
- size_t i = 0;
- GPR_ASSERT(user_data == NULL);
- GPR_ASSERT(md_elems != NULL);
- GPR_ASSERT(num_md == GPR_ARRAY_SIZE(plugin_md));
- for (i = 0; i < num_md; i++) {
- GPR_ASSERT(grpc_slice_str_cmp(md_elems[i].key, plugin_md[i].key) == 0);
- GPR_ASSERT(grpc_slice_str_cmp(md_elems[i].value, plugin_md[i].value) == 0);
- }
-}
-
-static void on_plugin_metadata_received_failure(
- grpc_exec_ctx *exec_ctx, void *user_data, grpc_credentials_md *md_elems,
- size_t num_md, grpc_credentials_status status, const char *error_details) {
- GPR_ASSERT(user_data == NULL);
- GPR_ASSERT(md_elems == NULL);
- GPR_ASSERT(num_md == 0);
- GPR_ASSERT(status == GRPC_CREDENTIALS_ERROR);
- GPR_ASSERT(error_details != NULL);
- GPR_ASSERT(strcmp(error_details, plugin_error_details) == 0);
-}
-
static void plugin_destroy(void *state) {
plugin_state *s = (plugin_state *)state;
*s = PLUGIN_DESTROY_CALLED_STATE;
}
static void test_metadata_plugin_success(void) {
- grpc_call_credentials *creds;
plugin_state state = PLUGIN_INITIAL_STATE;
grpc_metadata_credentials_plugin plugin;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method, NULL,
NULL};
+ request_metadata_state *md_state = make_request_metadata_state(
+ GRPC_ERROR_NONE, plugin_md, GPR_ARRAY_SIZE(plugin_md));
plugin.state = &state;
plugin.get_metadata = plugin_get_metadata_success;
plugin.destroy = plugin_destroy;
- creds = grpc_metadata_credentials_create_from_plugin(plugin, NULL);
+ grpc_call_credentials *creds =
+ grpc_metadata_credentials_create_from_plugin(plugin, NULL);
GPR_ASSERT(state == PLUGIN_INITIAL_STATE);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, creds, NULL, auth_md_ctx, on_plugin_metadata_received_success,
- NULL);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, md_state);
GPR_ASSERT(state == PLUGIN_GET_METADATA_CALLED_STATE);
- grpc_call_credentials_release(creds);
- GPR_ASSERT(state == PLUGIN_DESTROY_CALLED_STATE);
+ grpc_call_credentials_unref(&exec_ctx, creds);
grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(state == PLUGIN_DESTROY_CALLED_STATE);
}
static void test_metadata_plugin_failure(void) {
- grpc_call_credentials *creds;
plugin_state state = PLUGIN_INITIAL_STATE;
grpc_metadata_credentials_plugin plugin;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method, NULL,
NULL};
+ char *expected_error;
+ gpr_asprintf(&expected_error,
+ "Getting metadata from plugin failed with error: %s",
+ plugin_error_details);
+ request_metadata_state *md_state = make_request_metadata_state(
+ GRPC_ERROR_CREATE_FROM_COPIED_STRING(expected_error), NULL, 0);
+ gpr_free(expected_error);
plugin.state = &state;
plugin.get_metadata = plugin_get_metadata_failure;
plugin.destroy = plugin_destroy;
- creds = grpc_metadata_credentials_create_from_plugin(plugin, NULL);
+ grpc_call_credentials *creds =
+ grpc_metadata_credentials_create_from_plugin(plugin, NULL);
GPR_ASSERT(state == PLUGIN_INITIAL_STATE);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, creds, NULL, auth_md_ctx, on_plugin_metadata_received_failure,
- NULL);
+ run_request_metadata_test(&exec_ctx, creds, auth_md_ctx, md_state);
GPR_ASSERT(state == PLUGIN_GET_METADATA_CALLED_STATE);
- grpc_call_credentials_release(creds);
- GPR_ASSERT(state == PLUGIN_DESTROY_CALLED_STATE);
+ grpc_call_credentials_unref(&exec_ctx, creds);
grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(state == PLUGIN_DESTROY_CALLED_STATE);
}
static void test_get_well_known_google_credentials_file_path(void) {
@@ -1233,12 +1170,9 @@ static void test_channel_creds_duplicate_without_call_creds(void) {
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_init();
- test_empty_md_store();
- test_ref_unref_empty_md_store();
- test_add_to_empty_md_store();
- test_add_cstrings_to_empty_md_store();
- test_empty_preallocated_md_store();
- test_add_abunch_to_md_store();
+ test_empty_md_array();
+ test_add_to_empty_md_array();
+ test_add_abunch_to_md_array();
test_oauth2_token_fetcher_creds_parsing_ok();
test_oauth2_token_fetcher_creds_parsing_bad_http_status();
test_oauth2_token_fetcher_creds_parsing_empty_http_body();
diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c
index e2331fbd97..fdbc6ea741 100644
--- a/test/core/security/oauth2_utils.c
+++ b/test/core/security/oauth2_utils.c
@@ -32,29 +32,31 @@
typedef struct {
gpr_mu *mu;
grpc_polling_entity pops;
- int is_done;
+ bool is_done;
char *token;
+
+ grpc_credentials_mdelem_array md_array;
+ grpc_closure closure;
} oauth2_request;
-static void on_oauth2_response(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_credentials_md *md_elems, size_t num_md,
- grpc_credentials_status status,
- const char *error_details) {
- oauth2_request *request = (oauth2_request *)user_data;
+static void on_oauth2_response(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ oauth2_request *request = (oauth2_request *)arg;
char *token = NULL;
grpc_slice token_slice;
- if (status == GRPC_CREDENTIALS_ERROR) {
- gpr_log(GPR_ERROR, "Fetching token failed.");
+ if (error != GRPC_ERROR_NONE) {
+ gpr_log(GPR_ERROR, "Fetching token failed: %s", grpc_error_string(error));
} else {
- GPR_ASSERT(num_md == 1);
- token_slice = md_elems[0].value;
+ GPR_ASSERT(request->md_array.size == 1);
+ token_slice = GRPC_MDVALUE(request->md_array.md[0]);
token = (char *)gpr_malloc(GRPC_SLICE_LENGTH(token_slice) + 1);
memcpy(token, GRPC_SLICE_START_PTR(token_slice),
GRPC_SLICE_LENGTH(token_slice));
token[GRPC_SLICE_LENGTH(token_slice)] = '\0';
}
+ grpc_credentials_mdelem_array_destroy(exec_ctx, &request->md_array);
gpr_mu_lock(request->mu);
- request->is_done = 1;
+ request->is_done = true;
request->token = token;
GRPC_LOG_IF_ERROR(
"pollset_kick",
@@ -68,6 +70,7 @@ static void do_nothing(grpc_exec_ctx *exec_ctx, void *unused,
char *grpc_test_fetch_oauth2_token_with_credentials(
grpc_call_credentials *creds) {
oauth2_request request;
+ memset(&request, 0, sizeof(request));
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_closure do_nothing_closure;
grpc_auth_metadata_context null_ctx = {"", "", NULL, NULL};
@@ -75,15 +78,23 @@ char *grpc_test_fetch_oauth2_token_with_credentials(
grpc_pollset *pollset = (grpc_pollset *)gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(pollset, &request.mu);
request.pops = grpc_polling_entity_create_from_pollset(pollset);
- request.is_done = 0;
+ request.is_done = false;
GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL,
grpc_schedule_on_exec_ctx);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, creds, &request.pops, null_ctx, on_oauth2_response, &request);
+ GRPC_CLOSURE_INIT(&request.closure, on_oauth2_response, &request,
+ grpc_schedule_on_exec_ctx);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_call_credentials_get_request_metadata(
+ &exec_ctx, creds, &request.pops, null_ctx, &request.md_array,
+ &request.closure, &error)) {
+ // Synchronous result; invoke callback directly.
+ on_oauth2_response(&exec_ctx, &request, error);
+ GRPC_ERROR_UNREF(error);
+ }
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(request.mu);
while (!request.is_done) {
@@ -94,7 +105,7 @@ char *grpc_test_fetch_oauth2_token_with_credentials(
grpc_polling_entity_pollset(&request.pops),
&worker, gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC)))) {
- request.is_done = 1;
+ request.is_done = true;
}
}
gpr_mu_unlock(request.mu);
diff --git a/test/core/security/print_google_default_creds_token.c b/test/core/security/print_google_default_creds_token.c
index 3c3d3a7c10..e1385a80ca 100644
--- a/test/core/security/print_google_default_creds_token.c
+++ b/test/core/security/print_google_default_creds_token.c
@@ -35,25 +35,26 @@
typedef struct {
gpr_mu *mu;
grpc_polling_entity pops;
- int is_done;
+ bool is_done;
+
+ grpc_credentials_mdelem_array md_array;
+ grpc_closure on_request_metadata;
} synchronizer;
-static void on_metadata_response(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_credentials_md *md_elems, size_t num_md,
- grpc_credentials_status status,
- const char *error_details) {
- synchronizer *sync = user_data;
- if (status == GRPC_CREDENTIALS_ERROR) {
- fprintf(stderr, "Fetching token failed.\n");
+static void on_metadata_response(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ synchronizer *sync = arg;
+ if (error != GRPC_ERROR_NONE) {
+ fprintf(stderr, "Fetching token failed: %s\n", grpc_error_string(error));
} else {
char *token;
- GPR_ASSERT(num_md == 1);
- token = grpc_slice_to_c_string(md_elems[0].value);
+ GPR_ASSERT(sync->md_array.size == 1);
+ token = grpc_slice_to_c_string(GRPC_MDVALUE(sync->md_array.md[0]));
printf("\nGot token: %s\n\n", token);
gpr_free(token);
}
gpr_mu_lock(sync->mu);
- sync->is_done = 1;
+ sync->is_done = true;
GRPC_LOG_IF_ERROR(
"pollset_kick",
grpc_pollset_kick(grpc_polling_entity_pollset(&sync->pops), NULL));
@@ -83,14 +84,23 @@ int main(int argc, char **argv) {
goto end;
}
+ memset(&sync, 0, sizeof(sync));
grpc_pollset *pollset = gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(pollset, &sync.mu);
sync.pops = grpc_polling_entity_create_from_pollset(pollset);
- sync.is_done = 0;
+ sync.is_done = false;
+ GRPC_CLOSURE_INIT(&sync.on_request_metadata, on_metadata_response, &sync,
+ grpc_schedule_on_exec_ctx);
- grpc_call_credentials_get_request_metadata(
- &exec_ctx, ((grpc_composite_channel_credentials *)creds)->call_creds,
- &sync.pops, context, on_metadata_response, &sync);
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_call_credentials_get_request_metadata(
+ &exec_ctx, ((grpc_composite_channel_credentials *)creds)->call_creds,
+ &sync.pops, context, &sync.md_array, &sync.on_request_metadata,
+ &error)) {
+ // Synchronous response. Invoke callback directly.
+ on_metadata_response(&exec_ctx, &sync, error);
+ GRPC_ERROR_UNREF(error);
+ }
gpr_mu_lock(sync.mu);
while (!sync.is_done) {
@@ -101,7 +111,7 @@ int main(int argc, char **argv) {
grpc_polling_entity_pollset(&sync.pops), &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC))))
- sync.is_done = 1;
+ sync.is_done = true;
gpr_mu_unlock(sync.mu);
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(sync.mu);
diff --git a/test/core/surface/completion_queue_threading_test.c b/test/core/surface/completion_queue_threading_test.c
index 366565fb35..99d0fa4980 100644
--- a/test/core/surface/completion_queue_threading_test.c
+++ b/test/core/surface/completion_queue_threading_test.c
@@ -190,7 +190,8 @@ static void consumer_thread(void *arg) {
gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
for (;;) {
- ev = grpc_completion_queue_next(opt->cc, ten_seconds_time(), NULL);
+ ev = grpc_completion_queue_next(opt->cc,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
switch (ev.type) {
case GRPC_OP_COMPLETE:
GPR_ASSERT(ev.success);
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 8d12971bc1..8bada48a2b 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -1565,7 +1565,9 @@ TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
Status s = stub_->Echo(&context, request, &response);
EXPECT_FALSE(s.ok());
EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
- EXPECT_EQ(s.error_message(), kTestCredsPluginErrorMsg);
+ EXPECT_EQ(s.error_message(),
+ grpc::string("Getting metadata from plugin failed with error: ") +
+ kTestCredsPluginErrorMsg);
}
TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
@@ -1624,7 +1626,9 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) {
Status s = stub_->Echo(&context, request, &response);
EXPECT_FALSE(s.ok());
EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
- EXPECT_EQ(s.error_message(), kTestCredsPluginErrorMsg);
+ EXPECT_EQ(s.error_message(),
+ grpc::string("Getting metadata from plugin failed with error: ") +
+ kTestCredsPluginErrorMsg);
}
TEST_P(SecureEnd2endTest, ClientAuthContext) {
diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc
index 683f4703c2..1fc1f2f83b 100644
--- a/test/cpp/microbenchmarks/bm_pollset.cc
+++ b/test/cpp/microbenchmarks/bm_pollset.cc
@@ -149,7 +149,7 @@ static void BM_PollAddFd(benchmark::State& state) {
grpc_pollset_add_fd(&exec_ctx, ps, fd);
grpc_exec_ctx_flush(&exec_ctx);
}
- grpc_fd_orphan(&exec_ctx, fd, NULL, NULL, "xxx");
+ grpc_fd_orphan(&exec_ctx, fd, NULL, NULL, false /* already_closed */, "xxx");
grpc_closure shutdown_ps_closure;
GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
grpc_schedule_on_exec_ctx);
@@ -247,7 +247,8 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) {
while (!done) {
GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, NULL, now, deadline));
}
- grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, "done");
+ grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, false /* already_closed */,
+ "done");
wakeup_fd.read_fd = 0;
grpc_closure shutdown_ps_closure;
GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
diff --git a/tools/internal_ci/helper_scripts/prepare_build_interop_rc b/tools/internal_ci/helper_scripts/prepare_build_interop_rc
index cc956b8075..859ce621f9 100644
--- a/tools/internal_ci/helper_scripts/prepare_build_interop_rc
+++ b/tools/internal_ci/helper_scripts/prepare_build_interop_rc
@@ -26,3 +26,8 @@ git submodule update --init
# Set up gRPC-Go and gRPC-Java to test
git clone --recursive https://github.com/grpc/grpc-go ./../grpc-go
git clone --recursive https://github.com/grpc/grpc-java ./../grpc-java
+
+# Download json file.
+mkdir ~/service_account
+gsutil cp gs://grpc-testing-secrets/interop/service_account/GrpcTesting-726eb1347f15.json ~/service_account
+export GOOGLE_APPLICATION_CREDENTIALS=~/service_account/GrpcTesting-726eb1347f15.json
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 66b26dab2b..24e7ff2b7b 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -7339,6 +7339,7 @@
"test/core/end2end/tests/payload.c",
"test/core/end2end/tests/ping.c",
"test/core/end2end/tests/ping_pong_streaming.c",
+ "test/core/end2end/tests/proxy_auth.c",
"test/core/end2end/tests/registered_call.c",
"test/core/end2end/tests/request_with_flags.c",
"test/core/end2end/tests/request_with_payload.c",
@@ -7416,6 +7417,7 @@
"test/core/end2end/tests/payload.c",
"test/core/end2end/tests/ping.c",
"test/core/end2end/tests/ping_pong_streaming.c",
+ "test/core/end2end/tests/proxy_auth.c",
"test/core/end2end/tests/registered_call.c",
"test/core/end2end/tests/request_with_flags.c",
"test/core/end2end/tests/request_with_payload.c",
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index c2ebaa6614..ebf30717d4 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -3993,7 +3993,8 @@
"mac",
"posix",
"windows"
- ]
+ ],
+ "timeout_seconds": 1200
},
{
"args": [],
@@ -16353,6 +16354,30 @@
},
{
"args": [
+ "proxy_auth"
+ ],
+ "ci_platforms": [
+ "windows",
+ "linux",
+ "posix"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [
+ "uv"
+ ],
+ "flaky": false,
+ "language": "c",
+ "name": "h2_http_proxy_test",
+ "platforms": [
+ "windows",
+ "linux",
+ "mac",
+ "posix"
+ ]
+ },
+ {
+ "args": [
"registered_call"
],
"ci_platforms": [
@@ -38866,6 +38891,30 @@
},
{
"args": [
+ "proxy_auth"
+ ],
+ "ci_platforms": [
+ "windows",
+ "linux",
+ "posix"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [
+ "uv"
+ ],
+ "flaky": false,
+ "language": "c",
+ "name": "h2_http_proxy_nosec_test",
+ "platforms": [
+ "windows",
+ "linux",
+ "mac",
+ "posix"
+ ]
+ },
+ {
+ "args": [
"registered_call"
],
"ci_platforms": [
diff --git a/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj b/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj
index 4d02c88082..249d99b526 100644
--- a/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj
+++ b/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj
@@ -231,6 +231,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\ping_pong_streaming.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\proxy_auth.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\registered_call.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\request_with_flags.c">
diff --git a/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj.filters b/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj.filters
index 89316bc535..3a2105ebe8 100644
--- a/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj.filters
+++ b/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj.filters
@@ -121,6 +121,9 @@
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\ping_pong_streaming.c">
<Filter>test\core\end2end\tests</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\proxy_auth.c">
+ <Filter>test\core\end2end\tests</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\registered_call.c">
<Filter>test\core\end2end\tests</Filter>
</ClCompile>
diff --git a/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj b/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj
index 9573111452..b7a2ecd27b 100644
--- a/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj
+++ b/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj
@@ -233,6 +233,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\ping_pong_streaming.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\proxy_auth.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\registered_call.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\request_with_flags.c">
diff --git a/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj.filters b/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj.filters
index 7d02fc3fa0..1626b77d14 100644
--- a/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj.filters
+++ b/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj.filters
@@ -124,6 +124,9 @@
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\ping_pong_streaming.c">
<Filter>test\core\end2end\tests</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\proxy_auth.c">
+ <Filter>test\core\end2end\tests</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\registered_call.c">
<Filter>test\core\end2end\tests</Filter>
</ClCompile>