aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-07-17 16:02:15 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-07-17 16:02:15 -0700
commit6016e260ca5d36408a3ae23db3c44481ee4c8427 (patch)
tree7dc0825c12b7c5eec567a833327b23706ce0363f /src/core
parent211e65be3cb1f4b6f7184c1945d8284905f79968 (diff)
parente9881bbaf3d53aa80099c42c80fb3331ff38270a (diff)
Merge branch 'master' of github.com:grpc/grpc into str_join_with_sep
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/client_channel.c4
-rw-r--r--src/core/client_config/lb_policies/pick_first.h2
-rw-r--r--src/core/client_config/subchannel.c2
-rw-r--r--src/core/client_config/uri_parser.c2
-rw-r--r--src/core/httpcli/httpcli.c1
-rw-r--r--src/core/iomgr/alarm.c4
-rw-r--r--src/core/iomgr/iocp_windows.c2
-rw-r--r--src/core/iomgr/iomgr.c21
-rw-r--r--src/core/iomgr/pollset_posix.c12
-rw-r--r--src/core/iomgr/pollset_set.h2
-rw-r--r--src/core/iomgr/tcp_server_windows.c2
-rw-r--r--src/core/security/credentials.c15
-rw-r--r--src/core/security/google_default_credentials.c4
-rw-r--r--src/core/security/json_token.c2
-rw-r--r--src/core/security/jwt_verifier.c27
-rw-r--r--src/core/security/secure_transport_setup.c29
-rw-r--r--src/core/security/secure_transport_setup.h2
-rw-r--r--src/core/security/server_secure_chttp2.c47
-rw-r--r--src/core/support/cancellable.c5
-rw-r--r--src/core/support/sync_posix.c2
-rw-r--r--src/core/support/sync_win32.c2
-rw-r--r--src/core/support/time.c92
-rw-r--r--src/core/support/time_posix.c11
-rw-r--r--src/core/support/time_win32.c1
-rw-r--r--src/core/surface/byte_buffer_queue.c8
-rw-r--r--src/core/surface/byte_buffer_queue.h2
-rw-r--r--src/core/surface/call.c19
-rw-r--r--src/core/surface/completion_queue.c11
-rw-r--r--src/core/surface/lame_client.c2
-rw-r--r--src/core/surface/secure_channel_create.c1
-rw-r--r--src/core/surface/server.c13
-rw-r--r--src/core/transport/chttp2/frame_window_update.c4
-rw-r--r--src/core/transport/chttp2/incoming_metadata.c4
-rw-r--r--src/core/transport/chttp2/internal.h20
-rw-r--r--src/core/transport/chttp2/parsing.c9
-rw-r--r--src/core/transport/chttp2/stream_encoder.c3
-rw-r--r--src/core/transport/chttp2/stream_lists.c7
-rw-r--r--src/core/transport/chttp2/timeout_encoding.c14
-rw-r--r--src/core/transport/chttp2/writing.c49
-rw-r--r--src/core/transport/chttp2_transport.c30
-rw-r--r--src/core/transport/stream_op.c2
-rw-r--r--src/core/transport/transport.h4
-rw-r--r--src/core/transport/transport_op_string.c5
43 files changed, 347 insertions, 153 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index f890f99237..10e01ebbb4 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -132,7 +132,7 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
mdb.list.head = &calld->status;
mdb.list.tail = &calld->details;
mdb.garbage.head = mdb.garbage.tail = NULL;
- mdb.deadline = gpr_inf_future;
+ mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
grpc_sopb_add_metadata(op->recv_ops, mdb);
*op->recv_state = GRPC_STREAM_CLOSED;
op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
@@ -518,7 +518,7 @@ static void init_call_elem(grpc_call_element *elem,
gpr_mu_init(&calld->mu_state);
calld->elem = elem;
calld->state = CALL_CREATED;
- calld->deadline = gpr_inf_future;
+ calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
/* Destructor for call_data */
diff --git a/src/core/client_config/lb_policies/pick_first.h b/src/core/client_config/lb_policies/pick_first.h
index 94c2a9f0c7..31394985e5 100644
--- a/src/core/client_config/lb_policies/pick_first.h
+++ b/src/core/client_config/lb_policies/pick_first.h
@@ -36,6 +36,8 @@
#include "src/core/client_config/lb_policy.h"
+/** Returns a load balancing policy instance that picks up the first subchannel
+ * from \a subchannels to succesfully connect */
grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
size_t num_subchannels);
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 8cdad1015f..35f172683a 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -302,7 +302,7 @@ static void continue_connect(grpc_subchannel *c) {
static void start_connect(grpc_subchannel *c) {
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
c->next_attempt = now;
- c->backoff_delta = gpr_time_from_seconds(1);
+ c->backoff_delta = gpr_time_from_seconds(1, GPR_TIMESPAN);
continue_connect(c);
}
diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c
index 776a255923..410a61c8cf 100644
--- a/src/core/client_config/uri_parser.c
+++ b/src/core/client_config/uri_parser.c
@@ -98,7 +98,7 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) {
if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') {
authority_begin = scheme_end + 3;
- for (i = authority_begin; uri_text[i] != 0; i++) {
+ for (i = authority_begin; uri_text[i] != 0 && authority_end == -1; i++) {
if (uri_text[i] == '/') {
authority_end = i;
}
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 3f5557e08e..65997d5f44 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -165,6 +165,7 @@ static void start_write(internal_request *req) {
static void on_secure_transport_setup_done(void *rp,
grpc_security_status status,
+ grpc_endpoint *wrapped_endpoint,
grpc_endpoint *secure_endpoint) {
internal_request *req = rp;
if (status != GRPC_SECURITY_OK) {
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c
index 5860834de3..5b9a37e5dd 100644
--- a/src/core/iomgr/alarm.c
+++ b/src/core/iomgr/alarm.c
@@ -102,7 +102,8 @@ void grpc_alarm_list_init(gpr_timespec now) {
void grpc_alarm_list_shutdown(void) {
int i;
- while (run_some_expired_alarms(NULL, gpr_inf_future, NULL, 0))
+ while (run_some_expired_alarms(NULL, gpr_inf_future(GPR_CLOCK_REALTIME), NULL,
+ 0))
;
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
@@ -127,6 +128,7 @@ static gpr_timespec dbl_to_ts(double d) {
gpr_timespec ts;
ts.tv_sec = d;
ts.tv_nsec = 1e9 * (d - ts.tv_sec);
+ ts.clock_type = GPR_TIMESPAN;
return ts;
}
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 3d3a193d00..8741241fb8 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -157,7 +157,7 @@ void grpc_iocp_shutdown(void) {
BOOL success;
gpr_event_set(&g_shutdown_iocp, (void *)1);
grpc_iocp_kick();
- gpr_event_wait(&g_iocp_done, gpr_inf_future);
+ gpr_event_wait(&g_iocp_done, gpr_inf_future(GPR_CLOCK_REALTIME));
success = CloseHandle(g_iocp);
GPR_ASSERT(success);
}
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index cca92d3b32..0244f689b1 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -57,9 +57,9 @@ static grpc_iomgr_object g_root_object;
static void background_callback_executor(void *ignored) {
gpr_mu_lock(&g_mu);
while (!g_shutdown) {
- gpr_timespec deadline = gpr_inf_future;
- gpr_timespec short_deadline =
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100));
+ gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ gpr_timespec short_deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN));
if (g_cbs_head) {
grpc_iomgr_closure *closure = g_cbs_head;
g_cbs_head = closure->next;
@@ -110,8 +110,8 @@ static size_t count_objects(void) {
void grpc_iomgr_shutdown(void) {
grpc_iomgr_object *obj;
grpc_iomgr_closure *closure;
- gpr_timespec shutdown_deadline =
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10));
+ gpr_timespec shutdown_deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN));
gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
gpr_mu_lock(&g_mu);
@@ -119,7 +119,7 @@ void grpc_iomgr_shutdown(void) {
while (g_cbs_head != NULL || g_root_object.next != &g_root_object) {
if (gpr_time_cmp(
gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_warning_time),
- gpr_time_from_seconds(1)) >= 0) {
+ gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
if (g_cbs_head != NULL && g_root_object.next != &g_root_object) {
gpr_log(GPR_DEBUG,
"Waiting for %d iomgr objects to be destroyed and executing "
@@ -145,14 +145,14 @@ void grpc_iomgr_shutdown(void) {
} while (g_cbs_head);
continue;
}
- if (grpc_alarm_check(&g_mu, gpr_inf_future, NULL)) {
+ if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_REALTIME), NULL)) {
gpr_log(GPR_DEBUG, "got late alarm");
continue;
}
if (g_root_object.next != &g_root_object) {
int timeout = 0;
- gpr_timespec short_deadline =
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100));
+ gpr_timespec short_deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN));
while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) {
if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) {
timeout = 1;
@@ -174,7 +174,8 @@ void grpc_iomgr_shutdown(void) {
gpr_mu_unlock(&g_mu);
grpc_kick_poller();
- gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future);
+ gpr_event_wait(&g_background_callback_executor_done,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
grpc_alarm_list_shutdown();
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 85101764d2..7afff58eee 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -191,17 +191,17 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
static const int max_spin_polling_us = 10;
- if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
+ if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) {
return -1;
}
- if (gpr_time_cmp(
- deadline,
- gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) {
+ if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
+ max_spin_polling_us,
+ GPR_TIMESPAN))) <= 0) {
return 0;
}
timeout = gpr_time_sub(deadline, now);
- return gpr_time_to_millis(
- gpr_time_add(timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1)));
+ return gpr_time_to_millis(gpr_time_add(
+ timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN)));
}
/*
diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h
index 98e3b552a7..6d73951c70 100644
--- a/src/core/iomgr/pollset_set.h
+++ b/src/core/iomgr/pollset_set.h
@@ -38,7 +38,7 @@
/* A grpc_pollset_set is a set of pollsets that are interested in an
action. Adding a pollset to a pollset_set automatically adds any
- fd's (etc) that have been registered with the set_set with that pollset.
+ fd's (etc) that have been registered with the set_set to that pollset.
Registering fd's automatically adds them to all current pollsets. */
#ifdef GPR_POSIX_SOCKET
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index e6e1d1499e..187009b2c8 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -116,7 +116,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
}
/* This happens asynchronously. Wait while that happens. */
while (s->active_ports) {
- gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
+ gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index 230f0dfb85..fb59fa4b0e 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -324,7 +324,7 @@ static void jwt_reset_cache(grpc_jwt_credentials *c) {
gpr_free(c->cached.service_url);
c->cached.service_url = NULL;
}
- c->cached.jwt_expiration = gpr_inf_past;
+ c->cached.jwt_expiration = gpr_inf_past(GPR_CLOCK_REALTIME);
}
static void jwt_destroy(grpc_credentials *creds) {
@@ -347,8 +347,8 @@ static void jwt_get_request_metadata(grpc_credentials *creds,
grpc_credentials_metadata_cb cb,
void *user_data) {
grpc_jwt_credentials *c = (grpc_jwt_credentials *)creds;
- gpr_timespec refresh_threshold = {GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS,
- 0};
+ gpr_timespec refresh_threshold = gpr_time_from_seconds(
+ GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN);
/* See if we can return a cached jwt. */
grpc_credentials_md_store *jwt_md = NULL;
@@ -516,6 +516,7 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response(
access_token->value);
token_lifetime->tv_sec = strtol(expires_in->value, NULL, 10);
token_lifetime->tv_nsec = 0;
+ token_lifetime->clock_type = GPR_TIMESPAN;
if (*token_md != NULL) grpc_credentials_md_store_unref(*token_md);
*token_md = grpc_credentials_md_store_create(1);
grpc_credentials_md_store_add_cstrings(
@@ -552,7 +553,7 @@ static void on_oauth2_token_fetcher_http_response(
r->cb(r->user_data, c->access_token_md->entries,
c->access_token_md->num_entries, status);
} else {
- c->token_expiration = gpr_inf_past;
+ c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME);
r->cb(r->user_data, NULL, 0, status);
}
gpr_mu_unlock(&c->mu);
@@ -564,8 +565,8 @@ static void oauth2_token_fetcher_get_request_metadata(
grpc_credentials_metadata_cb cb, void *user_data) {
grpc_oauth2_token_fetcher_credentials *c =
(grpc_oauth2_token_fetcher_credentials *)creds;
- gpr_timespec refresh_threshold = {GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS,
- 0};
+ gpr_timespec refresh_threshold = gpr_time_from_seconds(
+ GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN);
grpc_credentials_md_store *cached_access_token_md = NULL;
{
gpr_mu_lock(&c->mu);
@@ -596,7 +597,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c,
c->base.type = GRPC_CREDENTIALS_TYPE_OAUTH2;
gpr_ref_init(&c->base.refcount, 1);
gpr_mu_init(&c->mu);
- c->token_expiration = gpr_inf_past;
+ c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME);
c->fetch_func = fetch_func;
grpc_httpcli_context_init(&c->httpcli_context);
}
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index f622deff42..833484310f 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -91,7 +91,7 @@ static int is_stack_running_on_compute_engine(void) {
/* The http call is local. If it takes more than one sec, it is for sure not
on compute engine. */
- gpr_timespec max_detection_delay = {1, 0};
+ gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN);
grpc_pollset_init(&detector.pollset);
detector.is_done = 0;
@@ -112,7 +112,7 @@ static int is_stack_running_on_compute_engine(void) {
called once for the lifetime of the process by the default credentials. */
gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
while (!detector.is_done) {
- grpc_pollset_work(&detector.pollset, gpr_inf_future);
+ grpc_pollset_work(&detector.pollset, gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
diff --git a/src/core/security/json_token.c b/src/core/security/json_token.c
index 9b1ea255ae..021912f333 100644
--- a/src/core/security/json_token.c
+++ b/src/core/security/json_token.c
@@ -49,7 +49,7 @@
/* --- Constants. --- */
/* 1 hour max. */
-const gpr_timespec grpc_max_auth_token_lifetime = {3600, 0};
+const gpr_timespec grpc_max_auth_token_lifetime = {3600, 0, GPR_TIMESPAN};
#define GRPC_JWT_RSA_SHA256_ALGORITHM "RS256"
#define GRPC_JWT_TYPE "JWT"
diff --git a/src/core/security/jwt_verifier.c b/src/core/security/jwt_verifier.c
index 9140eb2ef7..1276693da7 100644
--- a/src/core/security/jwt_verifier.c
+++ b/src/core/security/jwt_verifier.c
@@ -109,7 +109,7 @@ static const char *validate_string_field(const grpc_json *json,
static gpr_timespec validate_time_field(const grpc_json *json,
const char *key) {
- gpr_timespec result = gpr_time_0;
+ gpr_timespec result = gpr_time_0(GPR_CLOCK_REALTIME);
if (json->type != GRPC_JSON_NUMBER) {
gpr_log(GPR_ERROR, "Invalid %s field [%s]", key, json->value);
return result;
@@ -221,17 +221,17 @@ const char *grpc_jwt_claims_audience(const grpc_jwt_claims *claims) {
}
gpr_timespec grpc_jwt_claims_issued_at(const grpc_jwt_claims *claims) {
- if (claims == NULL) return gpr_inf_past;
+ if (claims == NULL) return gpr_inf_past(GPR_CLOCK_REALTIME);
return claims->iat;
}
gpr_timespec grpc_jwt_claims_expires_at(const grpc_jwt_claims *claims) {
- if (claims == NULL) return gpr_inf_future;
+ if (claims == NULL) return gpr_inf_future(GPR_CLOCK_REALTIME);
return claims->exp;
}
gpr_timespec grpc_jwt_claims_not_before(const grpc_jwt_claims *claims) {
- if (claims == NULL) return gpr_inf_past;
+ if (claims == NULL) return gpr_inf_past(GPR_CLOCK_REALTIME);
return claims->nbf;
}
@@ -242,9 +242,9 @@ grpc_jwt_claims *grpc_jwt_claims_from_json(grpc_json *json, gpr_slice buffer) {
memset(claims, 0, sizeof(grpc_jwt_claims));
claims->json = json;
claims->buffer = buffer;
- claims->iat = gpr_inf_past;
- claims->nbf = gpr_inf_past;
- claims->exp = gpr_inf_future;
+ claims->iat = gpr_inf_past(GPR_CLOCK_REALTIME);
+ claims->nbf = gpr_inf_past(GPR_CLOCK_REALTIME);
+ claims->exp = gpr_inf_future(GPR_CLOCK_REALTIME);
/* Per the spec, all fields are optional. */
for (cur = json->child; cur != NULL; cur = cur->next) {
@@ -262,13 +262,16 @@ grpc_jwt_claims *grpc_jwt_claims_from_json(grpc_json *json, gpr_slice buffer) {
if (claims->jti == NULL) goto error;
} else if (strcmp(cur->key, "iat") == 0) {
claims->iat = validate_time_field(cur, "iat");
- if (gpr_time_cmp(claims->iat, gpr_time_0) == 0) goto error;
+ if (gpr_time_cmp(claims->iat, gpr_time_0(GPR_CLOCK_REALTIME)) == 0)
+ goto error;
} else if (strcmp(cur->key, "exp") == 0) {
claims->exp = validate_time_field(cur, "exp");
- if (gpr_time_cmp(claims->exp, gpr_time_0) == 0) goto error;
+ if (gpr_time_cmp(claims->exp, gpr_time_0(GPR_CLOCK_REALTIME)) == 0)
+ goto error;
} else if (strcmp(cur->key, "nbf") == 0) {
claims->nbf = validate_time_field(cur, "nbf");
- if (gpr_time_cmp(claims->nbf, gpr_time_0) == 0) goto error;
+ if (gpr_time_cmp(claims->nbf, gpr_time_0(GPR_CLOCK_REALTIME)) == 0)
+ goto error;
}
}
return claims;
@@ -359,10 +362,10 @@ void verifier_cb_ctx_destroy(verifier_cb_ctx *ctx) {
/* --- grpc_jwt_verifier object. --- */
/* Clock skew defaults to one minute. */
-gpr_timespec grpc_jwt_verifier_clock_skew = {60, 0};
+gpr_timespec grpc_jwt_verifier_clock_skew = {60, 0, GPR_TIMESPAN};
/* Max delay defaults to one minute. */
-gpr_timespec grpc_jwt_verifier_max_delay = {60, 0};
+gpr_timespec grpc_jwt_verifier_max_delay = {60, 0, GPR_TIMESPAN};
typedef struct {
char *email_domain;
diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c
index 731b382f09..0c3572b53c 100644
--- a/src/core/security/secure_transport_setup.c
+++ b/src/core/security/secure_transport_setup.c
@@ -47,7 +47,8 @@ typedef struct {
tsi_handshaker *handshaker;
unsigned char *handshake_buffer;
size_t handshake_buffer_size;
- grpc_endpoint *endpoint;
+ grpc_endpoint *wrapped_endpoint;
+ grpc_endpoint *secure_endpoint;
gpr_slice_buffer left_overs;
grpc_secure_transport_setup_done_cb cb;
void *user_data;
@@ -63,13 +64,16 @@ static void on_handshake_data_sent_to_peer(void *setup,
static void secure_transport_setup_done(grpc_secure_transport_setup *s,
int is_success) {
if (is_success) {
- s->cb(s->user_data, GRPC_SECURITY_OK, s->endpoint);
+ s->cb(s->user_data, GRPC_SECURITY_OK, s->wrapped_endpoint,
+ s->secure_endpoint);
} else {
- if (s->endpoint != NULL) {
- grpc_endpoint_shutdown(s->endpoint);
- grpc_endpoint_destroy(s->endpoint);
+ if (s->secure_endpoint != NULL) {
+ grpc_endpoint_shutdown(s->secure_endpoint);
+ grpc_endpoint_destroy(s->secure_endpoint);
+ } else {
+ grpc_endpoint_destroy(s->wrapped_endpoint);
}
- s->cb(s->user_data, GRPC_SECURITY_ERROR, NULL);
+ s->cb(s->user_data, GRPC_SECURITY_ERROR, s->wrapped_endpoint, NULL);
}
if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker);
if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer);
@@ -95,8 +99,9 @@ static void on_peer_checked(void *user_data, grpc_security_status status) {
secure_transport_setup_done(s, 0);
return;
}
- s->endpoint = grpc_secure_endpoint_create(
- protector, s->endpoint, s->left_overs.slices, s->left_overs.count);
+ s->secure_endpoint =
+ grpc_secure_endpoint_create(protector, s->wrapped_endpoint,
+ s->left_overs.slices, s->left_overs.count);
secure_transport_setup_done(s, 1);
return;
}
@@ -152,7 +157,7 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset);
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- write_status = grpc_endpoint_write(s->endpoint, &to_send, 1,
+ write_status = grpc_endpoint_write(s->wrapped_endpoint, &to_send, 1,
on_handshake_data_sent_to_peer, s);
if (write_status == GRPC_ENDPOINT_WRITE_ERROR) {
gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
@@ -198,7 +203,7 @@ static void on_handshake_data_received_from_peer(
if (result == TSI_INCOMPLETE_DATA) {
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- grpc_endpoint_notify_on_read(s->endpoint,
+ grpc_endpoint_notify_on_read(s->wrapped_endpoint,
on_handshake_data_received_from_peer, setup);
cleanup_slices(slices, nslices);
return;
@@ -256,7 +261,7 @@ static void on_handshake_data_sent_to_peer(void *setup,
if (tsi_handshaker_is_in_progress(s->handshaker)) {
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- grpc_endpoint_notify_on_read(s->endpoint,
+ grpc_endpoint_notify_on_read(s->wrapped_endpoint,
on_handshake_data_received_from_peer, setup);
} else {
check_peer(s);
@@ -280,7 +285,7 @@ void grpc_setup_secure_transport(grpc_security_connector *connector,
GRPC_SECURITY_CONNECTOR_REF(connector, "secure_transport_setup");
s->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE;
s->handshake_buffer = gpr_malloc(s->handshake_buffer_size);
- s->endpoint = nonsecure_endpoint;
+ s->wrapped_endpoint = nonsecure_endpoint;
s->user_data = user_data;
s->cb = cb;
gpr_slice_buffer_init(&s->left_overs);
diff --git a/src/core/security/secure_transport_setup.h b/src/core/security/secure_transport_setup.h
index 58701c461d..29025f5236 100644
--- a/src/core/security/secure_transport_setup.h
+++ b/src/core/security/secure_transport_setup.h
@@ -42,7 +42,7 @@
/* Ownership of the secure_endpoint is transfered. */
typedef void (*grpc_secure_transport_setup_done_cb)(
void *user_data, grpc_security_status status,
- grpc_endpoint *secure_endpoint);
+ grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint);
/* Calls the callback upon completion. */
void grpc_setup_secure_transport(grpc_security_connector *connector,
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 8a7ada07af..3717b8989f 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -51,10 +51,16 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
+typedef struct tcp_endpoint_list {
+ grpc_endpoint *tcp_endpoint;
+ struct tcp_endpoint_list *next;
+} tcp_endpoint_list;
+
typedef struct grpc_server_secure_state {
grpc_server *server;
grpc_tcp_server *tcp;
grpc_security_connector *sc;
+ tcp_endpoint_list *handshaking_tcp_endpoints;
int is_shutdown;
gpr_mu mu;
gpr_refcount refcount;
@@ -88,14 +94,37 @@ static void setup_transport(void *statep, grpc_transport *transport,
grpc_channel_args_destroy(args_copy);
}
+static int remove_tcp_from_list_locked(grpc_server_secure_state *state,
+ grpc_endpoint *tcp) {
+ tcp_endpoint_list *node = state->handshaking_tcp_endpoints;
+ tcp_endpoint_list *tmp = NULL;
+ if (node && node->tcp_endpoint == tcp) {
+ state->handshaking_tcp_endpoints = state->handshaking_tcp_endpoints->next;
+ gpr_free(node);
+ return 0;
+ }
+ while (node) {
+ if (node->next->tcp_endpoint == tcp) {
+ tmp = node->next;
+ node->next = node->next->next;
+ gpr_free(tmp);
+ return 0;
+ }
+ node = node->next;
+ }
+ return -1;
+}
+
static void on_secure_transport_setup_done(void *statep,
grpc_security_status status,
+ grpc_endpoint *wrapped_endpoint,
grpc_endpoint *secure_endpoint) {
grpc_server_secure_state *state = statep;
grpc_transport *transport;
grpc_mdctx *mdctx;
if (status == GRPC_SECURITY_OK) {
gpr_mu_lock(&state->mu);
+ remove_tcp_from_list_locked(state, wrapped_endpoint);
if (!state->is_shutdown) {
mdctx = grpc_mdctx_create();
transport = grpc_create_chttp2_transport(
@@ -110,6 +139,9 @@ static void on_secure_transport_setup_done(void *statep,
}
gpr_mu_unlock(&state->mu);
} else {
+ gpr_mu_lock(&state->mu);
+ remove_tcp_from_list_locked(state, wrapped_endpoint);
+ gpr_mu_unlock(&state->mu);
gpr_log(GPR_ERROR, "Secure transport failed with error %d", status);
}
state_unref(state);
@@ -117,7 +149,14 @@ static void on_secure_transport_setup_done(void *statep,
static void on_accept(void *statep, grpc_endpoint *tcp) {
grpc_server_secure_state *state = statep;
+ tcp_endpoint_list *node;
state_ref(state);
+ node = gpr_malloc(sizeof(tcp_endpoint_list));
+ node->tcp_endpoint = tcp;
+ gpr_mu_lock(&state->mu);
+ node->next = state->handshaking_tcp_endpoints;
+ state->handshaking_tcp_endpoints = node;
+ gpr_mu_unlock(&state->mu);
grpc_setup_secure_transport(state->sc, tcp, on_secure_transport_setup_done,
state);
}
@@ -132,6 +171,13 @@ static void start(grpc_server *server, void *statep, grpc_pollset **pollsets,
static void destroy_done(void *statep) {
grpc_server_secure_state *state = statep;
grpc_server_listener_destroy_done(state->server);
+ gpr_mu_lock(&state->mu);
+ while (state->handshaking_tcp_endpoints != NULL) {
+ grpc_endpoint_shutdown(state->handshaking_tcp_endpoints->tcp_endpoint);
+ remove_tcp_from_list_locked(state,
+ state->handshaking_tcp_endpoints->tcp_endpoint);
+ }
+ gpr_mu_unlock(&state->mu);
state_unref(state);
}
@@ -209,6 +255,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
state->server = server;
state->tcp = tcp;
state->sc = sc;
+ state->handshaking_tcp_endpoints = NULL;
state->is_shutdown = 0;
gpr_mu_init(&state->mu);
gpr_ref_init(&state->refcount, 1);
diff --git a/src/core/support/cancellable.c b/src/core/support/cancellable.c
index 3cbb318ab6..4756f1e125 100644
--- a/src/core/support/cancellable.c
+++ b/src/core/support/cancellable.c
@@ -121,8 +121,9 @@ void gpr_cancellable_cancel(gpr_cancellable *c) {
} else {
gpr_event ev;
gpr_event_init(&ev);
- gpr_event_wait(&ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(1000)));
+ gpr_event_wait(
+ &ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_micros(1000, GPR_TIMESPAN)));
}
}
} while (failures != 0);
diff --git a/src/core/support/sync_posix.c b/src/core/support/sync_posix.c
index 0ccbd4923f..41af8ceb0a 100644
--- a/src/core/support/sync_posix.c
+++ b/src/core/support/sync_posix.c
@@ -63,7 +63,7 @@ void gpr_cv_destroy(gpr_cv *cv) { GPR_ASSERT(pthread_cond_destroy(cv) == 0); }
int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) {
int err = 0;
- if (gpr_time_cmp(abs_deadline, gpr_inf_future) == 0) {
+ if (gpr_time_cmp(abs_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) {
err = pthread_cond_wait(cv, mu);
} else {
struct timespec abs_deadline_ts;
diff --git a/src/core/support/sync_win32.c b/src/core/support/sync_win32.c
index 29b77fc4c2..63196d10d3 100644
--- a/src/core/support/sync_win32.c
+++ b/src/core/support/sync_win32.c
@@ -83,7 +83,7 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) {
int timeout = 0;
DWORD timeout_max_ms;
mu->locked = 0;
- if (gpr_time_cmp(abs_deadline, gpr_inf_future) == 0) {
+ if (gpr_time_cmp(abs_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) {
SleepConditionVariableCS(cv, &mu->cs, INFINITE);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
diff --git a/src/core/support/time.c b/src/core/support/time.c
index d47b08b266..570f195bd1 100644
--- a/src/core/support/time.c
+++ b/src/core/support/time.c
@@ -41,6 +41,7 @@
int gpr_time_cmp(gpr_timespec a, gpr_timespec b) {
int cmp = (a.tv_sec > b.tv_sec) - (a.tv_sec < b.tv_sec);
+ GPR_ASSERT(a.clock_type == b.clock_type);
if (cmp == 0) {
cmp = (a.tv_nsec > b.tv_nsec) - (a.tv_nsec < b.tv_nsec);
}
@@ -71,19 +72,40 @@ gpr_timespec gpr_time_max(gpr_timespec a, gpr_timespec b) {
((t)(TYPE_IS_SIGNED(t) ? (TOP_BIT_OF_TYPE(t) - 1) \
: ((TOP_BIT_OF_TYPE(t) - 1) << 1) + 1))
-const gpr_timespec gpr_time_0 = {0, 0};
-const gpr_timespec gpr_inf_future = {TYPE_MAX(time_t), 0};
-const gpr_timespec gpr_inf_past = {TYPE_MIN(time_t), 0};
+gpr_timespec gpr_time_0(gpr_clock_type type) {
+ gpr_timespec out;
+ out.tv_sec = 0;
+ out.tv_nsec = 0;
+ out.clock_type = type;
+ return out;
+}
+
+gpr_timespec gpr_inf_future(gpr_clock_type type) {
+ gpr_timespec out;
+ out.tv_sec = TYPE_MAX(time_t);
+ out.tv_nsec = 0;
+ out.clock_type = type;
+ return out;
+}
+
+gpr_timespec gpr_inf_past(gpr_clock_type type) {
+ gpr_timespec out;
+ out.tv_sec = TYPE_MIN(time_t);
+ out.tv_nsec = 0;
+ out.clock_type = type;
+ return out;
+}
/* TODO(ctiller): consider merging _nanos, _micros, _millis into a single
function for maintainability. Similarly for _seconds, _minutes, and _hours */
-gpr_timespec gpr_time_from_nanos(long ns) {
+gpr_timespec gpr_time_from_nanos(long ns, gpr_clock_type type) {
gpr_timespec result;
+ result.clock_type = type;
if (ns == LONG_MAX) {
- result = gpr_inf_future;
+ result = gpr_inf_future(type);
} else if (ns == LONG_MIN) {
- result = gpr_inf_past;
+ result = gpr_inf_past(type);
} else if (ns >= 0) {
result.tv_sec = ns / GPR_NS_PER_SEC;
result.tv_nsec = (int)(ns - result.tv_sec * GPR_NS_PER_SEC);
@@ -95,12 +117,13 @@ gpr_timespec gpr_time_from_nanos(long ns) {
return result;
}
-gpr_timespec gpr_time_from_micros(long us) {
+gpr_timespec gpr_time_from_micros(long us, gpr_clock_type type) {
gpr_timespec result;
+ result.clock_type = type;
if (us == LONG_MAX) {
- result = gpr_inf_future;
+ result = gpr_inf_future(type);
} else if (us == LONG_MIN) {
- result = gpr_inf_past;
+ result = gpr_inf_past(type);
} else if (us >= 0) {
result.tv_sec = us / 1000000;
result.tv_nsec = (int)((us - result.tv_sec * 1000000) * 1000);
@@ -112,12 +135,13 @@ gpr_timespec gpr_time_from_micros(long us) {
return result;
}
-gpr_timespec gpr_time_from_millis(long ms) {
+gpr_timespec gpr_time_from_millis(long ms, gpr_clock_type type) {
gpr_timespec result;
+ result.clock_type = type;
if (ms == LONG_MAX) {
- result = gpr_inf_future;
+ result = gpr_inf_future(type);
} else if (ms == LONG_MIN) {
- result = gpr_inf_past;
+ result = gpr_inf_past(type);
} else if (ms >= 0) {
result.tv_sec = ms / 1000;
result.tv_nsec = (int)((ms - result.tv_sec * 1000) * 1000000);
@@ -129,12 +153,13 @@ gpr_timespec gpr_time_from_millis(long ms) {
return result;
}
-gpr_timespec gpr_time_from_seconds(long s) {
+gpr_timespec gpr_time_from_seconds(long s, gpr_clock_type type) {
gpr_timespec result;
+ result.clock_type = type;
if (s == LONG_MAX) {
- result = gpr_inf_future;
+ result = gpr_inf_future(type);
} else if (s == LONG_MIN) {
- result = gpr_inf_past;
+ result = gpr_inf_past(type);
} else {
result.tv_sec = s;
result.tv_nsec = 0;
@@ -142,12 +167,13 @@ gpr_timespec gpr_time_from_seconds(long s) {
return result;
}
-gpr_timespec gpr_time_from_minutes(long m) {
+gpr_timespec gpr_time_from_minutes(long m, gpr_clock_type type) {
gpr_timespec result;
+ result.clock_type = type;
if (m >= LONG_MAX / 60) {
- result = gpr_inf_future;
+ result = gpr_inf_future(type);
} else if (m <= LONG_MIN / 60) {
- result = gpr_inf_past;
+ result = gpr_inf_past(type);
} else {
result.tv_sec = m * 60;
result.tv_nsec = 0;
@@ -155,12 +181,13 @@ gpr_timespec gpr_time_from_minutes(long m) {
return result;
}
-gpr_timespec gpr_time_from_hours(long h) {
+gpr_timespec gpr_time_from_hours(long h, gpr_clock_type type) {
gpr_timespec result;
+ result.clock_type = type;
if (h >= LONG_MAX / 3600) {
- result = gpr_inf_future;
+ result = gpr_inf_future(type);
} else if (h <= LONG_MIN / 3600) {
- result = gpr_inf_past;
+ result = gpr_inf_past(type);
} else {
result.tv_sec = h * 3600;
result.tv_nsec = 0;
@@ -171,6 +198,8 @@ gpr_timespec gpr_time_from_hours(long h) {
gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) {
gpr_timespec sum;
int inc = 0;
+ GPR_ASSERT(b.clock_type == GPR_TIMESPAN);
+ sum.clock_type = a.clock_type;
sum.tv_nsec = a.tv_nsec + b.tv_nsec;
if (sum.tv_nsec >= GPR_NS_PER_SEC) {
sum.tv_nsec -= GPR_NS_PER_SEC;
@@ -180,14 +209,14 @@ gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) {
sum = a;
} else if (b.tv_sec == TYPE_MAX(time_t) ||
(b.tv_sec >= 0 && a.tv_sec >= TYPE_MAX(time_t) - b.tv_sec)) {
- sum = gpr_inf_future;
+ sum = gpr_inf_future(sum.clock_type);
} else if (b.tv_sec == TYPE_MIN(time_t) ||
(b.tv_sec <= 0 && a.tv_sec <= TYPE_MIN(time_t) - b.tv_sec)) {
- sum = gpr_inf_past;
+ sum = gpr_inf_past(sum.clock_type);
} else {
sum.tv_sec = a.tv_sec + b.tv_sec;
if (inc != 0 && sum.tv_sec == TYPE_MAX(time_t) - 1) {
- sum = gpr_inf_future;
+ sum = gpr_inf_future(sum.clock_type);
} else {
sum.tv_sec += inc;
}
@@ -198,6 +227,12 @@ gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) {
gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b) {
gpr_timespec diff;
int dec = 0;
+ if (b.clock_type == GPR_TIMESPAN) {
+ diff.clock_type = a.clock_type;
+ } else {
+ GPR_ASSERT(a.clock_type == b.clock_type);
+ diff.clock_type = GPR_TIMESPAN;
+ }
diff.tv_nsec = a.tv_nsec - b.tv_nsec;
if (diff.tv_nsec < 0) {
diff.tv_nsec += GPR_NS_PER_SEC;
@@ -207,14 +242,14 @@ gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b) {
diff = a;
} else if (b.tv_sec == TYPE_MIN(time_t) ||
(b.tv_sec <= 0 && a.tv_sec >= TYPE_MAX(time_t) + b.tv_sec)) {
- diff = gpr_inf_future;
+ diff = gpr_inf_future(GPR_CLOCK_REALTIME);
} else if (b.tv_sec == TYPE_MAX(time_t) ||
(b.tv_sec >= 0 && a.tv_sec <= TYPE_MIN(time_t) + b.tv_sec)) {
- diff = gpr_inf_past;
+ diff = gpr_inf_past(GPR_CLOCK_REALTIME);
} else {
diff.tv_sec = a.tv_sec - b.tv_sec;
if (dec != 0 && diff.tv_sec == TYPE_MIN(time_t) + 1) {
- diff = gpr_inf_past;
+ diff = gpr_inf_past(GPR_CLOCK_REALTIME);
} else {
diff.tv_sec -= dec;
}
@@ -225,6 +260,9 @@ gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b) {
int gpr_time_similar(gpr_timespec a, gpr_timespec b, gpr_timespec threshold) {
int cmp_ab;
+ GPR_ASSERT(a.clock_type == b.clock_type);
+ GPR_ASSERT(threshold.clock_type == GPR_TIMESPAN);
+
cmp_ab = gpr_time_cmp(a, b);
if (cmp_ab == 0) return 1;
if (cmp_ab < 0) {
diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c
index f9b7958783..258b2e640e 100644
--- a/src/core/support/time_posix.c
+++ b/src/core/support/time_posix.c
@@ -38,6 +38,7 @@
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
+#include <grpc/support/log.h>
#include <grpc/support/time.h>
static struct timespec timespec_from_gpr(gpr_timespec gts) {
@@ -48,10 +49,12 @@ static struct timespec timespec_from_gpr(gpr_timespec gts) {
}
#if _POSIX_TIMERS > 0
-static gpr_timespec gpr_from_timespec(struct timespec ts) {
+static gpr_timespec gpr_from_timespec(struct timespec ts,
+ gpr_clock_type clock) {
gpr_timespec rv;
rv.tv_sec = ts.tv_sec;
rv.tv_nsec = (int)ts.tv_nsec;
+ rv.clock_type = clock;
return rv;
}
@@ -62,8 +65,9 @@ void gpr_time_init(void) {}
gpr_timespec gpr_now(gpr_clock_type clock) {
struct timespec now;
+ GPR_ASSERT(clock != GPR_TIMESPAN);
clock_gettime(clockid_for_gpr_clock[clock], &now);
- return gpr_from_timespec(now);
+ return gpr_from_timespec(now, clock);
}
#else
/* For some reason Apple's OSes haven't implemented clock_gettime. */
@@ -88,6 +92,7 @@ gpr_timespec gpr_now(gpr_clock_type clock) {
struct timeval now_tv;
double now_dbl;
+ now.clock_type = clock;
switch (clock) {
case GPR_CLOCK_REALTIME:
gettimeofday(&now_tv, NULL);
@@ -99,6 +104,8 @@ gpr_timespec gpr_now(gpr_clock_type clock) {
now.tv_sec = now_dbl * 1e-9;
now.tv_nsec = now_dbl - now.tv_sec * 1e9;
break;
+ case GPR_TIMESPAN:
+ abort();
}
return now;
diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c
index fa77c74eeb..238cd07ebc 100644
--- a/src/core/support/time_win32.c
+++ b/src/core/support/time_win32.c
@@ -55,6 +55,7 @@ gpr_timespec gpr_now(gpr_clock_type clock) {
struct _timeb now_tb;
LARGE_INTEGER timestamp;
double now_dbl;
+ now_tv.clock_type = clock;
switch (clock) {
case GPR_CLOCK_REALTIME:
_ftime_s(&now_tb);
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c
index 7c31bfe5da..e47dc4f4ce 100644
--- a/src/core/surface/byte_buffer_queue.c
+++ b/src/core/surface/byte_buffer_queue.c
@@ -62,6 +62,7 @@ int grpc_bbq_empty(grpc_byte_buffer_queue *q) {
}
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) {
+ q->bytes += grpc_byte_buffer_length(buffer);
bba_push(&q->filling, buffer);
}
@@ -72,8 +73,11 @@ void grpc_bbq_flush(grpc_byte_buffer_queue *q) {
}
}
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q) { return q->bytes; }
+
grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
grpc_bbq_array temp_array;
+ grpc_byte_buffer *out;
if (q->drain_pos == q->draining.count) {
if (q->filling.count == 0) {
@@ -87,5 +91,7 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
q->draining = temp_array;
}
- return q->draining.data[q->drain_pos++];
+ out = q->draining.data[q->drain_pos++];
+ q->bytes -= grpc_byte_buffer_length(out);
+ return out;
}
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h
index 32c57f8756..f01958984f 100644
--- a/src/core/surface/byte_buffer_queue.h
+++ b/src/core/surface/byte_buffer_queue.h
@@ -49,6 +49,7 @@ typedef struct {
size_t drain_pos;
grpc_bbq_array filling;
grpc_bbq_array draining;
+ size_t bytes;
} grpc_byte_buffer_queue;
void grpc_bbq_destroy(grpc_byte_buffer_queue *q);
@@ -56,5 +57,6 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q);
void grpc_bbq_flush(grpc_byte_buffer_queue *q);
int grpc_bbq_empty(grpc_byte_buffer_queue *q);
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb);
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q);
#endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 0a551ac47f..1146d83982 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -347,7 +347,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
}
grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
CALL_STACK_FROM_CALL(call));
- if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) {
+ if (gpr_time_cmp(send_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) {
set_deadline_alarm(call, send_deadline);
}
return call;
@@ -513,6 +513,8 @@ static void unlock(grpc_call *call) {
int completing_requests = 0;
int start_op = 0;
int i;
+ const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536;
+ size_t buffered_bytes;
int cancel_alarm = 0;
memset(&op, 0, sizeof(op));
@@ -528,6 +530,17 @@ static void unlock(grpc_call *call) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
op.on_done_recv = &call->on_done_recv;
+ if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
+ op.max_recv_bytes = call->incoming_message_length -
+ call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
+ } else {
+ buffered_bytes = grpc_bbq_bytes(&call->incoming_queue);
+ if (buffered_bytes > MAX_RECV_PEEK_AHEAD) {
+ op.max_recv_bytes = 0;
+ } else {
+ op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes;
+ }
+ }
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
start_op = 1;
@@ -972,7 +985,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) {
mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
data.send_metadata.metadata);
mdb.garbage.head = mdb.garbage.tail = NULL;
- mdb.deadline = gpr_inf_future;
+ mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
/* send status */
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
@@ -1325,7 +1338,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
l->md = 0;
}
}
- if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) {
+ if (gpr_time_cmp(md->deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) {
set_deadline_alarm(call, md->deadline);
}
if (!is_trailing) {
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index c67f75fc5c..8484418247 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -116,7 +116,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) {
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
void (*done)(void *done_arg, grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
- int shutdown = gpr_unref(&cc->pending_events);
+ int shutdown;
storage->tag = tag;
storage->done = done;
@@ -124,15 +124,15 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
storage->next =
((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ shutdown = gpr_unref(&cc->pending_events);
if (!shutdown) {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
cc->completed_tail->next =
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;
grpc_pollset_kick(&cc->pollset);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
} else {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
cc->completed_tail->next =
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;
@@ -260,8 +260,9 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
grpc_pollset_kick(&cc->pollset);
- grpc_pollset_work(&cc->pollset, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(100)));
+ grpc_pollset_work(&cc->pollset,
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(100, GPR_TIMESPAN)));
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 3dd56fe5a9..3f2bb5c8a9 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -72,7 +72,7 @@ static void lame_start_transport_stream_op(grpc_call_element *elem,
mdb.list.head = &calld->status;
mdb.list.tail = &calld->details;
mdb.garbage.head = mdb.garbage.tail = NULL;
- mdb.deadline = gpr_inf_future;
+ mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
grpc_sopb_add_metadata(op->recv_ops, mdb);
*op->recv_state = GRPC_STREAM_CLOSED;
op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 34ee3f8400..f3c7d8397b 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -75,6 +75,7 @@ static void connector_unref(grpc_connector *con) {
static void on_secure_transport_setup_done(void *arg,
grpc_security_status status,
+ grpc_endpoint *wrapped_endpoint,
grpc_endpoint *secure_endpoint) {
connector *c = arg;
grpc_iomgr_closure *notify;
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 4dc51bf031..37f0c3c005 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -487,9 +487,9 @@ static void maybe_finish_shutdown(grpc_server *server) {
if (server->root_channel_data.next != &server->root_channel_data ||
server->listeners_destroyed < num_listeners(server)) {
- if (gpr_time_cmp(
- gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), server->last_shutdown_message_time),
- gpr_time_from_seconds(1)) >= 0) {
+ if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
+ server->last_shutdown_message_time),
+ gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
gpr_log(GPR_DEBUG,
"Waiting for %d channels and %d/%d listeners to be destroyed"
@@ -536,7 +536,8 @@ static void server_on_recv(void *ptr, int success) {
grpc_stream_op *op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue;
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
- if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) {
+ if (0 != gpr_time_cmp(op->data.metadata.deadline,
+ gpr_inf_future(GPR_CLOCK_REALTIME))) {
calld->deadline = op->data.metadata.deadline;
}
calld->got_initial_metadata = 1;
@@ -610,7 +611,7 @@ static void accept_stream(void *cd, grpc_transport *transport,
channel_data *chand = cd;
/* create a call */
grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0,
- gpr_inf_future);
+ gpr_inf_future(GPR_CLOCK_REALTIME));
}
static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
@@ -638,7 +639,7 @@ static void init_call_elem(grpc_call_element *elem,
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
- calld->deadline = gpr_inf_future;
+ calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
calld->call = grpc_call_from_top_element(elem);
gpr_mu_init(&calld->mu_state);
diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c
index b817df7745..d624298ad2 100644
--- a/src/core/transport/chttp2/frame_window_update.c
+++ b/src/core/transport/chttp2/frame_window_update.c
@@ -94,8 +94,8 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
}
GPR_ASSERT(is_last);
- if (transport_parsing->incoming_stream_id) {
- if (stream_parsing) {
+ if (transport_parsing->incoming_stream_id != 0) {
+ if (stream_parsing != NULL) {
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("update", transport_parsing,
stream_parsing, outgoing_window_update,
p->amount);
diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c
index 77162a6864..974b864ffb 100644
--- a/src/core/transport/chttp2/incoming_metadata.c
+++ b/src/core/transport/chttp2/incoming_metadata.c
@@ -42,7 +42,7 @@
void grpc_chttp2_incoming_metadata_buffer_init(
grpc_chttp2_incoming_metadata_buffer *buffer) {
- buffer->deadline = gpr_inf_future;
+ buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
void grpc_chttp2_incoming_metadata_buffer_destroy(
@@ -87,7 +87,7 @@ void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
b.list.tail = (void *)(gpr_intptr)buffer->count;
b.garbage.head = b.garbage.tail = NULL;
b.deadline = buffer->deadline;
- buffer->deadline = gpr_inf_future;
+ buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
grpc_sopb_add_metadata(sopb, b);
}
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index bdd4b432eb..e5e6f445b7 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -353,7 +353,19 @@ typedef struct {
/** window available for us to send to peer */
gpr_int64 outgoing_window;
- /** window available for peer to send to us - updated after parse */
+ /** The number of bytes the upper layers have offered to receive.
+ As the upper layer offers more bytes, this value increases.
+ As bytes are read, this value decreases. */
+ gpr_uint32 max_recv_bytes;
+ /** The number of bytes the upper layer has offered to read but we have
+ not yet announced to HTTP2 flow control.
+ As the upper layers offer to read more bytes, this value increases.
+ As we advertise incoming flow control window, this value decreases. */
+ gpr_uint32 unannounced_incoming_window;
+ /** The number of bytes of HTTP2 flow control we have advertised.
+ As we advertise incoming flow control window, this value increases.
+ As bytes are read, this value decreases.
+ Updated after parse. */
gpr_uint32 incoming_window;
/** stream ops the transport user would like to send */
grpc_stream_op_buffer *outgoing_sopb;
@@ -391,6 +403,8 @@ typedef struct {
grpc_stream_op_buffer sopb;
/** how strongly should we indicate closure with the next write */
grpc_chttp2_send_closed send_closed;
+ /** how much window should we announce? */
+ gpr_uint32 announce_window;
} grpc_chttp2_stream_writing;
struct grpc_chttp2_stream_parsing {
@@ -501,7 +515,9 @@ void grpc_chttp2_list_add_writable_window_update_stream(
grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global **stream_global);
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_global **stream_global,
+ grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_remove_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 9597395aab..aa32f2e44a 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -173,7 +173,14 @@ void grpc_chttp2_publish_reads(
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
"parsed", transport_parsing, stream_parsing, incoming_window_delta,
-(gpr_int64)stream_parsing->incoming_window_delta);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
+ "parsed", transport_parsing, stream_global, max_recv_bytes,
+ -(gpr_int64)stream_parsing->incoming_window_delta);
stream_global->incoming_window -= stream_parsing->incoming_window_delta;
+ GPR_ASSERT(stream_global->max_recv_bytes >=
+ stream_parsing->incoming_window_delta);
+ stream_global->max_recv_bytes -=
+ stream_parsing->incoming_window_delta;
stream_parsing->incoming_window_delta = 0;
grpc_chttp2_list_add_writable_window_update_stream(transport_global,
stream_global);
@@ -594,7 +601,7 @@ static void on_header(void *tp, grpc_mdelem *md) {
cached_timeout)) {
gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'",
grpc_mdstr_as_c_string(md->value));
- *cached_timeout = gpr_inf_future;
+ *cached_timeout = gpr_inf_future(GPR_CLOCK_REALTIME);
}
grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
}
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index d553d80085..d7fc2da5d3 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -585,7 +585,8 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
l->md = hpack_enc(compressor, l->md, &st);
need_unref |= l->md != NULL;
}
- if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) {
+ if (gpr_time_cmp(op->data.metadata.deadline,
+ gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) {
deadline_enc(compressor, op->data.metadata.deadline, &st);
}
curop++;
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index 4fea058c19..590f6abfbc 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -139,6 +139,7 @@ static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
void grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
+ GPR_ASSERT(stream_global->id != 0);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE);
}
@@ -204,6 +205,7 @@ int grpc_chttp2_list_pop_written_stream(
void grpc_chttp2_list_add_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
+ GPR_ASSERT(stream_global->id != 0);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
@@ -211,11 +213,14 @@ void grpc_chttp2_list_add_writable_window_update_stream(
int grpc_chttp2_list_pop_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global **stream_global) {
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_global **stream_global,
+ grpc_chttp2_stream_writing **stream_writing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
*stream_global = &stream->global;
+ *stream_writing = &stream->writing;
return r;
}
diff --git a/src/core/transport/chttp2/timeout_encoding.c b/src/core/transport/chttp2/timeout_encoding.c
index 33915c4039..1dd768ada4 100644
--- a/src/core/transport/chttp2/timeout_encoding.c
+++ b/src/core/transport/chttp2/timeout_encoding.c
@@ -147,7 +147,7 @@ int grpc_chttp2_decode_timeout(const char *buffer, gpr_timespec *timeout) {
gpr_uint32 xp = x * 10 + *p - '0';
have_digit = 1;
if (xp < x) {
- *timeout = gpr_inf_future;
+ *timeout = gpr_inf_future(GPR_CLOCK_REALTIME);
return 1;
}
x = xp;
@@ -159,22 +159,22 @@ int grpc_chttp2_decode_timeout(const char *buffer, gpr_timespec *timeout) {
/* decode unit specifier */
switch (*p) {
case 'n':
- *timeout = gpr_time_from_nanos(x);
+ *timeout = gpr_time_from_nanos(x, GPR_TIMESPAN);
break;
case 'u':
- *timeout = gpr_time_from_micros(x);
+ *timeout = gpr_time_from_micros(x, GPR_TIMESPAN);
break;
case 'm':
- *timeout = gpr_time_from_millis(x);
+ *timeout = gpr_time_from_millis(x, GPR_TIMESPAN);
break;
case 'S':
- *timeout = gpr_time_from_seconds(x);
+ *timeout = gpr_time_from_seconds(x, GPR_TIMESPAN);
break;
case 'M':
- *timeout = gpr_time_from_minutes(x);
+ *timeout = gpr_time_from_minutes(x, GPR_TIMESPAN);
break;
case 'H':
- *timeout = gpr_time_from_hours(x);
+ *timeout = gpr_time_from_hours(x, GPR_TIMESPAN);
break;
default:
return 0;
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index a78654334e..d8ec117aa5 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -66,11 +66,9 @@ int grpc_chttp2_unlocking_check_writes(
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to
available window sizes) and add to the output buffer */
- while (transport_global->outgoing_window &&
- grpc_chttp2_list_pop_writable_stream(transport_global,
+ while (grpc_chttp2_list_pop_writable_stream(transport_global,
transport_writing, &stream_global,
- &stream_writing) &&
- stream_global->outgoing_window > 0) {
+ &stream_writing)) {
stream_writing->id = stream_global->id;
window_delta = grpc_chttp2_preencode(
stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops,
@@ -106,20 +104,21 @@ int grpc_chttp2_unlocking_check_writes(
/* for each grpc_chttp2_stream that wants to update its window, add that
* window here */
while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global,
- &stream_global)) {
- window_delta =
- transport_global->settings[GRPC_LOCAL_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
- stream_global->incoming_window;
- if (!stream_global->read_closed && window_delta > 0) {
- gpr_slice_buffer_add(
- &transport_writing->outbuf,
- grpc_chttp2_window_update_create(stream_global->id, window_delta));
+ transport_writing,
+ &stream_global,
+ &stream_writing)) {
+ stream_writing->id = stream_global->id;
+ if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) {
+ stream_writing->announce_window = stream_global->unannounced_incoming_window;
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
- incoming_window, window_delta);
- stream_global->incoming_window += window_delta;
+ incoming_window, stream_global->unannounced_incoming_window);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
+ unannounced_incoming_window, -(gpr_int64)stream_global->unannounced_incoming_window);
+ stream_global->incoming_window += stream_global->unannounced_incoming_window;
+ stream_global->unannounced_incoming_window = 0;
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
+ grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
}
}
@@ -169,10 +168,19 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
while (
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
- grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
- stream_writing->id, &transport_writing->hpack_compressor,
- &transport_writing->outbuf);
+ if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
+ grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
+ stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
+ stream_writing->id, &transport_writing->hpack_compressor,
+ &transport_writing->outbuf);
+ }
+ if (stream_writing->announce_window > 0) {
+ gpr_slice_buffer_add(
+ &transport_writing->outbuf,
+ grpc_chttp2_window_update_create(
+ stream_writing->id, stream_writing->announce_window));
+ stream_writing->announce_window = 0;
+ }
stream_writing->sopb.nops = 0;
if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) {
gpr_slice_buffer_add(&transport_writing->outbuf,
@@ -197,7 +205,8 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
- if (stream_global->outgoing_sopb->nops == 0) {
+ if (stream_global->outgoing_sopb != NULL &&
+ stream_global->outgoing_sopb->nops == 0) {
stream_global->outgoing_sopb = NULL;
grpc_chttp2_schedule_closure(transport_global,
stream_global->send_done_closure, 1);
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index ac399e4a1d..c923d5e42f 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -358,7 +358,9 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
s->global.outgoing_window =
t->global.settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->parsing.incoming_window = s->global.incoming_window =
+ s->global.max_recv_bytes =
+ s->parsing.incoming_window =
+ s->global.incoming_window =
t->global.settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
@@ -562,6 +564,8 @@ static void maybe_start_some_streams(
stream_global->incoming_window =
transport_global->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ stream_global->max_recv_bytes =
+ GPR_MAX(stream_global->incoming_window, stream_global->max_recv_bytes);
grpc_chttp2_stream_map_add(
&TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
@@ -570,6 +574,9 @@ static void maybe_start_some_streams(
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_list_add_writable_window_update_stream(transport_global,
+ stream_global);
+
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -620,12 +627,23 @@ static void perform_stream_op_locked(
stream_global->publish_sopb = op->recv_ops;
stream_global->publish_sopb->nops = 0;
stream_global->publish_state = op->recv_state;
+ if (stream_global->max_recv_bytes < op->max_recv_bytes) {
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("op", transport_global, stream_global,
+ max_recv_bytes, op->max_recv_bytes - stream_global->max_recv_bytes);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
+ "op", transport_global, stream_global, unannounced_incoming_window,
+ op->max_recv_bytes - stream_global->max_recv_bytes);
+ stream_global->unannounced_incoming_window += op->max_recv_bytes - stream_global->max_recv_bytes;
+ stream_global->max_recv_bytes = op->max_recv_bytes;
+ }
grpc_chttp2_incoming_metadata_live_op_buffer_end(
&stream_global->outstanding_metadata);
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
- grpc_chttp2_list_add_writable_window_update_stream(transport_global,
- stream_global);
+ if (stream_global->id != 0) {
+ grpc_chttp2_list_add_read_write_state_changed(transport_global,
+ stream_global);
+ grpc_chttp2_list_add_writable_window_update_stream(transport_global,
+ stream_global);
+ }
}
if (op->bind_pollset) {
@@ -1038,7 +1056,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
identifier = gpr_strdup(context_scope);
}
gpr_log(GPR_INFO,
- "FLOWCTL: %s %-10s %8s %-23s %8lld %c %8lld = %8lld %-10s [%s:%d]",
+ "FLOWCTL: %s %-10s %8s %-27s %8lld %c %8lld = %8lld %-10s [%s:%d]",
is_client ? "client" : "server", identifier, context_thread, var,
current_value, delta < 0 ? '-' : '+', delta < 0 ? -delta : delta,
current_value + delta, reason, file, line);
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index fdb50c6b71..71061fe0c7 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -205,7 +205,7 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *batch) {
void grpc_metadata_batch_init(grpc_metadata_batch *batch) {
batch->list.head = batch->list.tail = batch->garbage.head = batch->garbage.tail =
NULL;
- batch->deadline = gpr_inf_future;
+ batch->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
void grpc_metadata_batch_destroy(grpc_metadata_batch *batch) {
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 1429737721..64503604ee 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -72,6 +72,10 @@ typedef struct grpc_transport_stream_op {
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
+ /** The number of bytes this peer is currently prepared to receive.
+ These bytes will be eventually used to replenish per-stream flow control
+ windows. */
+ gpr_uint32 max_recv_bytes;
grpc_iomgr_closure *on_done_recv;
grpc_pollset *bind_pollset;
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
index 0da396a320..9d127c5472 100644
--- a/src/core/transport/transport_op_string.c
+++ b/src/core/transport/transport_op_string.c
@@ -61,7 +61,7 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", "));
put_metadata(b, m->md);
}
- if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) {
+ if (gpr_time_cmp(md.deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) {
char *tmp;
gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec,
md.deadline.tv_nsec);
@@ -128,7 +128,8 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
if (op->recv_ops) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
- gpr_strvec_add(&b, gpr_strdup("RECV"));
+ gpr_asprintf(&tmp, "RECV:max_recv_bytes=%d", op->max_recv_bytes);
+ gpr_strvec_add(&b, tmp);
}
if (op->bind_pollset) {