aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/census_filter.c8
-rw-r--r--src/core/client_config/subchannel.c4
-rw-r--r--src/core/iomgr/iomgr.c39
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c3
-rw-r--r--src/core/iomgr/pollset_posix.c9
-rw-r--r--src/core/iomgr/pollset_windows.c6
-rw-r--r--src/core/iomgr/socket_windows.c5
-rw-r--r--src/core/iomgr/tcp_client_posix.c10
-rw-r--r--src/core/iomgr/tcp_client_windows.c3
-rw-r--r--src/core/profiling/timers_preciseclock.h2
-rw-r--r--src/core/security/client_auth_filter.c18
-rw-r--r--src/core/security/credentials.c123
-rw-r--r--src/core/security/credentials.h112
-rw-r--r--src/core/security/google_default_credentials.c89
-rw-r--r--src/core/security/json_token.c54
-rw-r--r--src/core/security/json_token.h15
-rw-r--r--src/core/security/jwt_verifier.c40
-rw-r--r--src/core/security/security_context.c14
-rw-r--r--src/core/security/security_context.h8
-rw-r--r--src/core/statistics/census_rpc_stats.c4
-rw-r--r--src/core/statistics/census_tracing.c16
-rw-r--r--src/core/statistics/window_stats.h6
-rw-r--r--src/core/support/cancellable.c4
-rw-r--r--src/core/support/log_linux.c2
-rw-r--r--src/core/support/log_posix.c2
-rw-r--r--src/core/support/log_win32.c2
-rw-r--r--src/core/support/sync_win32.c2
-rw-r--r--src/core/support/time_posix.c44
-rw-r--r--src/core/support/time_win32.c35
-rw-r--r--src/core/surface/call.c60
-rw-r--r--src/core/surface/completion_queue.c232
-rw-r--r--src/core/surface/completion_queue.h18
-rw-r--r--src/core/surface/init.c2
-rw-r--r--src/core/surface/server.c207
-rw-r--r--src/core/surface/version.c41
-rw-r--r--src/core/transport/chttp2/parsing.c5
-rw-r--r--src/core/transport/chttp2/stream_encoder.c3
-rw-r--r--src/core/transport/chttp2_transport.c8
-rw-r--r--src/core/transport/metadata.c20
39 files changed, 766 insertions, 509 deletions
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c
index 43ef5fb3ff..d996c3475e 100644
--- a/src/core/channel/census_filter.c
+++ b/src/core/channel/census_filter.c
@@ -151,7 +151,7 @@ static void client_init_call_elem(grpc_call_element* elem,
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);
- d->start_ts = gpr_now();
+ d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
d->op_id = census_tracing_start_op();
if (initial_op) client_mutate_op(elem, initial_op);
}
@@ -169,7 +169,7 @@ static void server_init_call_elem(grpc_call_element* elem,
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);
- d->start_ts = gpr_now();
+ d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
d->op_id = census_tracing_start_op();
if (initial_op) server_mutate_op(elem, initial_op);
}
@@ -177,8 +177,8 @@ static void server_init_call_elem(grpc_call_element* elem,
static void server_destroy_call_elem(grpc_call_element* elem) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
- d->stats.elapsed_time_ms =
- gpr_timespec_to_micros(gpr_time_sub(gpr_now(), d->start_ts));
+ d->stats.elapsed_time_ms = gpr_timespec_to_micros(
+ gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), d->start_ts));
census_record_rpc_server_stats(d->op_id, &d->stats);
census_tracing_end_op(d->op_id);
}
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 6cf9062ab0..8cdad1015f 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -300,7 +300,7 @@ static void continue_connect(grpc_subchannel *c) {
}
static void start_connect(grpc_subchannel *c) {
- gpr_timespec now = gpr_now();
+ gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
c->next_attempt = now;
c->backoff_delta = gpr_time_from_seconds(1);
@@ -585,7 +585,7 @@ static void subchannel_connected(void *arg, int iomgr_success) {
c->have_alarm = 1;
c->next_attempt = gpr_time_add(c->next_attempt, c->backoff_delta);
c->backoff_delta = gpr_time_add(c->backoff_delta, c->backoff_delta);
- grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, gpr_now());
+ grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, gpr_now(GPR_CLOCK_REALTIME));
gpr_mu_unlock(&c->mu);
}
}
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index c507e7c26a..cca92d3b32 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -59,7 +59,7 @@ static void background_callback_executor(void *ignored) {
while (!g_shutdown) {
gpr_timespec deadline = gpr_inf_future;
gpr_timespec short_deadline =
- gpr_time_add(gpr_now(), gpr_time_from_millis(100));
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100));
if (g_cbs_head) {
grpc_iomgr_closure *closure = g_cbs_head;
g_cbs_head = closure->next;
@@ -67,7 +67,8 @@ static void background_callback_executor(void *ignored) {
gpr_mu_unlock(&g_mu);
closure->cb(closure->cb_arg, closure->success);
gpr_mu_lock(&g_mu);
- } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) {
+ } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_REALTIME),
+ &deadline)) {
} else {
gpr_mu_unlock(&g_mu);
gpr_sleep_until(gpr_time_min(short_deadline, deadline));
@@ -89,7 +90,7 @@ void grpc_iomgr_init(void) {
gpr_thd_id id;
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
- grpc_alarm_list_init(gpr_now());
+ grpc_alarm_list_init(gpr_now(GPR_CLOCK_REALTIME));
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
grpc_iomgr_platform_init();
@@ -110,21 +111,27 @@ void grpc_iomgr_shutdown(void) {
grpc_iomgr_object *obj;
grpc_iomgr_closure *closure;
gpr_timespec shutdown_deadline =
- gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10));
+ gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
gpr_mu_lock(&g_mu);
g_shutdown = 1;
while (g_cbs_head != NULL || g_root_object.next != &g_root_object) {
- if (g_cbs_head != NULL && g_root_object.next != &g_root_object) {
- gpr_log(GPR_DEBUG,
- "Waiting for %d iomgr objects to be destroyed and executing "
- "final callbacks",
- count_objects());
- } else if (g_cbs_head != NULL) {
- gpr_log(GPR_DEBUG, "Executing final iomgr callbacks");
- } else {
- gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed",
- count_objects());
+ if (gpr_time_cmp(
+ gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_warning_time),
+ gpr_time_from_seconds(1)) >= 0) {
+ if (g_cbs_head != NULL && g_root_object.next != &g_root_object) {
+ gpr_log(GPR_DEBUG,
+ "Waiting for %d iomgr objects to be destroyed and executing "
+ "final callbacks",
+ count_objects());
+ } else if (g_cbs_head != NULL) {
+ gpr_log(GPR_DEBUG, "Executing final iomgr callbacks");
+ } else {
+ gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed",
+ count_objects());
+ }
+ last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
}
if (g_cbs_head) {
do {
@@ -145,9 +152,9 @@ void grpc_iomgr_shutdown(void) {
if (g_root_object.next != &g_root_object) {
int timeout = 0;
gpr_timespec short_deadline =
- gpr_time_add(gpr_now(), gpr_time_from_millis(100));
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100));
while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) {
- if (gpr_time_cmp(gpr_now(), shutdown_deadline) > 0) {
+ if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) {
timeout = 1;
break;
}
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 1900bbf9e1..3746c8edaf 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -105,10 +105,11 @@ static void multipoll_with_epoll_pollset_maybe_work(
* here.
*/
- timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
pollset->counter += 1;
gpr_mu_unlock(&pollset->mu);
+ timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
+
do {
ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
if (ep_rv < 0) {
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 12496440de..85101764d2 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -122,7 +122,7 @@ static void finish_shutdown(grpc_pollset *pollset) {
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
/* pollset->mu already held */
- gpr_timespec now = gpr_now();
+ gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
}
@@ -187,15 +187,16 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
-int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now) {
+int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
+ gpr_timespec now) {
gpr_timespec timeout;
static const int max_spin_polling_us = 10;
if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
return -1;
}
if (gpr_time_cmp(
- deadline,
- gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) {
+ deadline,
+ gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) {
return 0;
}
timeout = gpr_time_sub(deadline, now);
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 8d6bc79c96..24226cc980 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -70,7 +70,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
gpr_timespec now;
- now = gpr_now();
+ now = gpr_now(GPR_CLOCK_REALTIME);
if (gpr_time_cmp(now, deadline) > 0) {
return 0 /* GPR_FALSE */;
}
@@ -86,8 +86,6 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
return 1 /* GPR_TRUE */;
}
-void grpc_pollset_kick(grpc_pollset *p) {
- gpr_cv_signal(&p->cv);
-}
+void grpc_pollset_kick(grpc_pollset *p) { gpr_cv_signal(&p->cv); }
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index fbf3fdc949..897408ded2 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -45,11 +45,14 @@
#include "src/core/iomgr/socket_windows.h"
grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) {
+ char *final_name;
grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket));
memset(r, 0, sizeof(grpc_winsocket));
r->socket = socket;
gpr_mu_init(&r->state_mu);
- grpc_iomgr_register_object(&r->iomgr_object, name);
+ gpr_asprintf(&final_name, "%s:socket=0x%p", name, r);
+ grpc_iomgr_register_object(&r->iomgr_object, final_name);
+ gpr_free(final_name);
grpc_iocp_add_socket(r);
return r;
}
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index d981aaf028..dc0489e64f 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -114,6 +114,7 @@ static void on_writable(void *acp, int success) {
void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
void *cb_arg = ac->cb_arg;
+ gpr_mu_lock(&ac->mu);
if (success) {
do {
so_error_size = sizeof(so_error);
@@ -139,6 +140,7 @@ static void on_writable(void *acp, int success) {
opened too many network connections. The "easy" fix:
don't do that! */
gpr_log(GPR_ERROR, "kernel out of buffers");
+ gpr_mu_unlock(&ac->mu);
grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
return;
} else {
@@ -165,10 +167,11 @@ static void on_writable(void *acp, int success) {
abort();
finish:
- gpr_mu_lock(&ac->mu);
- if (!ep) {
+ if (ep == NULL) {
grpc_pollset_set_del_fd(ac->interested_parties, ac->fd);
grpc_fd_orphan(ac->fd, NULL, "tcp_client_orphan");
+ } else {
+ ac->fd = NULL;
}
done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
@@ -250,7 +253,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac->write_closure.cb_arg = ac;
gpr_mu_lock(&ac->mu);
- grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
+ grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac,
+ gpr_now(GPR_CLOCK_REALTIME));
grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
gpr_mu_unlock(&ac->mu);
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index b1a169b519..16741452b9 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -215,7 +215,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
ac->refs = 2;
ac->aborted = 0;
- grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
+ grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac,
+ gpr_now(GPR_CLOCK_REALTIME));
socket->write_info.outstanding = 1;
grpc_socket_notify_on_write(socket, on_connect, ac);
return;
diff --git a/src/core/profiling/timers_preciseclock.h b/src/core/profiling/timers_preciseclock.h
index 163d52b797..5c251b47e6 100644
--- a/src/core/profiling/timers_preciseclock.h
+++ b/src/core/profiling/timers_preciseclock.h
@@ -82,7 +82,7 @@ struct grpc_precise_clock {
gpr_timespec clock;
};
static void grpc_precise_clock_now(grpc_precise_clock* clk) {
- clk->clock = gpr_now();
+ clk->clock = gpr_now(GPR_CLOCK_REALTIME);
}
#define GRPC_PRECISE_CLOCK_FORMAT "%ld.%09d"
#define GRPC_PRECISE_CLOCK_PRINTF_ARGS(clk) \
diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c
index 16f2abe04b..9e49a807f1 100644
--- a/src/core/security/client_auth_filter.c
+++ b/src/core/security/client_auth_filter.c
@@ -61,6 +61,7 @@ typedef struct {
grpc_transport_stream_op op;
size_t op_md_idx;
int sent_initial_metadata;
+ gpr_uint8 security_context_set;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
} call_data;
@@ -199,8 +200,22 @@ static void auth_start_transport_op(grpc_call_element *elem,
channel_data *chand = elem->channel_data;
grpc_linked_mdelem *l;
size_t i;
+ grpc_client_security_context* sec_ctx = NULL;
- /* TODO(jboeuf): write the call auth context. */
+ if (calld->security_context_set == 0) {
+ calld->security_context_set = 1;
+ GPR_ASSERT(op->context);
+ if (op->context[GRPC_CONTEXT_SECURITY].value == NULL) {
+ op->context[GRPC_CONTEXT_SECURITY].value =
+ grpc_client_security_context_create();
+ op->context[GRPC_CONTEXT_SECURITY].destroy =
+ grpc_client_security_context_destroy;
+ }
+ sec_ctx = op->context[GRPC_CONTEXT_SECURITY].value;
+ GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
+ sec_ctx->auth_context = GRPC_AUTH_CONTEXT_REF(
+ chand->security_connector->base.auth_context, "client_auth_filter");
+ }
if (op->bind_pollset) {
calld->pollset = op->bind_pollset;
@@ -263,6 +278,7 @@ static void init_call_elem(grpc_call_element *elem,
calld->method = NULL;
calld->pollset = NULL;
calld->sent_initial_metadata = 0;
+ calld->security_context_set = 0;
GPR_ASSERT(!initial_op || !initial_op->send_ops);
}
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index 8d694c2f79..230f0dfb85 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -41,7 +41,6 @@
#include "src/core/json/json.h"
#include "src/core/httpcli/httpcli.h"
#include "src/core/iomgr/iomgr.h"
-#include "src/core/security/json_token.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
@@ -52,12 +51,12 @@
/* -- Common. -- */
-typedef struct {
+struct grpc_credentials_metadata_request {
grpc_credentials *creds;
grpc_credentials_metadata_cb cb;
grpc_iomgr_closure *on_simulated_token_fetch_done_closure;
void *user_data;
-} grpc_credentials_metadata_request;
+};
static grpc_credentials_metadata_request *
grpc_credentials_metadata_request_create(grpc_credentials *creds,
@@ -152,16 +151,6 @@ grpc_security_status grpc_server_credentials_create_security_connector(
/* -- Ssl credentials. -- */
-typedef struct {
- grpc_credentials base;
- grpc_ssl_config config;
-} grpc_ssl_credentials;
-
-typedef struct {
- grpc_server_credentials base;
- grpc_ssl_server_config config;
-} grpc_ssl_server_credentials;
-
static void ssl_destroy(grpc_credentials *creds) {
grpc_ssl_credentials *c = (grpc_ssl_credentials *)creds;
if (c->config.pem_root_certs != NULL) gpr_free(c->config.pem_root_certs);
@@ -326,22 +315,6 @@ grpc_server_credentials *grpc_ssl_server_credentials_create(
/* -- Jwt credentials -- */
-typedef struct {
- grpc_credentials base;
-
- /* Have a simple cache for now with just 1 entry. We could have a map based on
- the service_url for a more sophisticated one. */
- gpr_mu cache_mu;
- struct {
- grpc_credentials_md_store *jwt_md;
- char *service_url;
- gpr_timespec jwt_expiration;
- } cached;
-
- grpc_auth_json_key key;
- gpr_timespec jwt_lifetime;
-} grpc_jwt_credentials;
-
static void jwt_reset_cache(grpc_jwt_credentials *c) {
if (c->cached.jwt_md != NULL) {
grpc_credentials_md_store_unref(c->cached.jwt_md);
@@ -384,7 +357,8 @@ static void jwt_get_request_metadata(grpc_credentials *creds,
if (c->cached.service_url != NULL &&
strcmp(c->cached.service_url, service_url) == 0 &&
c->cached.jwt_md != NULL &&
- (gpr_time_cmp(gpr_time_sub(c->cached.jwt_expiration, gpr_now()),
+ (gpr_time_cmp(gpr_time_sub(c->cached.jwt_expiration,
+ gpr_now(GPR_CLOCK_REALTIME)),
refresh_threshold) > 0)) {
jwt_md = grpc_credentials_md_store_ref(c->cached.jwt_md);
}
@@ -401,7 +375,8 @@ static void jwt_get_request_metadata(grpc_credentials *creds,
char *md_value;
gpr_asprintf(&md_value, "Bearer %s", jwt);
gpr_free(jwt);
- c->cached.jwt_expiration = gpr_time_add(gpr_now(), c->jwt_lifetime);
+ c->cached.jwt_expiration =
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c->jwt_lifetime);
c->cached.service_url = gpr_strdup(service_url);
c->cached.jwt_md = grpc_credentials_md_store_create(1);
grpc_credentials_md_store_add_cstrings(
@@ -424,10 +399,9 @@ static grpc_credentials_vtable jwt_vtable = {
jwt_destroy, jwt_has_request_metadata, jwt_has_request_metadata_only,
jwt_get_request_metadata, NULL};
-grpc_credentials *grpc_jwt_credentials_create(const char *json_key,
- gpr_timespec token_lifetime) {
+grpc_credentials *grpc_jwt_credentials_create_from_auth_json_key(
+ grpc_auth_json_key key, gpr_timespec token_lifetime) {
grpc_jwt_credentials *c;
- grpc_auth_json_key key = grpc_auth_json_key_create_from_string(json_key);
if (!grpc_auth_json_key_is_valid(&key)) {
gpr_log(GPR_ERROR, "Invalid input for jwt credentials creation");
return NULL;
@@ -444,25 +418,13 @@ grpc_credentials *grpc_jwt_credentials_create(const char *json_key,
return &c->base;
}
-/* -- Oauth2TokenFetcher credentials -- */
-
-/* This object is a base for credentials that need to acquire an oauth2 token
- from an http service. */
-
-typedef void (*grpc_fetch_oauth2_func)(grpc_credentials_metadata_request *req,
- grpc_httpcli_context *http_context,
- grpc_pollset *pollset,
- grpc_httpcli_response_cb response_cb,
- gpr_timespec deadline);
+grpc_credentials *grpc_jwt_credentials_create(const char *json_key,
+ gpr_timespec token_lifetime) {
+ return grpc_jwt_credentials_create_from_auth_json_key(
+ grpc_auth_json_key_create_from_string(json_key), token_lifetime);
+}
-typedef struct {
- grpc_credentials base;
- gpr_mu mu;
- grpc_credentials_md_store *access_token_md;
- gpr_timespec token_expiration;
- grpc_httpcli_context httpcli_context;
- grpc_fetch_oauth2_func fetch_func;
-} grpc_oauth2_token_fetcher_credentials;
+/* -- Oauth2TokenFetcher credentials -- */
static void oauth2_token_fetcher_destroy(grpc_credentials *creds) {
grpc_oauth2_token_fetcher_credentials *c =
@@ -585,7 +547,8 @@ static void on_oauth2_token_fetcher_http_response(
status = grpc_oauth2_token_fetcher_credentials_parse_server_response(
response, &c->access_token_md, &token_lifetime);
if (status == GRPC_CREDENTIALS_OK) {
- c->token_expiration = gpr_time_add(gpr_now(), token_lifetime);
+ c->token_expiration =
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), token_lifetime);
r->cb(r->user_data, c->access_token_md->entries,
c->access_token_md->num_entries, status);
} else {
@@ -607,8 +570,9 @@ static void oauth2_token_fetcher_get_request_metadata(
{
gpr_mu_lock(&c->mu);
if (c->access_token_md != NULL &&
- (gpr_time_cmp(gpr_time_sub(c->token_expiration, gpr_now()),
- refresh_threshold) > 0)) {
+ (gpr_time_cmp(
+ gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_REALTIME)),
+ refresh_threshold) > 0)) {
cached_access_token_md =
grpc_credentials_md_store_ref(c->access_token_md);
}
@@ -622,7 +586,7 @@ static void oauth2_token_fetcher_get_request_metadata(
c->fetch_func(
grpc_credentials_metadata_request_create(creds, cb, user_data),
&c->httpcli_context, pollset, on_oauth2_token_fetcher_http_response,
- gpr_time_add(gpr_now(), refresh_threshold));
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), refresh_threshold));
}
}
@@ -669,13 +633,6 @@ grpc_credentials *grpc_compute_engine_credentials_create(void) {
/* -- ServiceAccount credentials. -- */
-typedef struct {
- grpc_oauth2_token_fetcher_credentials base;
- grpc_auth_json_key key;
- char *scope;
- gpr_timespec token_lifetime;
-} grpc_service_account_credentials;
-
static void service_account_destroy(grpc_credentials *creds) {
grpc_service_account_credentials *c =
(grpc_service_account_credentials *)creds;
@@ -746,11 +703,6 @@ grpc_credentials *grpc_service_account_credentials_create(
/* -- RefreshToken credentials. -- */
-typedef struct {
- grpc_oauth2_token_fetcher_credentials base;
- grpc_auth_refresh_token refresh_token;
-} grpc_refresh_token_credentials;
-
static void refresh_token_destroy(grpc_credentials *creds) {
grpc_refresh_token_credentials *c = (grpc_refresh_token_credentials *)creds;
grpc_auth_refresh_token_destruct(&c->refresh_token);
@@ -786,12 +738,9 @@ static void refresh_token_fetch_oauth2(
gpr_free(body);
}
-grpc_credentials *grpc_refresh_token_credentials_create(
- const char *json_refresh_token) {
+grpc_credentials *grpc_refresh_token_credentials_create_from_auth_refresh_token(
+ grpc_auth_refresh_token refresh_token) {
grpc_refresh_token_credentials *c;
- grpc_auth_refresh_token refresh_token =
- grpc_auth_refresh_token_create_from_string(json_refresh_token);
-
if (!grpc_auth_refresh_token_is_valid(&refresh_token)) {
gpr_log(GPR_ERROR, "Invalid input for refresh token credentials creation");
return NULL;
@@ -804,13 +753,13 @@ grpc_credentials *grpc_refresh_token_credentials_create(
return &c->base.base;
}
-/* -- Fake Oauth2 credentials. -- */
+grpc_credentials *grpc_refresh_token_credentials_create(
+ const char *json_refresh_token) {
+ return grpc_refresh_token_credentials_create_from_auth_refresh_token(
+ grpc_auth_refresh_token_create_from_string(json_refresh_token));
+}
-typedef struct {
- grpc_credentials base;
- grpc_credentials_md_store *access_token_md;
- int is_async;
-} grpc_fake_oauth2_credentials;
+/* -- Fake Oauth2 credentials. -- */
static void fake_oauth2_destroy(grpc_credentials *creds) {
grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)creds;
@@ -877,11 +826,6 @@ grpc_credentials *grpc_fake_oauth2_credentials_create(
/* -- Oauth2 Access Token credentials. -- */
-typedef struct {
- grpc_credentials base;
- grpc_credentials_md_store *access_token_md;
-} grpc_access_token_credentials;
-
static void access_token_destroy(grpc_credentials *creds) {
grpc_access_token_credentials *c = (grpc_access_token_credentials *)creds;
grpc_credentials_md_store_unref(c->access_token_md);
@@ -997,12 +941,6 @@ grpc_server_credentials *grpc_fake_transport_security_server_credentials_create(
/* -- Composite credentials. -- */
typedef struct {
- grpc_credentials base;
- grpc_credentials_array inner;
- grpc_credentials *connector_creds;
-} grpc_composite_credentials;
-
-typedef struct {
grpc_composite_credentials *composite_creds;
size_t creds_index;
grpc_credentials_md_store *md_elems;
@@ -1232,11 +1170,6 @@ grpc_credentials *grpc_credentials_contains_type(
/* -- IAM credentials. -- */
-typedef struct {
- grpc_credentials base;
- grpc_credentials_md_store *iam_md;
-} grpc_iam_credentials;
-
static void iam_destroy(grpc_credentials *creds) {
grpc_iam_credentials *c = (grpc_iam_credentials *)creds;
grpc_credentials_md_store_unref(c->iam_md);
diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h
index 75af73a0c6..d988901cf7 100644
--- a/src/core/security/credentials.h
+++ b/src/core/security/credentials.h
@@ -39,6 +39,8 @@
#include <grpc/grpc_security.h>
#include <grpc/support/sync.h>
+#include "src/core/httpcli/httpcli.h"
+#include "src/core/security/json_token.h"
#include "src/core/security/security_connector.h"
struct grpc_httpcli_response;
@@ -178,11 +180,22 @@ grpc_credentials_status
grpc_oauth2_token_fetcher_credentials_parse_server_response(
const struct grpc_httpcli_response *response,
grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime);
+void grpc_flush_cached_google_default_credentials(void);
/* Simulates an oauth2 token fetch with the specified value for testing. */
grpc_credentials *grpc_fake_oauth2_credentials_create(
const char *token_md_value, int is_async);
+/* Private constructor for jwt credentials from an already parsed json key.
+ Takes ownership of the key. */
+grpc_credentials *grpc_jwt_credentials_create_from_auth_json_key(
+ grpc_auth_json_key key, gpr_timespec token_lifetime);
+
+/* Private constructor for refresh token credentials from an already parsed
+ refresh token. Takes ownership of the refresh token. */
+grpc_credentials *grpc_refresh_token_credentials_create_from_auth_refresh_token(
+ grpc_auth_refresh_token token);
+
/* --- grpc_server_credentials. --- */
typedef struct {
@@ -199,4 +212,103 @@ struct grpc_server_credentials {
grpc_security_status grpc_server_credentials_create_security_connector(
grpc_server_credentials *creds, grpc_security_connector **sc);
+/* -- Ssl credentials. -- */
+
+typedef struct {
+ grpc_credentials base;
+ grpc_ssl_config config;
+} grpc_ssl_credentials;
+
+typedef struct {
+ grpc_server_credentials base;
+ grpc_ssl_server_config config;
+} grpc_ssl_server_credentials;
+
+/* -- Jwt credentials -- */
+
+typedef struct {
+ grpc_credentials base;
+
+ /* Have a simple cache for now with just 1 entry. We could have a map based on
+ the service_url for a more sophisticated one. */
+ gpr_mu cache_mu;
+ struct {
+ grpc_credentials_md_store *jwt_md;
+ char *service_url;
+ gpr_timespec jwt_expiration;
+ } cached;
+
+ grpc_auth_json_key key;
+ gpr_timespec jwt_lifetime;
+} grpc_jwt_credentials;
+
+/* -- Oauth2TokenFetcher credentials --
+
+ This object is a base for credentials that need to acquire an oauth2 token
+ from an http service. */
+
+typedef struct grpc_credentials_metadata_request
+ grpc_credentials_metadata_request;
+
+typedef void (*grpc_fetch_oauth2_func)(grpc_credentials_metadata_request *req,
+ grpc_httpcli_context *http_context,
+ grpc_pollset *pollset,
+ grpc_httpcli_response_cb response_cb,
+ gpr_timespec deadline);
+
+typedef struct {
+ grpc_credentials base;
+ gpr_mu mu;
+ grpc_credentials_md_store *access_token_md;
+ gpr_timespec token_expiration;
+ grpc_httpcli_context httpcli_context;
+ grpc_fetch_oauth2_func fetch_func;
+} grpc_oauth2_token_fetcher_credentials;
+
+/* -- ServiceAccount credentials. -- */
+
+typedef struct {
+ grpc_oauth2_token_fetcher_credentials base;
+ grpc_auth_json_key key;
+ char *scope;
+ gpr_timespec token_lifetime;
+} grpc_service_account_credentials;
+
+/* -- RefreshToken credentials. -- */
+
+typedef struct {
+ grpc_oauth2_token_fetcher_credentials base;
+ grpc_auth_refresh_token refresh_token;
+} grpc_refresh_token_credentials;
+
+/* -- Oauth2 Access Token credentials. -- */
+
+typedef struct {
+ grpc_credentials base;
+ grpc_credentials_md_store *access_token_md;
+} grpc_access_token_credentials;
+
+/* -- Fake Oauth2 credentials. -- */
+
+typedef struct {
+ grpc_credentials base;
+ grpc_credentials_md_store *access_token_md;
+ int is_async;
+} grpc_fake_oauth2_credentials;
+
+/* -- IAM credentials. -- */
+
+typedef struct {
+ grpc_credentials base;
+ grpc_credentials_md_store *iam_md;
+} grpc_iam_credentials;
+
+/* -- Composite credentials. -- */
+
+typedef struct {
+ grpc_credentials base;
+ grpc_credentials_array inner;
+ grpc_credentials *connector_creds;
+} grpc_composite_credentials;
+
#endif /* GRPC_INTERNAL_CORE_SECURITY_CREDENTIALS_H */
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index 5822ce6337..f622deff42 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -46,7 +46,6 @@
/* -- Constants. -- */
#define GRPC_COMPUTE_ENGINE_DETECTION_HOST "metadata.google.internal"
-#define GRPC_GOOGLE_CREDENTIALS_ENV_VAR "GOOGLE_APPLICATION_CREDENTIALS"
/* -- Default credentials. -- */
@@ -104,9 +103,10 @@ static int is_stack_running_on_compute_engine(void) {
grpc_httpcli_context_init(&context);
- grpc_httpcli_get(&context, &detector.pollset, &request,
- gpr_time_add(gpr_now(), max_detection_delay),
- on_compute_engine_detection_http_response, &detector);
+ grpc_httpcli_get(
+ &context, &detector.pollset, &request,
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
+ on_compute_engine_detection_http_response, &detector);
/* Block until we get the response. This is not ideal but this should only be
called once for the lifetime of the process by the default credentials. */
@@ -123,36 +123,40 @@ static int is_stack_running_on_compute_engine(void) {
}
/* Takes ownership of creds_path if not NULL. */
-static grpc_credentials *create_jwt_creds_from_path(char *creds_path) {
+static grpc_credentials *create_default_creds_from_path(char *creds_path) {
+ grpc_json *json = NULL;
+ grpc_auth_json_key key;
+ grpc_auth_refresh_token token;
grpc_credentials *result = NULL;
- gpr_slice creds_data;
+ gpr_slice creds_data = gpr_empty_slice();
int file_ok = 0;
- if (creds_path == NULL) return NULL;
- creds_data = gpr_load_file(creds_path, 1, &file_ok);
- gpr_free(creds_path);
- if (file_ok) {
- result = grpc_jwt_credentials_create(
- (const char *)GPR_SLICE_START_PTR(creds_data),
- grpc_max_auth_token_lifetime);
- gpr_slice_unref(creds_data);
+ if (creds_path == NULL) goto end;
+ creds_data = gpr_load_file(creds_path, 0, &file_ok);
+ if (!file_ok) goto end;
+ json = grpc_json_parse_string_with_len(
+ (char *)GPR_SLICE_START_PTR(creds_data), GPR_SLICE_LENGTH(creds_data));
+ if (json == NULL) goto end;
+
+ /* First, try an auth json key. */
+ key = grpc_auth_json_key_create_from_json(json);
+ if (grpc_auth_json_key_is_valid(&key)) {
+ result = grpc_jwt_credentials_create_from_auth_json_key(
+ key, grpc_max_auth_token_lifetime);
+ goto end;
}
- return result;
-}
-/* Takes ownership of creds_path if not NULL. */
-static grpc_credentials *create_refresh_token_creds_from_path(
- char *creds_path) {
- grpc_credentials *result = NULL;
- gpr_slice creds_data;
- int file_ok = 0;
- if (creds_path == NULL) return NULL;
- creds_data = gpr_load_file(creds_path, 1, &file_ok);
- gpr_free(creds_path);
- if (file_ok) {
- result = grpc_refresh_token_credentials_create(
- (const char *)GPR_SLICE_START_PTR(creds_data));
- gpr_slice_unref(creds_data);
+ /* Then try a refresh token if the auth json key was invalid. */
+ token = grpc_auth_refresh_token_create_from_json(json);
+ if (grpc_auth_refresh_token_is_valid(&token)) {
+ result =
+ grpc_refresh_token_credentials_create_from_auth_refresh_token(token);
+ goto end;
}
+
+end:
+ if (creds_path != NULL) gpr_free(creds_path);
+ gpr_slice_unref(creds_data);
+ if (json != NULL) grpc_json_destroy(json);
return result;
}
@@ -170,12 +174,12 @@ grpc_credentials *grpc_google_default_credentials_create(void) {
}
/* First, try the environment variable. */
- result =
- create_jwt_creds_from_path(gpr_getenv(GRPC_GOOGLE_CREDENTIALS_ENV_VAR));
+ result = create_default_creds_from_path(
+ gpr_getenv(GRPC_GOOGLE_CREDENTIALS_ENV_VAR));
if (result != NULL) goto end;
/* Then the well-known file. */
- result = create_refresh_token_creds_from_path(
+ result = create_default_creds_from_path(
grpc_get_well_known_google_credentials_file_path());
if (result != NULL) goto end;
@@ -193,11 +197,24 @@ end:
if (!serving_cached_credentials && result != NULL) {
/* Blend with default ssl credentials and add a global reference so that it
can be cached and re-served. */
- result = grpc_composite_credentials_create(
- grpc_ssl_credentials_create(NULL, NULL), result);
- GPR_ASSERT(result != NULL);
- default_credentials = grpc_credentials_ref(result);
+ grpc_credentials *ssl_creds = grpc_ssl_credentials_create(NULL, NULL);
+ default_credentials = grpc_credentials_ref(grpc_composite_credentials_create(
+ ssl_creds, result));
+ GPR_ASSERT(default_credentials != NULL);
+ grpc_credentials_unref(ssl_creds);
+ grpc_credentials_unref(result);
+ result = default_credentials;
}
gpr_mu_unlock(&g_mu);
return result;
}
+
+void grpc_flush_cached_google_default_credentials(void) {
+ gpr_once_init(&g_once, init_default_credentials);
+ gpr_mu_lock(&g_mu);
+ if (default_credentials != NULL) {
+ grpc_credentials_unref(default_credentials);
+ default_credentials = NULL;
+ }
+ gpr_mu_unlock(&g_mu);
+}
diff --git a/src/core/security/json_token.c b/src/core/security/json_token.c
index 6116f1d767..9b1ea255ae 100644
--- a/src/core/security/json_token.c
+++ b/src/core/security/json_token.c
@@ -46,17 +46,11 @@
#include <openssl/evp.h>
#include <openssl/pem.h>
-#include "src/core/json/json.h"
-
/* --- Constants. --- */
/* 1 hour max. */
const gpr_timespec grpc_max_auth_token_lifetime = {3600, 0};
-#define GRPC_AUTH_JSON_TYPE_INVALID "invalid"
-#define GRPC_AUTH_JSON_TYPE_SERVICE_ACCOUNT "service_account"
-#define GRPC_AUTH_JSON_TYPE_AUTHORIZED_USER "authorized_user"
-
#define GRPC_JWT_RSA_SHA256_ALGORITHM "RS256"
#define GRPC_JWT_TYPE "JWT"
@@ -66,7 +60,7 @@ static grpc_jwt_encode_and_sign_override g_jwt_encode_and_sign_override = NULL;
/* --- grpc_auth_json_key. --- */
-static const char *json_get_string_property(grpc_json *json,
+static const char *json_get_string_property(const grpc_json *json,
const char *prop_name) {
grpc_json *child;
for (child = json->child; child != NULL; child = child->next) {
@@ -79,7 +73,8 @@ static const char *json_get_string_property(grpc_json *json,
return child->value;
}
-static int set_json_key_string_property(grpc_json *json, const char *prop_name,
+static int set_json_key_string_property(const grpc_json *json,
+ const char *prop_name,
char **json_key_field) {
const char *prop_value = json_get_string_property(json, prop_name);
if (prop_value == NULL) return 0;
@@ -92,11 +87,8 @@ int grpc_auth_json_key_is_valid(const grpc_auth_json_key *json_key) {
strcmp(json_key->type, GRPC_AUTH_JSON_TYPE_INVALID);
}
-grpc_auth_json_key grpc_auth_json_key_create_from_string(
- const char *json_string) {
+grpc_auth_json_key grpc_auth_json_key_create_from_json(const grpc_json *json) {
grpc_auth_json_key result;
- char *scratchpad = gpr_strdup(json_string);
- grpc_json *json = grpc_json_parse_string(scratchpad);
BIO *bio = NULL;
const char *prop_value;
int success = 0;
@@ -104,7 +96,7 @@ grpc_auth_json_key grpc_auth_json_key_create_from_string(
memset(&result, 0, sizeof(grpc_auth_json_key));
result.type = GRPC_AUTH_JSON_TYPE_INVALID;
if (json == NULL) {
- gpr_log(GPR_ERROR, "Invalid json string %s", json_string);
+ gpr_log(GPR_ERROR, "Invalid json.");
goto end;
}
@@ -142,8 +134,16 @@ grpc_auth_json_key grpc_auth_json_key_create_from_string(
end:
if (bio != NULL) BIO_free(bio);
- if (json != NULL) grpc_json_destroy(json);
if (!success) grpc_auth_json_key_destruct(&result);
+ return result;
+}
+
+grpc_auth_json_key grpc_auth_json_key_create_from_string(
+ const char *json_string) {
+ char *scratchpad = gpr_strdup(json_string);
+ grpc_json *json = grpc_json_parse_string(scratchpad);
+ grpc_auth_json_key result = grpc_auth_json_key_create_from_json(json);
+ if (json != NULL) grpc_json_destroy(json);
gpr_free(scratchpad);
return result;
}
@@ -207,7 +207,7 @@ static char *encoded_jwt_claim(const grpc_auth_json_key *json_key,
grpc_json *child = NULL;
char *json_str = NULL;
char *result = NULL;
- gpr_timespec now = gpr_now();
+ gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
gpr_timespec expiration = gpr_time_add(now, token_lifetime);
char now_str[GPR_LTOA_MIN_BUFSIZE];
char expiration_str[GPR_LTOA_MIN_BUFSIZE];
@@ -218,8 +218,8 @@ static char *encoded_jwt_claim(const grpc_auth_json_key *json_key,
gpr_ltoa(now.tv_sec, now_str);
gpr_ltoa(expiration.tv_sec, expiration_str);
- child = create_child(NULL, json, "iss", json_key->client_email,
- GRPC_JSON_STRING);
+ child =
+ create_child(NULL, json, "iss", json_key->client_email, GRPC_JSON_STRING);
if (scope != NULL) {
child = create_child(child, json, "scope", scope, GRPC_JSON_STRING);
} else {
@@ -342,18 +342,16 @@ int grpc_auth_refresh_token_is_valid(
strcmp(refresh_token->type, GRPC_AUTH_JSON_TYPE_INVALID);
}
-grpc_auth_refresh_token grpc_auth_refresh_token_create_from_string(
- const char *json_string) {
+grpc_auth_refresh_token grpc_auth_refresh_token_create_from_json(
+ const grpc_json *json) {
grpc_auth_refresh_token result;
- char *scratchpad = gpr_strdup(json_string);
- grpc_json *json = grpc_json_parse_string(scratchpad);
const char *prop_value;
int success = 0;
memset(&result, 0, sizeof(grpc_auth_refresh_token));
result.type = GRPC_AUTH_JSON_TYPE_INVALID;
if (json == NULL) {
- gpr_log(GPR_ERROR, "Invalid json string %s", json_string);
+ gpr_log(GPR_ERROR, "Invalid json.");
goto end;
}
@@ -374,8 +372,17 @@ grpc_auth_refresh_token grpc_auth_refresh_token_create_from_string(
success = 1;
end:
- if (json != NULL) grpc_json_destroy(json);
if (!success) grpc_auth_refresh_token_destruct(&result);
+ return result;
+}
+
+grpc_auth_refresh_token grpc_auth_refresh_token_create_from_string(
+ const char *json_string) {
+ char *scratchpad = gpr_strdup(json_string);
+ grpc_json *json = grpc_json_parse_string(scratchpad);
+ grpc_auth_refresh_token result =
+ grpc_auth_refresh_token_create_from_json(json);
+ if (json != NULL) grpc_json_destroy(json);
gpr_free(scratchpad);
return result;
}
@@ -396,4 +403,3 @@ void grpc_auth_refresh_token_destruct(grpc_auth_refresh_token *refresh_token) {
refresh_token->refresh_token = NULL;
}
}
-
diff --git a/src/core/security/json_token.h b/src/core/security/json_token.h
index 197796ab4c..091dfefb6e 100644
--- a/src/core/security/json_token.h
+++ b/src/core/security/json_token.h
@@ -37,10 +37,16 @@
#include <grpc/support/slice.h>
#include <openssl/rsa.h>
+#include "src/core/json/json.h"
+
/* --- Constants. --- */
#define GRPC_JWT_OAUTH2_AUDIENCE "https://www.googleapis.com/oauth2/v3/token"
+#define GRPC_AUTH_JSON_TYPE_INVALID "invalid"
+#define GRPC_AUTH_JSON_TYPE_SERVICE_ACCOUNT "service_account"
+#define GRPC_AUTH_JSON_TYPE_AUTHORIZED_USER "authorized_user"
+
/* --- auth_json_key parsing. --- */
typedef struct {
@@ -59,6 +65,10 @@ int grpc_auth_json_key_is_valid(const grpc_auth_json_key *json_key);
grpc_auth_json_key grpc_auth_json_key_create_from_string(
const char *json_string);
+/* Creates a json_key object from parsed json. Returns an invalid object if a
+ parsing error has been encountered. */
+grpc_auth_json_key grpc_auth_json_key_create_from_json(const grpc_json *json);
+
/* Destructs the object. */
void grpc_auth_json_key_destruct(grpc_auth_json_key *json_key);
@@ -97,6 +107,11 @@ int grpc_auth_refresh_token_is_valid(
grpc_auth_refresh_token grpc_auth_refresh_token_create_from_string(
const char *json_string);
+/* Creates a refresh token object from parsed json. Returns an invalid object if
+ a parsing error has been encountered. */
+grpc_auth_refresh_token grpc_auth_refresh_token_create_from_json(
+ const grpc_json *json);
+
/* Destructs the object. */
void grpc_auth_refresh_token_destruct(grpc_auth_refresh_token *refresh_token);
diff --git a/src/core/security/jwt_verifier.c b/src/core/security/jwt_verifier.c
index 01007a1a84..9140eb2ef7 100644
--- a/src/core/security/jwt_verifier.c
+++ b/src/core/security/jwt_verifier.c
@@ -189,7 +189,6 @@ struct grpc_jwt_claims {
gpr_slice buffer;
};
-
void grpc_jwt_claims_destroy(grpc_jwt_claims *claims) {
grpc_json_destroy(claims->json);
gpr_slice_unref(claims->buffer);
@@ -286,12 +285,14 @@ grpc_jwt_verifier_status grpc_jwt_claims_check(const grpc_jwt_claims *claims,
GPR_ASSERT(claims != NULL);
- skewed_now = gpr_time_add(gpr_now(), grpc_jwt_verifier_clock_skew);
+ skewed_now =
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_clock_skew);
if (gpr_time_cmp(skewed_now, claims->nbf) < 0) {
gpr_log(GPR_ERROR, "JWT is not valid yet.");
return GRPC_JWT_VERIFIER_TIME_CONSTRAINT_FAILURE;
}
- skewed_now = gpr_time_sub(gpr_now(), grpc_jwt_verifier_clock_skew);
+ skewed_now =
+ gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_clock_skew);
if (gpr_time_cmp(skewed_now, claims->exp) > 0) {
gpr_log(GPR_ERROR, "JWT is expired.");
return GRPC_JWT_VERIFIER_TIME_CONSTRAINT_FAILURE;
@@ -327,10 +328,10 @@ typedef struct {
/* Takes ownership of the header, claims and signature. */
static verifier_cb_ctx *verifier_cb_ctx_create(
- grpc_jwt_verifier *verifier, grpc_pollset *pollset,
- jose_header * header, grpc_jwt_claims *claims, const char *audience,
- gpr_slice signature, const char *signed_jwt, size_t signed_jwt_len,
- void *user_data, grpc_jwt_verification_done_cb cb) {
+ grpc_jwt_verifier *verifier, grpc_pollset *pollset, jose_header *header,
+ grpc_jwt_claims *claims, const char *audience, gpr_slice signature,
+ const char *signed_jwt, size_t signed_jwt_len, void *user_data,
+ grpc_jwt_verification_done_cb cb) {
verifier_cb_ctx *ctx = gpr_malloc(sizeof(verifier_cb_ctx));
memset(ctx, 0, sizeof(verifier_cb_ctx));
ctx->verifier = verifier;
@@ -604,7 +605,7 @@ end:
static void on_openid_config_retrieved(void *user_data,
const grpc_httpcli_response *response) {
- const grpc_json* cur;
+ const grpc_json *cur;
grpc_json *json = json_from_http(response);
verifier_cb_ctx *ctx = (verifier_cb_ctx *)user_data;
grpc_httpcli_request req;
@@ -632,9 +633,10 @@ static void on_openid_config_retrieved(void *user_data,
} else {
*(req.host + (req.path - jwks_uri)) = '\0';
}
- grpc_httpcli_get(&ctx->verifier->http_ctx, ctx->pollset, &req,
- gpr_time_add(gpr_now(), grpc_jwt_verifier_max_delay),
- on_keys_retrieved, ctx);
+ grpc_httpcli_get(
+ &ctx->verifier->http_ctx, ctx->pollset, &req,
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
+ on_keys_retrieved, ctx);
grpc_json_destroy(json);
gpr_free(req.host);
return;
@@ -645,8 +647,8 @@ error:
verifier_cb_ctx_destroy(ctx);
}
-static email_key_mapping *verifier_get_mapping(
- grpc_jwt_verifier *v, const char *email_domain) {
+static email_key_mapping *verifier_get_mapping(grpc_jwt_verifier *v,
+ const char *email_domain) {
size_t i;
if (v->mappings == NULL) return NULL;
for (i = 0; i < v->num_mappings; i++) {
@@ -733,9 +735,10 @@ static void retrieve_key_and_verify(verifier_cb_ctx *ctx) {
http_cb = on_openid_config_retrieved;
}
- grpc_httpcli_get(&ctx->verifier->http_ctx, ctx->pollset, &req,
- gpr_time_add(gpr_now(), grpc_jwt_verifier_max_delay),
- http_cb, ctx);
+ grpc_httpcli_get(
+ &ctx->verifier->http_ctx, ctx->pollset, &req,
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
+ http_cb, ctx);
gpr_free(req.host);
gpr_free(req.path);
return;
@@ -764,7 +767,7 @@ void grpc_jwt_verifier_verify(grpc_jwt_verifier *verifier,
dot = strchr(cur, '.');
if (dot == NULL) goto error;
json = parse_json_part_from_jwt(cur, dot - cur, &header_buffer);
- if (json == NULL) goto error;
+ if (json == NULL) goto error;
header = jose_header_from_json(json, header_buffer);
if (header == NULL) goto error;
@@ -772,7 +775,7 @@ void grpc_jwt_verifier_verify(grpc_jwt_verifier *verifier,
dot = strchr(cur, '.');
if (dot == NULL) goto error;
json = parse_json_part_from_jwt(cur, dot - cur, &claims_buffer);
- if (json == NULL) goto error;
+ if (json == NULL) goto error;
claims = grpc_jwt_claims_from_json(json, claims_buffer);
if (claims == NULL) goto error;
@@ -827,4 +830,3 @@ void grpc_jwt_verifier_destroy(grpc_jwt_verifier *v) {
}
gpr_free(v);
}
-
diff --git a/src/core/security/security_context.c b/src/core/security/security_context.c
index 4d56549f9b..8ce7876bd8 100644
--- a/src/core/security/security_context.c
+++ b/src/core/security/security_context.c
@@ -69,12 +69,20 @@ grpc_call_error grpc_call_set_credentials(grpc_call *call,
return GRPC_CALL_OK;
}
-const grpc_auth_context *grpc_call_auth_context(grpc_call *call) {
+grpc_auth_context *grpc_call_auth_context(grpc_call *call) {
void *sec_ctx = grpc_call_context_get(call, GRPC_CONTEXT_SECURITY);
if (sec_ctx == NULL) return NULL;
return grpc_call_is_client(call)
- ? ((grpc_client_security_context *)sec_ctx)->auth_context
- : ((grpc_server_security_context *)sec_ctx)->auth_context;
+ ? GRPC_AUTH_CONTEXT_REF(
+ ((grpc_client_security_context *)sec_ctx)->auth_context,
+ "grpc_call_auth_context client")
+ : GRPC_AUTH_CONTEXT_REF(
+ ((grpc_server_security_context *)sec_ctx)->auth_context,
+ "grpc_call_auth_context server");
+}
+
+void grpc_auth_context_release(grpc_auth_context *context) {
+ GRPC_AUTH_CONTEXT_UNREF(context, "grpc_auth_context_unref");
}
/* --- grpc_client_security_context --- */
diff --git a/src/core/security/security_context.h b/src/core/security/security_context.h
index 20c4390898..76a45910bb 100644
--- a/src/core/security/security_context.h
+++ b/src/core/security/security_context.h
@@ -36,6 +36,10 @@
#include "src/core/security/credentials.h"
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/* --- grpc_auth_context ---
High level authentication context object. Can optionally be chained. */
@@ -103,5 +107,9 @@ typedef struct {
grpc_server_security_context *grpc_server_security_context_create(void);
void grpc_server_security_context_destroy(void *ctx);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONTEXT_H */
diff --git a/src/core/statistics/census_rpc_stats.c b/src/core/statistics/census_rpc_stats.c
index 0491c91947..3e571b1143 100644
--- a/src/core/statistics/census_rpc_stats.c
+++ b/src/core/statistics/census_rpc_stats.c
@@ -157,7 +157,7 @@ static void record_stats(census_ht* store, census_op_id op_id,
key.ptr = gpr_strdup(key.ptr);
census_ht_insert(store, key, (void*)window_stats);
}
- census_window_stats_add(window_stats, gpr_now(), stats);
+ census_window_stats_add(window_stats, gpr_now(GPR_CLOCK_REALTIME), stats);
} else {
census_internal_unlock_trace_store();
}
@@ -185,7 +185,7 @@ static void get_stats(census_ht* store, census_aggregated_rpc_stats* data) {
if (store != NULL) {
size_t n;
unsigned i, j;
- gpr_timespec now = gpr_now();
+ gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
census_ht_kv* kv = census_ht_get_all_elements(store, &n);
if (kv != NULL) {
data->num_entries = n;
diff --git a/src/core/statistics/census_tracing.c b/src/core/statistics/census_tracing.c
index 05e72b99c0..3036ba5407 100644
--- a/src/core/statistics/census_tracing.c
+++ b/src/core/statistics/census_tracing.c
@@ -94,7 +94,7 @@ census_op_id census_tracing_start_op(void) {
g_id++;
memcpy(&ret->id, &g_id, sizeof(census_op_id));
ret->rpc_stats.cnt = 1;
- ret->ts = gpr_now();
+ ret->ts = gpr_now(GPR_CLOCK_REALTIME);
census_ht_insert(g_trace_store, op_id_as_key(&ret->id), (void*)ret);
gpr_log(GPR_DEBUG, "Start tracing for id %lu", g_id);
gpr_mu_unlock(&g_mu);
@@ -122,7 +122,7 @@ void census_tracing_print(census_op_id op_id, const char* anno_txt) {
trace = census_ht_find(g_trace_store, op_id_as_key(&op_id));
if (trace != NULL) {
census_trace_annotation* anno = gpr_malloc(sizeof(census_trace_annotation));
- anno->ts = gpr_now();
+ anno->ts = gpr_now(GPR_CLOCK_REALTIME);
{
char* d = anno->txt;
const char* s = anno_txt;
@@ -143,8 +143,8 @@ void census_tracing_end_op(census_op_id op_id) {
gpr_mu_lock(&g_mu);
trace = census_ht_find(g_trace_store, op_id_as_key(&op_id));
if (trace != NULL) {
- trace->rpc_stats.elapsed_time_ms =
- gpr_timespec_to_micros(gpr_time_sub(gpr_now(), trace->ts));
+ trace->rpc_stats.elapsed_time_ms = gpr_timespec_to_micros(
+ gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), trace->ts));
gpr_log(GPR_DEBUG, "End tracing for id %lu, method %s, latency %f us",
op_id_2_uint64(&op_id), trace->method,
trace->rpc_stats.elapsed_time_ms);
@@ -194,8 +194,8 @@ const char* census_get_trace_method_name(const census_trace_obj* trace) {
static census_trace_annotation* dup_annotation_chain(
census_trace_annotation* from) {
- census_trace_annotation *ret = NULL;
- census_trace_annotation **to = &ret;
+ census_trace_annotation* ret = NULL;
+ census_trace_annotation** to = &ret;
for (; from != NULL; from = from->next) {
*to = gpr_malloc(sizeof(census_trace_annotation));
memcpy(*to, from, sizeof(census_trace_annotation));
@@ -223,9 +223,9 @@ census_trace_obj** census_get_active_ops(int* num_active_ops) {
size_t n = 0;
census_ht_kv* all_kvs = census_ht_get_all_elements(g_trace_store, &n);
*num_active_ops = (int)n;
- if (n != 0 ) {
+ if (n != 0) {
size_t i = 0;
- ret = gpr_malloc(sizeof(census_trace_obj *) * n);
+ ret = gpr_malloc(sizeof(census_trace_obj*) * n);
for (i = 0; i < n; i++) {
ret[i] = trace_obj_dup((census_trace_obj*)all_kvs[i].v);
}
diff --git a/src/core/statistics/window_stats.h b/src/core/statistics/window_stats.h
index d733d8d247..0020f6b44c 100644
--- a/src/core/statistics/window_stats.h
+++ b/src/core/statistics/window_stats.h
@@ -90,11 +90,11 @@
// Record a new event, taking 15.3ms, transferring 1784 bytes.
stat.latency = 0.153;
stat.bytes = 1784;
- census_window_stats_add(stats, gpr_now(), &stat);
+ census_window_stats_add(stats, gpr_now(GPR_CLOCK_REALTIME), &stat);
// Get sums and print them out
result[kMinInterval].statistic = &sums[kMinInterval];
result[kHourInterval].statistic = &sums[kHourInterval];
- census_window_stats_get_sums(stats, gpr_now(), result);
+ census_window_stats_get_sums(stats, gpr_now(GPR_CLOCK_REALTIME), result);
printf("%d events/min, average time %gs, average bytes %g\n",
result[kMinInterval].count,
(my_stat*)result[kMinInterval].statistic->latency /
@@ -170,4 +170,4 @@ void census_window_stats_get_sums(const struct census_window_stats* wstats,
assertion failure). This function is thread-compatible. */
void census_window_stats_destroy(struct census_window_stats* wstats);
-#endif /* GRPC_INTERNAL_CORE_STATISTICS_WINDOW_STATS_H */
+#endif /* GRPC_INTERNAL_CORE_STATISTICS_WINDOW_STATS_H */
diff --git a/src/core/support/cancellable.c b/src/core/support/cancellable.c
index 5a4d488dd3..3cbb318ab6 100644
--- a/src/core/support/cancellable.c
+++ b/src/core/support/cancellable.c
@@ -121,8 +121,8 @@ void gpr_cancellable_cancel(gpr_cancellable *c) {
} else {
gpr_event ev;
gpr_event_init(&ev);
- gpr_event_wait(&ev,
- gpr_time_add(gpr_now(), gpr_time_from_micros(1000)));
+ gpr_event_wait(&ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_micros(1000)));
}
}
} while (failures != 0);
diff --git a/src/core/support/log_linux.c b/src/core/support/log_linux.c
index 7937466b79..5ac36e7b95 100644
--- a/src/core/support/log_linux.c
+++ b/src/core/support/log_linux.c
@@ -76,7 +76,7 @@ void gpr_default_log(gpr_log_func_args *args) {
char *prefix;
const char *display_file;
char time_buffer[64];
- gpr_timespec now = gpr_now();
+ gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
struct tm tm;
final_slash = strrchr(args->file, '/');
diff --git a/src/core/support/log_posix.c b/src/core/support/log_posix.c
index afca792c40..940ee20f15 100644
--- a/src/core/support/log_posix.c
+++ b/src/core/support/log_posix.c
@@ -75,7 +75,7 @@ void gpr_default_log(gpr_log_func_args *args) {
char *final_slash;
const char *display_file;
char time_buffer[64];
- gpr_timespec now = gpr_now();
+ gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
struct tm tm;
final_slash = strrchr(args->file, '/');
diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c
index d249be7d2e..629781da8a 100644
--- a/src/core/support/log_win32.c
+++ b/src/core/support/log_win32.c
@@ -82,7 +82,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity,
/* Simple starter implementation */
void gpr_default_log(gpr_log_func_args *args) {
char time_buffer[64];
- gpr_timespec now = gpr_now();
+ gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
struct tm tm;
if (localtime_s(&tm, &now.tv_sec)) {
diff --git a/src/core/support/sync_win32.c b/src/core/support/sync_win32.c
index 72f39f8d46..29b77fc4c2 100644
--- a/src/core/support/sync_win32.c
+++ b/src/core/support/sync_win32.c
@@ -86,7 +86,7 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) {
if (gpr_time_cmp(abs_deadline, gpr_inf_future) == 0) {
SleepConditionVariableCS(cv, &mu->cs, INFINITE);
} else {
- gpr_timespec now = gpr_now();
+ gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
gpr_int64 now_ms = now.tv_sec * 1000 + now.tv_nsec / 1000000;
gpr_int64 deadline_ms =
abs_deadline.tv_sec * 1000 + abs_deadline.tv_nsec / 1000000;
diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c
index afb58ef231..f9b7958783 100644
--- a/src/core/support/time_posix.c
+++ b/src/core/support/time_posix.c
@@ -55,22 +55,52 @@ static gpr_timespec gpr_from_timespec(struct timespec ts) {
return rv;
}
-gpr_timespec gpr_now(void) {
+/** maps gpr_clock_type --> clockid_t for clock_gettime */
+static clockid_t clockid_for_gpr_clock[] = {CLOCK_MONOTONIC, CLOCK_REALTIME};
+
+void gpr_time_init(void) {}
+
+gpr_timespec gpr_now(gpr_clock_type clock) {
struct timespec now;
- clock_gettime(CLOCK_REALTIME, &now);
+ clock_gettime(clockid_for_gpr_clock[clock], &now);
return gpr_from_timespec(now);
}
#else
/* For some reason Apple's OSes haven't implemented clock_gettime. */
#include <sys/time.h>
+#include <mach/mach.h>
+#include <mach/mach_time.h>
+
+static double g_time_scale;
+static uint64_t g_time_start;
+
+void gpr_time_init(void) {
+ mach_timebase_info_data_t tb = {0, 1};
+ mach_timebase_info(&tb);
+ g_time_scale = tb.numer;
+ g_time_scale /= tb.denom;
+ g_time_start = mach_absolute_time();
+}
-gpr_timespec gpr_now(void) {
+gpr_timespec gpr_now(gpr_clock_type clock) {
gpr_timespec now;
struct timeval now_tv;
- gettimeofday(&now_tv, NULL);
- now.tv_sec = now_tv.tv_sec;
- now.tv_nsec = now_tv.tv_usec * 1000;
+ double now_dbl;
+
+ switch (clock) {
+ case GPR_CLOCK_REALTIME:
+ gettimeofday(&now_tv, NULL);
+ now.tv_sec = now_tv.tv_sec;
+ now.tv_nsec = now_tv.tv_usec * 1000;
+ break;
+ case GPR_CLOCK_MONOTONIC:
+ now_dbl = (mach_absolute_time() - g_time_start) * g_time_scale;
+ now.tv_sec = now_dbl * 1e-9;
+ now.tv_nsec = now_dbl - now.tv_sec * 1e9;
+ break;
+ }
+
return now;
}
#endif
@@ -83,7 +113,7 @@ void gpr_sleep_until(gpr_timespec until) {
for (;;) {
/* We could simplify by using clock_nanosleep instead, but it might be
* slightly less portable. */
- now = gpr_now();
+ now = gpr_now(GPR_CLOCK_REALTIME);
if (gpr_time_cmp(until, now) <= 0) {
return;
}
diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c
index 9db267c01b..fa77c74eeb 100644
--- a/src/core/support/time_win32.c
+++ b/src/core/support/time_win32.c
@@ -40,12 +40,34 @@
#include <grpc/support/time.h>
#include <sys/timeb.h>
-gpr_timespec gpr_now(void) {
+static LARGE_INTEGER g_start_time;
+static double g_time_scale;
+
+void gpr_time_init(void) {
+ LARGE_INTEGER frequency;
+ QueryPerformanceFrequency(&frequency);
+ QueryPerformanceCounter(&g_start_time);
+ g_time_scale = 1.0 / frequency.QuadPart;
+}
+
+gpr_timespec gpr_now(gpr_clock_type clock) {
gpr_timespec now_tv;
struct _timeb now_tb;
- _ftime_s(&now_tb);
- now_tv.tv_sec = now_tb.time;
- now_tv.tv_nsec = now_tb.millitm * 1000000;
+ LARGE_INTEGER timestamp;
+ double now_dbl;
+ switch (clock) {
+ case GPR_CLOCK_REALTIME:
+ _ftime_s(&now_tb);
+ now_tv.tv_sec = now_tb.time;
+ now_tv.tv_nsec = now_tb.millitm * 1000000;
+ break;
+ case GPR_CLOCK_MONOTONIC:
+ QueryPerformanceCounter(&timestamp);
+ now_dbl = (timestamp.QuadPart - g_start_time.QuadPart) * g_time_scale;
+ now_tv.tv_sec = (time_t)now_dbl;
+ now_tv.tv_nsec = (int)((now_dbl - (double)now_tv.tv_sec) * 1e9);
+ break;
+ }
return now_tv;
}
@@ -57,13 +79,14 @@ void gpr_sleep_until(gpr_timespec until) {
for (;;) {
/* We could simplify by using clock_nanosleep instead, but it might be
* slightly less portable. */
- now = gpr_now();
+ now = gpr_now(GPR_CLOCK_REALTIME);
if (gpr_time_cmp(until, now) <= 0) {
return;
}
delta = gpr_time_sub(until, now);
- sleep_millis = (DWORD)delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS;
+ sleep_millis =
+ (DWORD)delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS;
Sleep(sleep_millis);
}
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index fb3b0b1918..0a551ac47f 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -49,6 +49,17 @@
#include <stdlib.h>
#include <string.h>
+/** The maximum number of completions possible.
+ Based upon the maximum number of individually queueable ops in the batch
+ api:
+ - initial metadata send
+ - message send
+ - status/close send (depending on client/server)
+ - initial metadata recv
+ - message recv
+ - status/close recv (depending on client/server) */
+#define MAX_CONCURRENT_COMPLETIONS 6
+
typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
typedef enum {
@@ -135,6 +146,7 @@ struct grpc_call {
grpc_mdctx *metadata_context;
/* TODO(ctiller): share with cq if possible? */
gpr_mu mu;
+ gpr_mu completion_mu;
/* how far through the stream have we read? */
read_state read_state;
@@ -162,6 +174,8 @@ struct grpc_call {
gpr_uint8 error_status_set;
/** should the alarm be cancelled */
gpr_uint8 cancel_alarm;
+ /** bitmask of allocated completion events in completions */
+ gpr_uint8 allocated_completions;
/* flags with bits corresponding to write states allowing us to determine
what was sent */
@@ -250,6 +264,9 @@ struct grpc_call {
grpc_iomgr_closure on_done_recv;
grpc_iomgr_closure on_done_send;
grpc_iomgr_closure on_done_bind;
+
+ /** completion events - for completion queue use */
+ grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -286,6 +303,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
memset(call, 0, sizeof(grpc_call));
gpr_mu_init(&call->mu);
+ gpr_mu_init(&call->completion_mu);
call->channel = channel;
call->cq = cq;
if (cq) {
@@ -349,6 +367,29 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
return call->cq;
}
+static grpc_cq_completion *allocate_completion(grpc_call *call) {
+ gpr_uint8 i;
+ gpr_mu_lock(&call->completion_mu);
+ for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) {
+ if (call->allocated_completions & (1u << i)) {
+ continue;
+ }
+ call->allocated_completions |= 1u << i;
+ gpr_mu_unlock(&call->completion_mu);
+ return &call->completions[i];
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+}
+
+static void done_completion(void *call, grpc_cq_completion *completion) {
+ grpc_call *c = call;
+ gpr_mu_lock(&c->completion_mu);
+ c->allocated_completions &= ~(1u << (completion - c->completions));
+ gpr_mu_unlock(&c->completion_mu);
+ GRPC_CALL_INTERNAL_UNREF(c, "completion", 1);
+}
+
#ifdef GRPC_CALL_REF_COUNT_DEBUG
void grpc_call_internal_ref(grpc_call *c, const char *reason) {
gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c,
@@ -365,6 +406,7 @@ static void destroy_call(void *call, int ignored_success) {
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
gpr_mu_destroy(&c->mu);
+ gpr_mu_destroy(&c->completion_mu);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (c->status[i].details) {
GRPC_MDSTR_UNREF(c->status[i].details);
@@ -1188,7 +1230,8 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
}
GRPC_CALL_INTERNAL_REF(call, "alarm");
call->have_alarm = 1;
- grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
+ grpc_alarm_init(&call->alarm, deadline, call_alarm, call,
+ gpr_now(GPR_CLOCK_REALTIME));
}
/* we offset status by a small amount when storing it into transport metadata
@@ -1316,11 +1359,13 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, int success, void *tag) {
- grpc_cq_end_op(call->cq, tag, call, success);
+ grpc_cq_end_op(call->cq, tag, success, done_completion, call,
+ allocate_completion(call));
}
static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
- grpc_cq_end_op(call->cq, tag, call, 1);
+ grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
+ allocate_completion(call));
}
static int are_write_flags_valid(gpr_uint32 flags) {
@@ -1343,8 +1388,10 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
if (nops == 0) {
- grpc_cq_begin_op(call->cq, call);
- grpc_cq_end_op(call->cq, tag, call, 1);
+ grpc_cq_begin_op(call->cq);
+ GRPC_CALL_INTERNAL_REF(call, "completion");
+ grpc_cq_end_op(call->cq, tag, 1, done_completion, call,
+ allocate_completion(call));
return GRPC_CALL_OK;
}
@@ -1466,7 +1513,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
}
}
- grpc_cq_begin_op(call->cq, call);
+ GRPC_CALL_INTERNAL_REF(call, "completion");
+ grpc_cq_begin_op(call->cq);
return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 030a8b4e6f..c67f75fc5c 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -45,34 +45,20 @@
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
-#define NUM_TAG_BUCKETS 31
-
-/* A single event: extends grpc_event to form a linked list with a destruction
- function (on_finish) that is hidden from outside this module */
-typedef struct event {
- grpc_event base;
- struct event *queue_next;
- struct event *queue_prev;
- struct event *bucket_next;
- struct event *bucket_prev;
-} event;
-
/* Completion queue structure */
struct grpc_completion_queue {
- /* When refs drops to zero, we are in shutdown mode, and will be destroyable
- once all queued events are drained */
- gpr_refcount refs;
- /* Once owning_refs drops to zero, we will destroy the cq */
+ /** completed events */
+ grpc_cq_completion completed_head;
+ grpc_cq_completion *completed_tail;
+ /** Number of pending events (+1 if we're not shutdown) */
+ gpr_refcount pending_events;
+ /** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
- /* the set of low level i/o things that concern this cq */
+ /** the set of low level i/o things that concern this cq */
grpc_pollset pollset;
- /* 0 initially, 1 once we've begun shutting down */
+ /** 0 initially, 1 once we've begun shutting down */
int shutdown;
int shutdown_called;
- /* Head of a linked list of queued events (prev points to the last element) */
- event *queue;
- /* Fixed size chained hash table of events for pluck() */
- event *buckets[NUM_TAG_BUCKETS];
int is_server_cq;
};
@@ -80,19 +66,20 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
- gpr_ref_init(&cc->refs, 1);
+ gpr_ref_init(&cc->pending_events, 1);
/* One for destroy(), one for pollset_shutdown */
gpr_ref_init(&cc->owning_refs, 2);
grpc_pollset_init(&cc->pollset);
+ cc->completed_tail = &cc->completed_head;
+ cc->completed_head.next = (gpr_uintptr)cc->completed_tail;
return cc;
}
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s",
- cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1,
- reason);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc,
+ (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason);
#else
void grpc_cq_internal_ref(grpc_completion_queue *cc) {
#endif
@@ -107,186 +94,135 @@ static void on_pollset_destroy_done(void *arg) {
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s",
- cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1,
- reason);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc,
+ (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason);
#else
void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
if (gpr_unref(&cc->owning_refs)) {
- GPR_ASSERT(cc->queue == NULL);
+ GPR_ASSERT(cc->completed_head.next == (gpr_uintptr)&cc->completed_head);
grpc_pollset_destroy(&cc->pollset);
gpr_free(cc);
}
}
-/* Create and append an event to the queue. Returns the event so that its data
- members can be filled in.
- Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
-static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
- void *tag, grpc_call *call) {
- event *ev = gpr_malloc(sizeof(event));
- gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
- ev->base.type = type;
- ev->base.tag = tag;
- if (cc->queue == NULL) {
- cc->queue = ev->queue_next = ev->queue_prev = ev;
- } else {
- ev->queue_next = cc->queue;
- ev->queue_prev = cc->queue->queue_prev;
- ev->queue_next->queue_prev = ev->queue_prev->queue_next = ev;
- }
- if (cc->buckets[bucket] == NULL) {
- cc->buckets[bucket] = ev->bucket_next = ev->bucket_prev = ev;
- } else {
- ev->bucket_next = cc->buckets[bucket];
- ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
- ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
- }
- grpc_pollset_kick(&cc->pollset);
- return ev;
-}
-
-void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) {
- gpr_ref(&cc->refs);
- if (call) GRPC_CALL_INTERNAL_REF(call, "cq");
+void grpc_cq_begin_op(grpc_completion_queue *cc) {
+ gpr_ref(&cc->pending_events);
}
/* Signal the end of an operation - if this is the last waiting-to-be-queued
event, then enter shutdown mode */
-void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
- int success) {
- event *ev;
- int shutdown = 0;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
- ev->base.success = success;
- if (gpr_unref(&cc->refs)) {
+/* Queue a GRPC_OP_COMPLETED operation */
+void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
+ void (*done)(void *done_arg, grpc_cq_completion *storage),
+ void *done_arg, grpc_cq_completion *storage) {
+ int shutdown = gpr_unref(&cc->pending_events);
+
+ storage->tag = tag;
+ storage->done = done;
+ storage->done_arg = done_arg;
+ storage->next =
+ ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
+
+ if (!shutdown) {
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ cc->completed_tail->next =
+ ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
+ cc->completed_tail = storage;
+ grpc_pollset_kick(&cc->pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ } else {
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ cc->completed_tail->next =
+ ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
+ cc->completed_tail = storage;
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
- shutdown = 1;
- }
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- if (call) GRPC_CALL_INTERNAL_UNREF(call, "cq", 0);
- if (shutdown) {
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
}
}
-/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
-static event *create_shutdown_event(void) {
- event *ev = gpr_malloc(sizeof(event));
- ev->base.type = GRPC_QUEUE_SHUTDOWN;
- ev->base.tag = NULL;
- return ev;
-}
-
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
- event *ev = NULL;
grpc_event ret;
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
- if (cc->queue != NULL) {
- gpr_uintptr bucket;
- ev = cc->queue;
- bucket = ((gpr_uintptr)ev->base.tag) % NUM_TAG_BUCKETS;
- cc->queue = ev->queue_next;
- ev->queue_next->queue_prev = ev->queue_prev;
- ev->queue_prev->queue_next = ev->queue_next;
- ev->bucket_next->bucket_prev = ev->bucket_prev;
- ev->bucket_prev->bucket_next = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = NULL;
- }
- }
- if (cc->queue == ev) {
- cc->queue = NULL;
+ if (cc->completed_tail != &cc->completed_head) {
+ grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
+ cc->completed_head.next = c->next & ~(gpr_uintptr)1;
+ if (c == cc->completed_tail) {
+ cc->completed_tail = &cc->completed_head;
}
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ ret.type = GRPC_OP_COMPLETE;
+ ret.success = c->next & 1u;
+ ret.tag = c->tag;
+ c->done(c->done_arg, c);
break;
}
if (cc->shutdown) {
- ev = create_shutdown_event();
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(cc, "next");
- return ret;
+ break;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- ret = ev->base;
- gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
return ret;
}
-static event *pluck_event(grpc_completion_queue *cc, void *tag) {
- gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
- event *ev = cc->buckets[bucket];
- if (ev == NULL) return NULL;
- do {
- if (ev->base.tag == tag) {
- ev->queue_next->queue_prev = ev->queue_prev;
- ev->queue_prev->queue_next = ev->queue_next;
- ev->bucket_next->bucket_prev = ev->bucket_prev;
- ev->bucket_prev->bucket_next = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = ev->bucket_next;
- if (ev == cc->buckets[bucket]) {
- cc->buckets[bucket] = NULL;
- }
- }
- if (cc->queue == ev) {
- cc->queue = ev->queue_next;
- if (cc->queue == ev) {
- cc->queue = NULL;
- }
- }
- return ev;
- }
- ev = ev->bucket_next;
- } while (ev != cc->buckets[bucket]);
- return NULL;
-}
-
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
- event *ev = NULL;
grpc_event ret;
+ grpc_cq_completion *c;
+ grpc_cq_completion *prev;
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
- if ((ev = pluck_event(cc, tag))) {
- break;
+ prev = &cc->completed_head;
+ while ((c = (grpc_cq_completion *)(prev->next & ~(gpr_uintptr)1)) !=
+ &cc->completed_head) {
+ if (c->tag == tag) {
+ prev->next =
+ (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1);
+ if (c == cc->completed_tail) {
+ cc->completed_tail = prev;
+ }
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ ret.type = GRPC_OP_COMPLETE;
+ ret.success = c->next & 1u;
+ ret.tag = c->tag;
+ c->done(c->done_arg, c);
+ goto done;
+ }
+ prev = c;
}
if (cc->shutdown) {
- ev = create_shutdown_event();
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
if (!grpc_pollset_work(&cc->pollset, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
- GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
- return ret;
+ break;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- ret = ev->base;
- gpr_free(ev);
+done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
return ret;
@@ -303,7 +239,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
cc->shutdown_called = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- if (gpr_unref(&cc->refs)) {
+ if (gpr_unref(&cc->pending_events)) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
@@ -324,8 +260,8 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
grpc_pollset_kick(&cc->pollset);
- grpc_pollset_work(&cc->pollset,
- gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
+ grpc_pollset_work(&cc->pollset, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(100)));
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 1b9010f462..f944f48d8e 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -39,6 +39,17 @@
#include "src/core/iomgr/pollset.h"
#include <grpc/grpc.h>
+typedef struct grpc_cq_completion {
+ /** user supplied tag */
+ void *tag;
+ /** done callback - called when this queue element is no longer
+ needed by the completion queue */
+ void (*done)(void *done_arg, struct grpc_cq_completion *c);
+ void *done_arg;
+ /** next pointer; low bit is used to indicate success or not */
+ gpr_uintptr next;
+} grpc_cq_completion;
+
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line);
@@ -57,11 +68,12 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made */
-void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call);
+void grpc_cq_begin_op(grpc_completion_queue *cc);
/* Queue a GRPC_OP_COMPLETED operation */
-void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
- int success);
+void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
+ void (*done)(void *done_arg, grpc_cq_completion *storage),
+ void *done_arg, grpc_cq_completion *storage);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 3847ded28c..04e27d30ac 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -35,6 +35,7 @@
#include <grpc/census.h>
#include <grpc/grpc.h>
+#include <grpc/support/time.h>
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/client_config/resolvers/dns_resolver.h"
@@ -64,6 +65,7 @@ void grpc_init(void) {
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
+ gpr_time_init();
grpc_resolver_registry_init("dns:///");
grpc_register_resolver_type("dns", grpc_dns_resolver_factory_create());
#ifdef GPR_POSIX_SOCKET
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index a9d8940631..4dc51bf031 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -72,12 +72,14 @@ typedef struct {
typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
-typedef struct {
+typedef struct requested_call {
requested_call_type type;
+ struct requested_call *next;
void *tag;
grpc_completion_queue *cq_bound_to_call;
grpc_completion_queue *cq_for_notification;
grpc_call **call;
+ grpc_cq_completion completion;
union {
struct {
grpc_call_details *details;
@@ -92,17 +94,11 @@ typedef struct {
} data;
} requested_call;
-typedef struct {
- requested_call *calls;
- size_t count;
- size_t capacity;
-} requested_call_array;
-
struct registered_method {
char *method;
char *host;
call_data *pending;
- requested_call_array requested;
+ requested_call *requests;
registered_method *next;
};
@@ -131,6 +127,7 @@ struct channel_data {
typedef struct shutdown_tag {
void *tag;
grpc_completion_queue *cq;
+ grpc_cq_completion completion;
} shutdown_tag;
struct grpc_server {
@@ -153,7 +150,7 @@ struct grpc_server {
gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
- requested_call_array requested_calls;
+ requested_call *requests;
gpr_uint8 shutdown;
gpr_uint8 shutdown_published;
@@ -166,6 +163,9 @@ struct grpc_server {
listener *listeners;
int listeners_destroyed;
gpr_refcount internal_refcount;
+
+ /** when did we print the last shutdown progress message */
+ gpr_timespec last_shutdown_message_time;
};
typedef enum {
@@ -270,7 +270,8 @@ static void send_shutdown(grpc_channel *channel, int send_goaway,
}
static void channel_broadcaster_shutdown(channel_broadcaster *cb,
- int send_goaway, int force_disconnect) {
+ int send_goaway,
+ int force_disconnect) {
size_t i;
for (i = 0; i < cb->num_channels; i++) {
@@ -329,22 +330,6 @@ static int call_list_remove(call_data *call, call_list list) {
return 1;
}
-static void requested_call_array_destroy(requested_call_array *array) {
- gpr_free(array->calls);
-}
-
-static requested_call *requested_call_array_add(requested_call_array *array) {
- requested_call *rc;
- if (array->count == array->capacity) {
- array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
- array->calls =
- gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
- }
- rc = &array->calls[array->count++];
- memset(rc, 0, sizeof(*rc));
- return rc;
-}
-
static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
@@ -356,12 +341,10 @@ static void server_delete(grpc_server *server) {
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
gpr_free(server->channel_filters);
- requested_call_array_destroy(&server->requested_calls);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
gpr_free(rm->method);
gpr_free(rm->host);
- requested_call_array_destroy(&rm->requested);
gpr_free(rm);
}
for (i = 0; i < server->cq_count; i++) {
@@ -409,23 +392,24 @@ static void destroy_channel(channel_data *chand) {
static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
call_data **pending_root,
- requested_call_array *array) {
- requested_call rc;
+ requested_call **requests) {
+ requested_call *rc;
call_data *calld = elem->call_data;
gpr_mu_lock(&server->mu_call);
- if (array->count == 0) {
+ rc = *requests;
+ if (rc == NULL) {
gpr_mu_lock(&calld->mu_state);
calld->state = PENDING;
gpr_mu_unlock(&calld->mu_state);
call_list_join(pending_root, calld, PENDING_START);
gpr_mu_unlock(&server->mu_call);
} else {
- rc = array->calls[--array->count];
+ *requests = rc->next;
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
gpr_mu_unlock(&server->mu_call);
- begin_call(server, calld, &rc);
+ begin_call(server, calld, rc);
}
}
@@ -441,14 +425,14 @@ static void start_new_rpc(grpc_call_element *elem) {
/* TODO(ctiller): unify these two searches */
/* check for an exact match with host */
hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
- for (i = 0; i < chand->registered_method_max_probes; i++) {
+ for (i = 0; i <= chand->registered_method_max_probes; i++) {
rm = &chand->registered_methods[(hash + i) %
chand->registered_method_slots];
if (!rm) break;
if (rm->host != calld->host) continue;
if (rm->method != calld->path) continue;
finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
- &rm->server_registered_method->requested);
+ &rm->server_registered_method->requests);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -460,12 +444,12 @@ static void start_new_rpc(grpc_call_element *elem) {
if (rm->host != NULL) continue;
if (rm->method != calld->path) continue;
finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
- &rm->server_registered_method->requested);
+ &rm->server_registered_method->requests);
return;
}
}
finish_start_new_rpc(server, elem, &server->lists[PENDING_START],
- &server->requested_calls);
+ &server->requests);
}
static void kill_zombie(void *elem, int success) {
@@ -481,26 +465,47 @@ static int num_listeners(grpc_server *server) {
return n;
}
+static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
+ server_unref(server);
+}
+
+static int num_channels(grpc_server *server) {
+ channel_data *chand;
+ int n = 0;
+ for (chand = server->root_channel_data.next;
+ chand != &server->root_channel_data; chand = chand->next) {
+ n++;
+ }
+ return n;
+}
+
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
if (!server->shutdown || server->shutdown_published) {
return;
}
- if (server->root_channel_data.next != &server->root_channel_data) {
- gpr_log(GPR_DEBUG,
- "Waiting for all channels to close before destroying server");
- return;
- }
- if (server->listeners_destroyed < num_listeners(server)) {
- gpr_log(GPR_DEBUG, "Waiting for all listeners to be destroyed (@ %d/%d)",
- server->listeners_destroyed, num_listeners(server));
+ if (server->root_channel_data.next != &server->root_channel_data ||
+ server->listeners_destroyed < num_listeners(server)) {
+ if (gpr_time_cmp(
+ gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), server->last_shutdown_message_time),
+ gpr_time_from_seconds(1)) >= 0) {
+ server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
+ gpr_log(GPR_DEBUG,
+ "Waiting for %d channels and %d/%d listeners to be destroyed"
+ " before shutting down server",
+ num_channels(server),
+ num_listeners(server) - server->listeners_destroyed,
+ num_listeners(server));
+ }
return;
}
server->shutdown_published = 1;
for (i = 0; i < server->num_shutdown_tags; i++) {
- grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
- NULL, 1);
+ server_ref(server);
+ grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1,
+ done_shutdown_event, server,
+ &server->shutdown_tags[i].completion);
}
}
@@ -924,15 +929,14 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
- requested_call_array requested_calls;
- size_t i;
+ requested_call *requests = NULL;
registered_method *rm;
shutdown_tag *sdt;
channel_broadcaster broadcaster;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
- grpc_cq_begin_op(cq, NULL);
+ grpc_cq_begin_op(cq);
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
@@ -944,27 +948,21 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
return;
}
+ server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
+
channel_broadcaster_init(server, &broadcaster);
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
- requested_calls = server->requested_calls;
- memset(&server->requested_calls, 0, sizeof(server->requested_calls));
+ requests = server->requests;
+ server->requests = NULL;
for (rm = server->registered_methods; rm; rm = rm->next) {
- if (requested_calls.count + rm->requested.count >
- requested_calls.capacity) {
- requested_calls.capacity =
- GPR_MAX(requested_calls.count + rm->requested.count,
- 2 * requested_calls.capacity);
- requested_calls.calls =
- gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
- requested_calls.capacity);
+ while (rm->requests != NULL) {
+ requested_call *c = rm->requests;
+ rm->requests = c->next;
+ c->next = requests;
+ requests = c;
}
- memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
- sizeof(*requested_calls.calls) * rm->requested.count);
- requested_calls.count += rm->requested.count;
- gpr_free(rm->requested.calls);
- memset(&rm->requested, 0, sizeof(rm->requested));
}
gpr_mu_unlock(&server->mu_call);
@@ -973,10 +971,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
gpr_mu_unlock(&server->mu_global);
/* terminate all the requested calls */
- for (i = 0; i < requested_calls.count; i++) {
- fail_call(server, &requested_calls.calls[i]);
+ while (requests != NULL) {
+ requested_call *next = requests->next;
+ fail_call(server, requests);
+ requests = next;
}
- gpr_free(requested_calls.calls);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
@@ -1038,7 +1037,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
static grpc_call_error queue_call_request(grpc_server *server,
requested_call *rc) {
call_data *calld = NULL;
- requested_call_array *requested_calls = NULL;
+ requested_call **requests = NULL;
gpr_mu_lock(&server->mu_call);
if (server->shutdown) {
gpr_mu_unlock(&server->mu_call);
@@ -1049,12 +1048,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
case BATCH_CALL:
calld =
call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
- requested_calls = &server->requested_calls;
+ requests = &server->requests;
break;
case REGISTERED_CALL:
calld = call_list_remove_head(
&rc->data.registered.registered_method->pending, PENDING_START);
- requested_calls = &rc->data.registered.registered_method->requested;
+ requests = &rc->data.registered.registered_method->requests;
break;
}
if (calld != NULL) {
@@ -1066,7 +1065,8 @@ static grpc_call_error queue_call_request(grpc_server *server,
begin_call(server, calld, rc);
return GRPC_CALL_OK;
} else {
- *requested_call_array_add(requested_calls) = *rc;
+ rc->next = *requests;
+ *requests = rc;
gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}
@@ -1077,22 +1077,23 @@ grpc_call_error grpc_server_request_call(
grpc_metadata_array *initial_metadata,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
- requested_call rc;
+ requested_call *rc = gpr_malloc(sizeof(*rc));
GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
initial_metadata, cq_bound_to_call,
cq_for_notification, tag);
if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ gpr_free(rc);
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
- grpc_cq_begin_op(cq_for_notification, NULL);
- rc.type = BATCH_CALL;
- rc.tag = tag;
- rc.cq_bound_to_call = cq_bound_to_call;
- rc.cq_for_notification = cq_for_notification;
- rc.call = call;
- rc.data.batch.details = details;
- rc.data.batch.initial_metadata = initial_metadata;
- return queue_call_request(server, &rc);
+ grpc_cq_begin_op(cq_for_notification);
+ rc->type = BATCH_CALL;
+ rc->tag = tag;
+ rc->cq_bound_to_call = cq_bound_to_call;
+ rc->cq_for_notification = cq_for_notification;
+ rc->call = call;
+ rc->data.batch.details = details;
+ rc->data.batch.initial_metadata = initial_metadata;
+ return queue_call_request(server, rc);
}
grpc_call_error grpc_server_request_registered_call(
@@ -1100,22 +1101,23 @@ grpc_call_error grpc_server_request_registered_call(
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) {
- requested_call rc;
+ requested_call *rc = gpr_malloc(sizeof(*rc));
registered_method *registered_method = rm;
if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ gpr_free(rc);
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
- grpc_cq_begin_op(cq_for_notification, NULL);
- rc.type = REGISTERED_CALL;
- rc.tag = tag;
- rc.cq_bound_to_call = cq_bound_to_call;
- rc.cq_for_notification = cq_for_notification;
- rc.call = call;
- rc.data.registered.registered_method = registered_method;
- rc.data.registered.deadline = deadline;
- rc.data.registered.initial_metadata = initial_metadata;
- rc.data.registered.optional_payload = optional_payload;
- return queue_call_request(server, &rc);
+ grpc_cq_begin_op(cq_for_notification);
+ rc->type = REGISTERED_CALL;
+ rc->tag = tag;
+ rc->cq_bound_to_call = cq_bound_to_call;
+ rc->cq_for_notification = cq_for_notification;
+ rc->call = call;
+ rc->data.registered.registered_method = registered_method;
+ rc->data.registered.deadline = deadline;
+ rc->data.registered.initial_metadata = initial_metadata;
+ rc->data.registered.optional_payload = optional_payload;
+ return queue_call_request(server, rc);
}
static void publish_registered_or_batch(grpc_call *call, int success,
@@ -1182,8 +1184,11 @@ static void begin_call(grpc_server *server, call_data *calld,
}
GRPC_CALL_INTERNAL_REF(calld->call, "server");
- grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
- rc->tag);
+ grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, rc);
+}
+
+static void done_request_event(void *req, grpc_cq_completion *c) {
+ gpr_free(req);
}
static void fail_call(grpc_server *server, requested_call *rc) {
@@ -1196,15 +1201,19 @@ static void fail_call(grpc_server *server, requested_call *rc) {
rc->data.registered.initial_metadata->count = 0;
break;
}
- grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0);
+ grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc,
+ &rc->completion);
}
static void publish_registered_or_batch(grpc_call *call, int success,
- void *tag) {
+ void *prc) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ requested_call *rc = prc;
call_data *calld = elem->call_data;
- grpc_cq_end_op(calld->cq_new, tag, call, success);
+ grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
+ &rc->completion);
+ GRPC_CALL_INTERNAL_UNREF(call, "server", 0);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
diff --git a/src/core/surface/version.c b/src/core/surface/version.c
new file mode 100644
index 0000000000..4f5d648371
--- /dev/null
+++ b/src/core/surface/version.c
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* This file is autogenerated from:
+ templates/src/core/surface/version.c.template */
+
+#include <grpc/grpc.h>
+
+const char *grpc_version_string(void) {
+ return "0.10.0.0";
+}
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 130167f830..9597395aab 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -205,7 +205,8 @@ void grpc_chttp2_publish_reads(
}
if (stream_parsing->saw_rst_stream) {
stream_global->cancelled = 1;
- stream_global->cancelled_status = grpc_chttp2_http2_error_to_grpc_status(stream_parsing->rst_stream_reason);
+ stream_global->cancelled_status = grpc_chttp2_http2_error_to_grpc_status(
+ stream_parsing->rst_stream_reason);
if (stream_parsing->rst_stream_reason == GRPC_CHTTP2_NO_ERROR) {
stream_global->published_cancelled = 1;
}
@@ -599,7 +600,7 @@ static void on_header(void *tp, grpc_mdelem *md) {
}
grpc_chttp2_incoming_metadata_buffer_set_deadline(
&stream_parsing->incoming_metadata,
- gpr_time_add(gpr_now(), *cached_timeout));
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), *cached_timeout));
GRPC_MDELEM_UNREF(md);
} else {
grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->incoming_metadata,
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 56ab82006a..d553d80085 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -437,7 +437,8 @@ static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
framer_state *st) {
char timeout_str[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
grpc_mdelem *mdelem;
- grpc_chttp2_encode_timeout(gpr_time_sub(deadline, gpr_now()), timeout_str);
+ grpc_chttp2_encode_timeout(
+ gpr_time_sub(deadline, gpr_now(GPR_CLOCK_REALTIME)), timeout_str);
mdelem = grpc_mdelem_from_metadata_strings(
c->mdctx, GRPC_MDSTR_REF(c->timeout_key_str),
grpc_mdstr_from_string(c->mdctx, timeout_str));
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 322ff39820..ac399e4a1d 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -876,11 +876,19 @@ static void update_global_window(void *args, gpr_uint32 id, void *stream) {
grpc_chttp2_stream *s = stream;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_stream_global *stream_global = &s->global;
+ int was_zero;
+ int is_zero;
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("settings", transport_global, stream_global,
outgoing_window,
t->parsing.initial_window_update);
+ was_zero = stream_global->outgoing_window <= 0;
stream_global->outgoing_window += t->parsing.initial_window_update;
+ is_zero = stream_global->outgoing_window <= 0;
+
+ if (was_zero && !is_zero) {
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ }
}
static void read_error_locked(grpc_chttp2_transport *t) {
diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c
index 9cbb0952d0..e95b7a21f9 100644
--- a/src/core/transport/metadata.c
+++ b/src/core/transport/metadata.c
@@ -87,6 +87,7 @@ typedef struct internal_metadata {
gpr_atm refcnt;
/* private only data */
+ gpr_mu mu_user_data;
void *user_data;
void (*destroy_user_data)(void *user_data);
@@ -183,7 +184,7 @@ grpc_mdctx *grpc_mdctx_create(void) {
/* This seed is used to prevent remote connections from controlling hash table
* collisions. It needs to be somewhat unpredictable to a remote connection.
*/
- return grpc_mdctx_create_with_seed(gpr_now().tv_nsec);
+ return grpc_mdctx_create_with_seed(gpr_now(GPR_CLOCK_REALTIME).tv_nsec);
}
static void discard_metadata(grpc_mdctx *ctx) {
@@ -200,6 +201,7 @@ static void discard_metadata(grpc_mdctx *ctx) {
if (cur->user_data) {
cur->destroy_user_data(cur->user_data);
}
+ gpr_mu_destroy(&cur->mu_user_data);
gpr_free(cur);
cur = next;
ctx->mdtab_free--;
@@ -467,6 +469,7 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx,
md->user_data = NULL;
md->destroy_user_data = NULL;
md->bucket_next = ctx->mdtab[hash % ctx->mdtab_capacity];
+ gpr_mu_init(&md->mu_user_data);
#ifdef GRPC_METADATA_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "ELM NEW:%p:%d: '%s' = '%s'", md,
gpr_atm_no_barrier_load(&md->refcnt),
@@ -581,18 +584,29 @@ size_t grpc_mdctx_get_mdtab_free_test_only(grpc_mdctx *ctx) {
void *grpc_mdelem_get_user_data(grpc_mdelem *md,
void (*if_destroy_func)(void *)) {
internal_metadata *im = (internal_metadata *)md;
- return im->destroy_user_data == if_destroy_func ? im->user_data : NULL;
+ void *result;
+ gpr_mu_lock(&im->mu_user_data);
+ result = im->destroy_user_data == if_destroy_func ? im->user_data : NULL;
+ gpr_mu_unlock(&im->mu_user_data);
+ return result;
}
void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *),
void *user_data) {
internal_metadata *im = (internal_metadata *)md;
GPR_ASSERT((user_data == NULL) == (destroy_func == NULL));
+ gpr_mu_lock(&im->mu_user_data);
if (im->destroy_user_data) {
- im->destroy_user_data(im->user_data);
+ /* user data can only be set once */
+ gpr_mu_unlock(&im->mu_user_data);
+ if (destroy_func != NULL) {
+ destroy_func(user_data);
+ }
+ return;
}
im->destroy_user_data = destroy_func;
im->user_data = user_data;
+ gpr_mu_unlock(&im->mu_user_data);
}
gpr_slice grpc_mdstr_as_base64_encoded_and_huffman_compressed(grpc_mdstr *gs) {