diff options
author | 2015-07-17 16:02:15 -0700 | |
---|---|---|
committer | 2015-07-17 16:02:15 -0700 | |
commit | 6016e260ca5d36408a3ae23db3c44481ee4c8427 (patch) | |
tree | 7dc0825c12b7c5eec567a833327b23706ce0363f /src/core | |
parent | 211e65be3cb1f4b6f7184c1945d8284905f79968 (diff) | |
parent | e9881bbaf3d53aa80099c42c80fb3331ff38270a (diff) |
Merge branch 'master' of github.com:grpc/grpc into str_join_with_sep
Diffstat (limited to 'src/core')
43 files changed, 347 insertions, 153 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index f890f99237..10e01ebbb4 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -132,7 +132,7 @@ static void handle_op_after_cancellation(grpc_call_element *elem, mdb.list.head = &calld->status; mdb.list.tail = &calld->details; mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future; + mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); grpc_sopb_add_metadata(op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); @@ -518,7 +518,7 @@ static void init_call_elem(grpc_call_element *elem, gpr_mu_init(&calld->mu_state); calld->elem = elem; calld->state = CALL_CREATED; - calld->deadline = gpr_inf_future; + calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } /* Destructor for call_data */ diff --git a/src/core/client_config/lb_policies/pick_first.h b/src/core/client_config/lb_policies/pick_first.h index 94c2a9f0c7..31394985e5 100644 --- a/src/core/client_config/lb_policies/pick_first.h +++ b/src/core/client_config/lb_policies/pick_first.h @@ -36,6 +36,8 @@ #include "src/core/client_config/lb_policy.h" +/** Returns a load balancing policy instance that picks up the first subchannel + * from \a subchannels to succesfully connect */ grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, size_t num_subchannels); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 8cdad1015f..35f172683a 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -302,7 +302,7 @@ static void continue_connect(grpc_subchannel *c) { static void start_connect(grpc_subchannel *c) { gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); c->next_attempt = now; - c->backoff_delta = gpr_time_from_seconds(1); + c->backoff_delta = gpr_time_from_seconds(1, GPR_TIMESPAN); continue_connect(c); } diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c index 776a255923..410a61c8cf 100644 --- a/src/core/client_config/uri_parser.c +++ b/src/core/client_config/uri_parser.c @@ -98,7 +98,7 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') { authority_begin = scheme_end + 3; - for (i = authority_begin; uri_text[i] != 0; i++) { + for (i = authority_begin; uri_text[i] != 0 && authority_end == -1; i++) { if (uri_text[i] == '/') { authority_end = i; } diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 3f5557e08e..65997d5f44 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -165,6 +165,7 @@ static void start_write(internal_request *req) { static void on_secure_transport_setup_done(void *rp, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint) { internal_request *req = rp; if (status != GRPC_SECURITY_OK) { diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index 5860834de3..5b9a37e5dd 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -102,7 +102,8 @@ void grpc_alarm_list_init(gpr_timespec now) { void grpc_alarm_list_shutdown(void) { int i; - while (run_some_expired_alarms(NULL, gpr_inf_future, NULL, 0)) + while (run_some_expired_alarms(NULL, gpr_inf_future(GPR_CLOCK_REALTIME), NULL, + 0)) ; for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -127,6 +128,7 @@ static gpr_timespec dbl_to_ts(double d) { gpr_timespec ts; ts.tv_sec = d; ts.tv_nsec = 1e9 * (d - ts.tv_sec); + ts.clock_type = GPR_TIMESPAN; return ts; } diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 3d3a193d00..8741241fb8 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -157,7 +157,7 @@ void grpc_iocp_shutdown(void) { BOOL success; gpr_event_set(&g_shutdown_iocp, (void *)1); grpc_iocp_kick(); - gpr_event_wait(&g_iocp_done, gpr_inf_future); + gpr_event_wait(&g_iocp_done, gpr_inf_future(GPR_CLOCK_REALTIME)); success = CloseHandle(g_iocp); GPR_ASSERT(success); } diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index cca92d3b32..0244f689b1 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -57,9 +57,9 @@ static grpc_iomgr_object g_root_object; static void background_callback_executor(void *ignored) { gpr_mu_lock(&g_mu); while (!g_shutdown) { - gpr_timespec deadline = gpr_inf_future; - gpr_timespec short_deadline = - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100)); + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + gpr_timespec short_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN)); if (g_cbs_head) { grpc_iomgr_closure *closure = g_cbs_head; g_cbs_head = closure->next; @@ -110,8 +110,8 @@ static size_t count_objects(void) { void grpc_iomgr_shutdown(void) { grpc_iomgr_object *obj; grpc_iomgr_closure *closure; - gpr_timespec shutdown_deadline = - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10)); + gpr_timespec shutdown_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN)); gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME); gpr_mu_lock(&g_mu); @@ -119,7 +119,7 @@ void grpc_iomgr_shutdown(void) { while (g_cbs_head != NULL || g_root_object.next != &g_root_object) { if (gpr_time_cmp( gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_warning_time), - gpr_time_from_seconds(1)) >= 0) { + gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) { if (g_cbs_head != NULL && g_root_object.next != &g_root_object) { gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed and executing " @@ -145,14 +145,14 @@ void grpc_iomgr_shutdown(void) { } while (g_cbs_head); continue; } - if (grpc_alarm_check(&g_mu, gpr_inf_future, NULL)) { + if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_REALTIME), NULL)) { gpr_log(GPR_DEBUG, "got late alarm"); continue; } if (g_root_object.next != &g_root_object) { int timeout = 0; - gpr_timespec short_deadline = - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100)); + gpr_timespec short_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN)); while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) { if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) { timeout = 1; @@ -174,7 +174,8 @@ void grpc_iomgr_shutdown(void) { gpr_mu_unlock(&g_mu); grpc_kick_poller(); - gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future); + gpr_event_wait(&g_background_callback_executor_done, + gpr_inf_future(GPR_CLOCK_REALTIME)); grpc_alarm_list_shutdown(); diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 85101764d2..7afff58eee 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -191,17 +191,17 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now) { gpr_timespec timeout; static const int max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { + if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) { return -1; } - if (gpr_time_cmp( - deadline, - gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) { + if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( + max_spin_polling_us, + GPR_TIMESPAN))) <= 0) { return 0; } timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis( - gpr_time_add(timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1))); + return gpr_time_to_millis(gpr_time_add( + timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN))); } /* diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 98e3b552a7..6d73951c70 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -38,7 +38,7 @@ /* A grpc_pollset_set is a set of pollsets that are interested in an action. Adding a pollset to a pollset_set automatically adds any - fd's (etc) that have been registered with the set_set with that pollset. + fd's (etc) that have been registered with the set_set to that pollset. Registering fd's automatically adds them to all current pollsets. */ #ifdef GPR_POSIX_SOCKET diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index e6e1d1499e..187009b2c8 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -116,7 +116,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, } /* This happens asynchronously. Wait while that happens. */ while (s->active_ports) { - gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); + gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } gpr_mu_unlock(&s->mu); diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index 230f0dfb85..fb59fa4b0e 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -324,7 +324,7 @@ static void jwt_reset_cache(grpc_jwt_credentials *c) { gpr_free(c->cached.service_url); c->cached.service_url = NULL; } - c->cached.jwt_expiration = gpr_inf_past; + c->cached.jwt_expiration = gpr_inf_past(GPR_CLOCK_REALTIME); } static void jwt_destroy(grpc_credentials *creds) { @@ -347,8 +347,8 @@ static void jwt_get_request_metadata(grpc_credentials *creds, grpc_credentials_metadata_cb cb, void *user_data) { grpc_jwt_credentials *c = (grpc_jwt_credentials *)creds; - gpr_timespec refresh_threshold = {GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, - 0}; + gpr_timespec refresh_threshold = gpr_time_from_seconds( + GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN); /* See if we can return a cached jwt. */ grpc_credentials_md_store *jwt_md = NULL; @@ -516,6 +516,7 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response( access_token->value); token_lifetime->tv_sec = strtol(expires_in->value, NULL, 10); token_lifetime->tv_nsec = 0; + token_lifetime->clock_type = GPR_TIMESPAN; if (*token_md != NULL) grpc_credentials_md_store_unref(*token_md); *token_md = grpc_credentials_md_store_create(1); grpc_credentials_md_store_add_cstrings( @@ -552,7 +553,7 @@ static void on_oauth2_token_fetcher_http_response( r->cb(r->user_data, c->access_token_md->entries, c->access_token_md->num_entries, status); } else { - c->token_expiration = gpr_inf_past; + c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME); r->cb(r->user_data, NULL, 0, status); } gpr_mu_unlock(&c->mu); @@ -564,8 +565,8 @@ static void oauth2_token_fetcher_get_request_metadata( grpc_credentials_metadata_cb cb, void *user_data) { grpc_oauth2_token_fetcher_credentials *c = (grpc_oauth2_token_fetcher_credentials *)creds; - gpr_timespec refresh_threshold = {GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, - 0}; + gpr_timespec refresh_threshold = gpr_time_from_seconds( + GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN); grpc_credentials_md_store *cached_access_token_md = NULL; { gpr_mu_lock(&c->mu); @@ -596,7 +597,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c, c->base.type = GRPC_CREDENTIALS_TYPE_OAUTH2; gpr_ref_init(&c->base.refcount, 1); gpr_mu_init(&c->mu); - c->token_expiration = gpr_inf_past; + c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME); c->fetch_func = fetch_func; grpc_httpcli_context_init(&c->httpcli_context); } diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index f622deff42..833484310f 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -91,7 +91,7 @@ static int is_stack_running_on_compute_engine(void) { /* The http call is local. If it takes more than one sec, it is for sure not on compute engine. */ - gpr_timespec max_detection_delay = {1, 0}; + gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN); grpc_pollset_init(&detector.pollset); detector.is_done = 0; @@ -112,7 +112,7 @@ static int is_stack_running_on_compute_engine(void) { called once for the lifetime of the process by the default credentials. */ gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset)); while (!detector.is_done) { - grpc_pollset_work(&detector.pollset, gpr_inf_future); + grpc_pollset_work(&detector.pollset, gpr_inf_future(GPR_CLOCK_REALTIME)); } gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset)); diff --git a/src/core/security/json_token.c b/src/core/security/json_token.c index 9b1ea255ae..021912f333 100644 --- a/src/core/security/json_token.c +++ b/src/core/security/json_token.c @@ -49,7 +49,7 @@ /* --- Constants. --- */ /* 1 hour max. */ -const gpr_timespec grpc_max_auth_token_lifetime = {3600, 0}; +const gpr_timespec grpc_max_auth_token_lifetime = {3600, 0, GPR_TIMESPAN}; #define GRPC_JWT_RSA_SHA256_ALGORITHM "RS256" #define GRPC_JWT_TYPE "JWT" diff --git a/src/core/security/jwt_verifier.c b/src/core/security/jwt_verifier.c index 9140eb2ef7..1276693da7 100644 --- a/src/core/security/jwt_verifier.c +++ b/src/core/security/jwt_verifier.c @@ -109,7 +109,7 @@ static const char *validate_string_field(const grpc_json *json, static gpr_timespec validate_time_field(const grpc_json *json, const char *key) { - gpr_timespec result = gpr_time_0; + gpr_timespec result = gpr_time_0(GPR_CLOCK_REALTIME); if (json->type != GRPC_JSON_NUMBER) { gpr_log(GPR_ERROR, "Invalid %s field [%s]", key, json->value); return result; @@ -221,17 +221,17 @@ const char *grpc_jwt_claims_audience(const grpc_jwt_claims *claims) { } gpr_timespec grpc_jwt_claims_issued_at(const grpc_jwt_claims *claims) { - if (claims == NULL) return gpr_inf_past; + if (claims == NULL) return gpr_inf_past(GPR_CLOCK_REALTIME); return claims->iat; } gpr_timespec grpc_jwt_claims_expires_at(const grpc_jwt_claims *claims) { - if (claims == NULL) return gpr_inf_future; + if (claims == NULL) return gpr_inf_future(GPR_CLOCK_REALTIME); return claims->exp; } gpr_timespec grpc_jwt_claims_not_before(const grpc_jwt_claims *claims) { - if (claims == NULL) return gpr_inf_past; + if (claims == NULL) return gpr_inf_past(GPR_CLOCK_REALTIME); return claims->nbf; } @@ -242,9 +242,9 @@ grpc_jwt_claims *grpc_jwt_claims_from_json(grpc_json *json, gpr_slice buffer) { memset(claims, 0, sizeof(grpc_jwt_claims)); claims->json = json; claims->buffer = buffer; - claims->iat = gpr_inf_past; - claims->nbf = gpr_inf_past; - claims->exp = gpr_inf_future; + claims->iat = gpr_inf_past(GPR_CLOCK_REALTIME); + claims->nbf = gpr_inf_past(GPR_CLOCK_REALTIME); + claims->exp = gpr_inf_future(GPR_CLOCK_REALTIME); /* Per the spec, all fields are optional. */ for (cur = json->child; cur != NULL; cur = cur->next) { @@ -262,13 +262,16 @@ grpc_jwt_claims *grpc_jwt_claims_from_json(grpc_json *json, gpr_slice buffer) { if (claims->jti == NULL) goto error; } else if (strcmp(cur->key, "iat") == 0) { claims->iat = validate_time_field(cur, "iat"); - if (gpr_time_cmp(claims->iat, gpr_time_0) == 0) goto error; + if (gpr_time_cmp(claims->iat, gpr_time_0(GPR_CLOCK_REALTIME)) == 0) + goto error; } else if (strcmp(cur->key, "exp") == 0) { claims->exp = validate_time_field(cur, "exp"); - if (gpr_time_cmp(claims->exp, gpr_time_0) == 0) goto error; + if (gpr_time_cmp(claims->exp, gpr_time_0(GPR_CLOCK_REALTIME)) == 0) + goto error; } else if (strcmp(cur->key, "nbf") == 0) { claims->nbf = validate_time_field(cur, "nbf"); - if (gpr_time_cmp(claims->nbf, gpr_time_0) == 0) goto error; + if (gpr_time_cmp(claims->nbf, gpr_time_0(GPR_CLOCK_REALTIME)) == 0) + goto error; } } return claims; @@ -359,10 +362,10 @@ void verifier_cb_ctx_destroy(verifier_cb_ctx *ctx) { /* --- grpc_jwt_verifier object. --- */ /* Clock skew defaults to one minute. */ -gpr_timespec grpc_jwt_verifier_clock_skew = {60, 0}; +gpr_timespec grpc_jwt_verifier_clock_skew = {60, 0, GPR_TIMESPAN}; /* Max delay defaults to one minute. */ -gpr_timespec grpc_jwt_verifier_max_delay = {60, 0}; +gpr_timespec grpc_jwt_verifier_max_delay = {60, 0, GPR_TIMESPAN}; typedef struct { char *email_domain; diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c index 731b382f09..0c3572b53c 100644 --- a/src/core/security/secure_transport_setup.c +++ b/src/core/security/secure_transport_setup.c @@ -47,7 +47,8 @@ typedef struct { tsi_handshaker *handshaker; unsigned char *handshake_buffer; size_t handshake_buffer_size; - grpc_endpoint *endpoint; + grpc_endpoint *wrapped_endpoint; + grpc_endpoint *secure_endpoint; gpr_slice_buffer left_overs; grpc_secure_transport_setup_done_cb cb; void *user_data; @@ -63,13 +64,16 @@ static void on_handshake_data_sent_to_peer(void *setup, static void secure_transport_setup_done(grpc_secure_transport_setup *s, int is_success) { if (is_success) { - s->cb(s->user_data, GRPC_SECURITY_OK, s->endpoint); + s->cb(s->user_data, GRPC_SECURITY_OK, s->wrapped_endpoint, + s->secure_endpoint); } else { - if (s->endpoint != NULL) { - grpc_endpoint_shutdown(s->endpoint); - grpc_endpoint_destroy(s->endpoint); + if (s->secure_endpoint != NULL) { + grpc_endpoint_shutdown(s->secure_endpoint); + grpc_endpoint_destroy(s->secure_endpoint); + } else { + grpc_endpoint_destroy(s->wrapped_endpoint); } - s->cb(s->user_data, GRPC_SECURITY_ERROR, NULL); + s->cb(s->user_data, GRPC_SECURITY_ERROR, s->wrapped_endpoint, NULL); } if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker); if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer); @@ -95,8 +99,9 @@ static void on_peer_checked(void *user_data, grpc_security_status status) { secure_transport_setup_done(s, 0); return; } - s->endpoint = grpc_secure_endpoint_create( - protector, s->endpoint, s->left_overs.slices, s->left_overs.count); + s->secure_endpoint = + grpc_secure_endpoint_create(protector, s->wrapped_endpoint, + s->left_overs.slices, s->left_overs.count); secure_transport_setup_done(s, 1); return; } @@ -152,7 +157,7 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) { gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset); /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - write_status = grpc_endpoint_write(s->endpoint, &to_send, 1, + write_status = grpc_endpoint_write(s->wrapped_endpoint, &to_send, 1, on_handshake_data_sent_to_peer, s); if (write_status == GRPC_ENDPOINT_WRITE_ERROR) { gpr_log(GPR_ERROR, "Could not send handshake data to peer."); @@ -198,7 +203,7 @@ static void on_handshake_data_received_from_peer( if (result == TSI_INCOMPLETE_DATA) { /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - grpc_endpoint_notify_on_read(s->endpoint, + grpc_endpoint_notify_on_read(s->wrapped_endpoint, on_handshake_data_received_from_peer, setup); cleanup_slices(slices, nslices); return; @@ -256,7 +261,7 @@ static void on_handshake_data_sent_to_peer(void *setup, if (tsi_handshaker_is_in_progress(s->handshaker)) { /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - grpc_endpoint_notify_on_read(s->endpoint, + grpc_endpoint_notify_on_read(s->wrapped_endpoint, on_handshake_data_received_from_peer, setup); } else { check_peer(s); @@ -280,7 +285,7 @@ void grpc_setup_secure_transport(grpc_security_connector *connector, GRPC_SECURITY_CONNECTOR_REF(connector, "secure_transport_setup"); s->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; s->handshake_buffer = gpr_malloc(s->handshake_buffer_size); - s->endpoint = nonsecure_endpoint; + s->wrapped_endpoint = nonsecure_endpoint; s->user_data = user_data; s->cb = cb; gpr_slice_buffer_init(&s->left_overs); diff --git a/src/core/security/secure_transport_setup.h b/src/core/security/secure_transport_setup.h index 58701c461d..29025f5236 100644 --- a/src/core/security/secure_transport_setup.h +++ b/src/core/security/secure_transport_setup.h @@ -42,7 +42,7 @@ /* Ownership of the secure_endpoint is transfered. */ typedef void (*grpc_secure_transport_setup_done_cb)( void *user_data, grpc_security_status status, - grpc_endpoint *secure_endpoint); + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint); /* Calls the callback upon completion. */ void grpc_setup_secure_transport(grpc_security_connector *connector, diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 8a7ada07af..3717b8989f 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -51,10 +51,16 @@ #include <grpc/support/sync.h> #include <grpc/support/useful.h> +typedef struct tcp_endpoint_list { + grpc_endpoint *tcp_endpoint; + struct tcp_endpoint_list *next; +} tcp_endpoint_list; + typedef struct grpc_server_secure_state { grpc_server *server; grpc_tcp_server *tcp; grpc_security_connector *sc; + tcp_endpoint_list *handshaking_tcp_endpoints; int is_shutdown; gpr_mu mu; gpr_refcount refcount; @@ -88,14 +94,37 @@ static void setup_transport(void *statep, grpc_transport *transport, grpc_channel_args_destroy(args_copy); } +static int remove_tcp_from_list_locked(grpc_server_secure_state *state, + grpc_endpoint *tcp) { + tcp_endpoint_list *node = state->handshaking_tcp_endpoints; + tcp_endpoint_list *tmp = NULL; + if (node && node->tcp_endpoint == tcp) { + state->handshaking_tcp_endpoints = state->handshaking_tcp_endpoints->next; + gpr_free(node); + return 0; + } + while (node) { + if (node->next->tcp_endpoint == tcp) { + tmp = node->next; + node->next = node->next->next; + gpr_free(tmp); + return 0; + } + node = node->next; + } + return -1; +} + static void on_secure_transport_setup_done(void *statep, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint) { grpc_server_secure_state *state = statep; grpc_transport *transport; grpc_mdctx *mdctx; if (status == GRPC_SECURITY_OK) { gpr_mu_lock(&state->mu); + remove_tcp_from_list_locked(state, wrapped_endpoint); if (!state->is_shutdown) { mdctx = grpc_mdctx_create(); transport = grpc_create_chttp2_transport( @@ -110,6 +139,9 @@ static void on_secure_transport_setup_done(void *statep, } gpr_mu_unlock(&state->mu); } else { + gpr_mu_lock(&state->mu); + remove_tcp_from_list_locked(state, wrapped_endpoint); + gpr_mu_unlock(&state->mu); gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); } state_unref(state); @@ -117,7 +149,14 @@ static void on_secure_transport_setup_done(void *statep, static void on_accept(void *statep, grpc_endpoint *tcp) { grpc_server_secure_state *state = statep; + tcp_endpoint_list *node; state_ref(state); + node = gpr_malloc(sizeof(tcp_endpoint_list)); + node->tcp_endpoint = tcp; + gpr_mu_lock(&state->mu); + node->next = state->handshaking_tcp_endpoints; + state->handshaking_tcp_endpoints = node; + gpr_mu_unlock(&state->mu); grpc_setup_secure_transport(state->sc, tcp, on_secure_transport_setup_done, state); } @@ -132,6 +171,13 @@ static void start(grpc_server *server, void *statep, grpc_pollset **pollsets, static void destroy_done(void *statep) { grpc_server_secure_state *state = statep; grpc_server_listener_destroy_done(state->server); + gpr_mu_lock(&state->mu); + while (state->handshaking_tcp_endpoints != NULL) { + grpc_endpoint_shutdown(state->handshaking_tcp_endpoints->tcp_endpoint); + remove_tcp_from_list_locked(state, + state->handshaking_tcp_endpoints->tcp_endpoint); + } + gpr_mu_unlock(&state->mu); state_unref(state); } @@ -209,6 +255,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, state->server = server; state->tcp = tcp; state->sc = sc; + state->handshaking_tcp_endpoints = NULL; state->is_shutdown = 0; gpr_mu_init(&state->mu); gpr_ref_init(&state->refcount, 1); diff --git a/src/core/support/cancellable.c b/src/core/support/cancellable.c index 3cbb318ab6..4756f1e125 100644 --- a/src/core/support/cancellable.c +++ b/src/core/support/cancellable.c @@ -121,8 +121,9 @@ void gpr_cancellable_cancel(gpr_cancellable *c) { } else { gpr_event ev; gpr_event_init(&ev); - gpr_event_wait(&ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000))); + gpr_event_wait( + &ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000, GPR_TIMESPAN))); } } } while (failures != 0); diff --git a/src/core/support/sync_posix.c b/src/core/support/sync_posix.c index 0ccbd4923f..41af8ceb0a 100644 --- a/src/core/support/sync_posix.c +++ b/src/core/support/sync_posix.c @@ -63,7 +63,7 @@ void gpr_cv_destroy(gpr_cv *cv) { GPR_ASSERT(pthread_cond_destroy(cv) == 0); } int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) { int err = 0; - if (gpr_time_cmp(abs_deadline, gpr_inf_future) == 0) { + if (gpr_time_cmp(abs_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) { err = pthread_cond_wait(cv, mu); } else { struct timespec abs_deadline_ts; diff --git a/src/core/support/sync_win32.c b/src/core/support/sync_win32.c index 29b77fc4c2..63196d10d3 100644 --- a/src/core/support/sync_win32.c +++ b/src/core/support/sync_win32.c @@ -83,7 +83,7 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) { int timeout = 0; DWORD timeout_max_ms; mu->locked = 0; - if (gpr_time_cmp(abs_deadline, gpr_inf_future) == 0) { + if (gpr_time_cmp(abs_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) { SleepConditionVariableCS(cv, &mu->cs, INFINITE); } else { gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); diff --git a/src/core/support/time.c b/src/core/support/time.c index d47b08b266..570f195bd1 100644 --- a/src/core/support/time.c +++ b/src/core/support/time.c @@ -41,6 +41,7 @@ int gpr_time_cmp(gpr_timespec a, gpr_timespec b) { int cmp = (a.tv_sec > b.tv_sec) - (a.tv_sec < b.tv_sec); + GPR_ASSERT(a.clock_type == b.clock_type); if (cmp == 0) { cmp = (a.tv_nsec > b.tv_nsec) - (a.tv_nsec < b.tv_nsec); } @@ -71,19 +72,40 @@ gpr_timespec gpr_time_max(gpr_timespec a, gpr_timespec b) { ((t)(TYPE_IS_SIGNED(t) ? (TOP_BIT_OF_TYPE(t) - 1) \ : ((TOP_BIT_OF_TYPE(t) - 1) << 1) + 1)) -const gpr_timespec gpr_time_0 = {0, 0}; -const gpr_timespec gpr_inf_future = {TYPE_MAX(time_t), 0}; -const gpr_timespec gpr_inf_past = {TYPE_MIN(time_t), 0}; +gpr_timespec gpr_time_0(gpr_clock_type type) { + gpr_timespec out; + out.tv_sec = 0; + out.tv_nsec = 0; + out.clock_type = type; + return out; +} + +gpr_timespec gpr_inf_future(gpr_clock_type type) { + gpr_timespec out; + out.tv_sec = TYPE_MAX(time_t); + out.tv_nsec = 0; + out.clock_type = type; + return out; +} + +gpr_timespec gpr_inf_past(gpr_clock_type type) { + gpr_timespec out; + out.tv_sec = TYPE_MIN(time_t); + out.tv_nsec = 0; + out.clock_type = type; + return out; +} /* TODO(ctiller): consider merging _nanos, _micros, _millis into a single function for maintainability. Similarly for _seconds, _minutes, and _hours */ -gpr_timespec gpr_time_from_nanos(long ns) { +gpr_timespec gpr_time_from_nanos(long ns, gpr_clock_type type) { gpr_timespec result; + result.clock_type = type; if (ns == LONG_MAX) { - result = gpr_inf_future; + result = gpr_inf_future(type); } else if (ns == LONG_MIN) { - result = gpr_inf_past; + result = gpr_inf_past(type); } else if (ns >= 0) { result.tv_sec = ns / GPR_NS_PER_SEC; result.tv_nsec = (int)(ns - result.tv_sec * GPR_NS_PER_SEC); @@ -95,12 +117,13 @@ gpr_timespec gpr_time_from_nanos(long ns) { return result; } -gpr_timespec gpr_time_from_micros(long us) { +gpr_timespec gpr_time_from_micros(long us, gpr_clock_type type) { gpr_timespec result; + result.clock_type = type; if (us == LONG_MAX) { - result = gpr_inf_future; + result = gpr_inf_future(type); } else if (us == LONG_MIN) { - result = gpr_inf_past; + result = gpr_inf_past(type); } else if (us >= 0) { result.tv_sec = us / 1000000; result.tv_nsec = (int)((us - result.tv_sec * 1000000) * 1000); @@ -112,12 +135,13 @@ gpr_timespec gpr_time_from_micros(long us) { return result; } -gpr_timespec gpr_time_from_millis(long ms) { +gpr_timespec gpr_time_from_millis(long ms, gpr_clock_type type) { gpr_timespec result; + result.clock_type = type; if (ms == LONG_MAX) { - result = gpr_inf_future; + result = gpr_inf_future(type); } else if (ms == LONG_MIN) { - result = gpr_inf_past; + result = gpr_inf_past(type); } else if (ms >= 0) { result.tv_sec = ms / 1000; result.tv_nsec = (int)((ms - result.tv_sec * 1000) * 1000000); @@ -129,12 +153,13 @@ gpr_timespec gpr_time_from_millis(long ms) { return result; } -gpr_timespec gpr_time_from_seconds(long s) { +gpr_timespec gpr_time_from_seconds(long s, gpr_clock_type type) { gpr_timespec result; + result.clock_type = type; if (s == LONG_MAX) { - result = gpr_inf_future; + result = gpr_inf_future(type); } else if (s == LONG_MIN) { - result = gpr_inf_past; + result = gpr_inf_past(type); } else { result.tv_sec = s; result.tv_nsec = 0; @@ -142,12 +167,13 @@ gpr_timespec gpr_time_from_seconds(long s) { return result; } -gpr_timespec gpr_time_from_minutes(long m) { +gpr_timespec gpr_time_from_minutes(long m, gpr_clock_type type) { gpr_timespec result; + result.clock_type = type; if (m >= LONG_MAX / 60) { - result = gpr_inf_future; + result = gpr_inf_future(type); } else if (m <= LONG_MIN / 60) { - result = gpr_inf_past; + result = gpr_inf_past(type); } else { result.tv_sec = m * 60; result.tv_nsec = 0; @@ -155,12 +181,13 @@ gpr_timespec gpr_time_from_minutes(long m) { return result; } -gpr_timespec gpr_time_from_hours(long h) { +gpr_timespec gpr_time_from_hours(long h, gpr_clock_type type) { gpr_timespec result; + result.clock_type = type; if (h >= LONG_MAX / 3600) { - result = gpr_inf_future; + result = gpr_inf_future(type); } else if (h <= LONG_MIN / 3600) { - result = gpr_inf_past; + result = gpr_inf_past(type); } else { result.tv_sec = h * 3600; result.tv_nsec = 0; @@ -171,6 +198,8 @@ gpr_timespec gpr_time_from_hours(long h) { gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) { gpr_timespec sum; int inc = 0; + GPR_ASSERT(b.clock_type == GPR_TIMESPAN); + sum.clock_type = a.clock_type; sum.tv_nsec = a.tv_nsec + b.tv_nsec; if (sum.tv_nsec >= GPR_NS_PER_SEC) { sum.tv_nsec -= GPR_NS_PER_SEC; @@ -180,14 +209,14 @@ gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) { sum = a; } else if (b.tv_sec == TYPE_MAX(time_t) || (b.tv_sec >= 0 && a.tv_sec >= TYPE_MAX(time_t) - b.tv_sec)) { - sum = gpr_inf_future; + sum = gpr_inf_future(sum.clock_type); } else if (b.tv_sec == TYPE_MIN(time_t) || (b.tv_sec <= 0 && a.tv_sec <= TYPE_MIN(time_t) - b.tv_sec)) { - sum = gpr_inf_past; + sum = gpr_inf_past(sum.clock_type); } else { sum.tv_sec = a.tv_sec + b.tv_sec; if (inc != 0 && sum.tv_sec == TYPE_MAX(time_t) - 1) { - sum = gpr_inf_future; + sum = gpr_inf_future(sum.clock_type); } else { sum.tv_sec += inc; } @@ -198,6 +227,12 @@ gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) { gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b) { gpr_timespec diff; int dec = 0; + if (b.clock_type == GPR_TIMESPAN) { + diff.clock_type = a.clock_type; + } else { + GPR_ASSERT(a.clock_type == b.clock_type); + diff.clock_type = GPR_TIMESPAN; + } diff.tv_nsec = a.tv_nsec - b.tv_nsec; if (diff.tv_nsec < 0) { diff.tv_nsec += GPR_NS_PER_SEC; @@ -207,14 +242,14 @@ gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b) { diff = a; } else if (b.tv_sec == TYPE_MIN(time_t) || (b.tv_sec <= 0 && a.tv_sec >= TYPE_MAX(time_t) + b.tv_sec)) { - diff = gpr_inf_future; + diff = gpr_inf_future(GPR_CLOCK_REALTIME); } else if (b.tv_sec == TYPE_MAX(time_t) || (b.tv_sec >= 0 && a.tv_sec <= TYPE_MIN(time_t) + b.tv_sec)) { - diff = gpr_inf_past; + diff = gpr_inf_past(GPR_CLOCK_REALTIME); } else { diff.tv_sec = a.tv_sec - b.tv_sec; if (dec != 0 && diff.tv_sec == TYPE_MIN(time_t) + 1) { - diff = gpr_inf_past; + diff = gpr_inf_past(GPR_CLOCK_REALTIME); } else { diff.tv_sec -= dec; } @@ -225,6 +260,9 @@ gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b) { int gpr_time_similar(gpr_timespec a, gpr_timespec b, gpr_timespec threshold) { int cmp_ab; + GPR_ASSERT(a.clock_type == b.clock_type); + GPR_ASSERT(threshold.clock_type == GPR_TIMESPAN); + cmp_ab = gpr_time_cmp(a, b); if (cmp_ab == 0) return 1; if (cmp_ab < 0) { diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c index f9b7958783..258b2e640e 100644 --- a/src/core/support/time_posix.c +++ b/src/core/support/time_posix.c @@ -38,6 +38,7 @@ #include <stdlib.h> #include <time.h> #include <unistd.h> +#include <grpc/support/log.h> #include <grpc/support/time.h> static struct timespec timespec_from_gpr(gpr_timespec gts) { @@ -48,10 +49,12 @@ static struct timespec timespec_from_gpr(gpr_timespec gts) { } #if _POSIX_TIMERS > 0 -static gpr_timespec gpr_from_timespec(struct timespec ts) { +static gpr_timespec gpr_from_timespec(struct timespec ts, + gpr_clock_type clock) { gpr_timespec rv; rv.tv_sec = ts.tv_sec; rv.tv_nsec = (int)ts.tv_nsec; + rv.clock_type = clock; return rv; } @@ -62,8 +65,9 @@ void gpr_time_init(void) {} gpr_timespec gpr_now(gpr_clock_type clock) { struct timespec now; + GPR_ASSERT(clock != GPR_TIMESPAN); clock_gettime(clockid_for_gpr_clock[clock], &now); - return gpr_from_timespec(now); + return gpr_from_timespec(now, clock); } #else /* For some reason Apple's OSes haven't implemented clock_gettime. */ @@ -88,6 +92,7 @@ gpr_timespec gpr_now(gpr_clock_type clock) { struct timeval now_tv; double now_dbl; + now.clock_type = clock; switch (clock) { case GPR_CLOCK_REALTIME: gettimeofday(&now_tv, NULL); @@ -99,6 +104,8 @@ gpr_timespec gpr_now(gpr_clock_type clock) { now.tv_sec = now_dbl * 1e-9; now.tv_nsec = now_dbl - now.tv_sec * 1e9; break; + case GPR_TIMESPAN: + abort(); } return now; diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c index fa77c74eeb..238cd07ebc 100644 --- a/src/core/support/time_win32.c +++ b/src/core/support/time_win32.c @@ -55,6 +55,7 @@ gpr_timespec gpr_now(gpr_clock_type clock) { struct _timeb now_tb; LARGE_INTEGER timestamp; double now_dbl; + now_tv.clock_type = clock; switch (clock) { case GPR_CLOCK_REALTIME: _ftime_s(&now_tb); diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c index 7c31bfe5da..e47dc4f4ce 100644 --- a/src/core/surface/byte_buffer_queue.c +++ b/src/core/surface/byte_buffer_queue.c @@ -62,6 +62,7 @@ int grpc_bbq_empty(grpc_byte_buffer_queue *q) { } void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) { + q->bytes += grpc_byte_buffer_length(buffer); bba_push(&q->filling, buffer); } @@ -72,8 +73,11 @@ void grpc_bbq_flush(grpc_byte_buffer_queue *q) { } } +size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q) { return q->bytes; } + grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { grpc_bbq_array temp_array; + grpc_byte_buffer *out; if (q->drain_pos == q->draining.count) { if (q->filling.count == 0) { @@ -87,5 +91,7 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { q->draining = temp_array; } - return q->draining.data[q->drain_pos++]; + out = q->draining.data[q->drain_pos++]; + q->bytes -= grpc_byte_buffer_length(out); + return out; } diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h index 32c57f8756..f01958984f 100644 --- a/src/core/surface/byte_buffer_queue.h +++ b/src/core/surface/byte_buffer_queue.h @@ -49,6 +49,7 @@ typedef struct { size_t drain_pos; grpc_bbq_array filling; grpc_bbq_array draining; + size_t bytes; } grpc_byte_buffer_queue; void grpc_bbq_destroy(grpc_byte_buffer_queue *q); @@ -56,5 +57,6 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q); void grpc_bbq_flush(grpc_byte_buffer_queue *q); int grpc_bbq_empty(grpc_byte_buffer_queue *q); void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb); +size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q); #endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 0a551ac47f..1146d83982 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -347,7 +347,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, } grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, CALL_STACK_FROM_CALL(call)); - if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) { + if (gpr_time_cmp(send_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { set_deadline_alarm(call, send_deadline); } return call; @@ -513,6 +513,8 @@ static void unlock(grpc_call *call) { int completing_requests = 0; int start_op = 0; int i; + const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536; + size_t buffered_bytes; int cancel_alarm = 0; memset(&op, 0, sizeof(op)); @@ -528,6 +530,17 @@ static void unlock(grpc_call *call) { op.recv_ops = &call->recv_ops; op.recv_state = &call->recv_state; op.on_done_recv = &call->on_done_recv; + if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) { + op.max_recv_bytes = call->incoming_message_length - + call->incoming_message.length + MAX_RECV_PEEK_AHEAD; + } else { + buffered_bytes = grpc_bbq_bytes(&call->incoming_queue); + if (buffered_bytes > MAX_RECV_PEEK_AHEAD) { + op.max_recv_bytes = 0; + } else { + op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes; + } + } call->receiving = 1; GRPC_CALL_INTERNAL_REF(call, "receiving"); start_op = 1; @@ -972,7 +985,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) { mdb.list = chain_metadata_from_app(call, data.send_metadata.count, data.send_metadata.metadata); mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future; + mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); /* send status */ /* TODO(ctiller): cache common status values */ data = call->request_data[GRPC_IOREQ_SEND_STATUS]; @@ -1325,7 +1338,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { l->md = 0; } } - if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) { + if (gpr_time_cmp(md->deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { set_deadline_alarm(call, md->deadline); } if (!is_trailing) { diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index c67f75fc5c..8484418247 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -116,7 +116,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) { void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, void (*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - int shutdown = gpr_unref(&cc->pending_events); + int shutdown; storage->tag = tag; storage->done = done; @@ -124,15 +124,15 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, storage->next = ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0)); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + shutdown = gpr_unref(&cc->pending_events); if (!shutdown) { - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); cc->completed_tail = storage; grpc_pollset_kick(&cc->pollset); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } else { - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); cc->completed_tail = storage; @@ -260,8 +260,9 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); grpc_pollset_kick(&cc->pollset); - grpc_pollset_work(&cc->pollset, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(100))); + grpc_pollset_work(&cc->pollset, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(100, GPR_TIMESPAN))); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 3dd56fe5a9..3f2bb5c8a9 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -72,7 +72,7 @@ static void lame_start_transport_stream_op(grpc_call_element *elem, mdb.list.head = &calld->status; mdb.list.tail = &calld->details; mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future; + mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); grpc_sopb_add_metadata(op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 34ee3f8400..f3c7d8397b 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -75,6 +75,7 @@ static void connector_unref(grpc_connector *con) { static void on_secure_transport_setup_done(void *arg, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint) { connector *c = arg; grpc_iomgr_closure *notify; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 4dc51bf031..37f0c3c005 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -487,9 +487,9 @@ static void maybe_finish_shutdown(grpc_server *server) { if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners(server)) { - if (gpr_time_cmp( - gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), server->last_shutdown_message_time), - gpr_time_from_seconds(1)) >= 0) { + if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), + server->last_shutdown_message_time), + gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) { server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME); gpr_log(GPR_DEBUG, "Waiting for %d channels and %d/%d listeners to be destroyed" @@ -536,7 +536,8 @@ static void server_on_recv(void *ptr, int success) { grpc_stream_op *op = &ops[i]; if (op->type != GRPC_OP_METADATA) continue; grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); - if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) { + if (0 != gpr_time_cmp(op->data.metadata.deadline, + gpr_inf_future(GPR_CLOCK_REALTIME))) { calld->deadline = op->data.metadata.deadline; } calld->got_initial_metadata = 1; @@ -610,7 +611,7 @@ static void accept_stream(void *cd, grpc_transport *transport, channel_data *chand = cd; /* create a call */ grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0, - gpr_inf_future); + gpr_inf_future(GPR_CLOCK_REALTIME)); } static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) { @@ -638,7 +639,7 @@ static void init_call_elem(grpc_call_element *elem, call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); - calld->deadline = gpr_inf_future; + calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); calld->call = grpc_call_from_top_element(elem); gpr_mu_init(&calld->mu_state); diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c index b817df7745..d624298ad2 100644 --- a/src/core/transport/chttp2/frame_window_update.c +++ b/src/core/transport/chttp2/frame_window_update.c @@ -94,8 +94,8 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( } GPR_ASSERT(is_last); - if (transport_parsing->incoming_stream_id) { - if (stream_parsing) { + if (transport_parsing->incoming_stream_id != 0) { + if (stream_parsing != NULL) { GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("update", transport_parsing, stream_parsing, outgoing_window_update, p->amount); diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c index 77162a6864..974b864ffb 100644 --- a/src/core/transport/chttp2/incoming_metadata.c +++ b/src/core/transport/chttp2/incoming_metadata.c @@ -42,7 +42,7 @@ void grpc_chttp2_incoming_metadata_buffer_init( grpc_chttp2_incoming_metadata_buffer *buffer) { - buffer->deadline = gpr_inf_future; + buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } void grpc_chttp2_incoming_metadata_buffer_destroy( @@ -87,7 +87,7 @@ void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( b.list.tail = (void *)(gpr_intptr)buffer->count; b.garbage.head = b.garbage.tail = NULL; b.deadline = buffer->deadline; - buffer->deadline = gpr_inf_future; + buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); grpc_sopb_add_metadata(sopb, b); } diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index bdd4b432eb..e5e6f445b7 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -353,7 +353,19 @@ typedef struct { /** window available for us to send to peer */ gpr_int64 outgoing_window; - /** window available for peer to send to us - updated after parse */ + /** The number of bytes the upper layers have offered to receive. + As the upper layer offers more bytes, this value increases. + As bytes are read, this value decreases. */ + gpr_uint32 max_recv_bytes; + /** The number of bytes the upper layer has offered to read but we have + not yet announced to HTTP2 flow control. + As the upper layers offer to read more bytes, this value increases. + As we advertise incoming flow control window, this value decreases. */ + gpr_uint32 unannounced_incoming_window; + /** The number of bytes of HTTP2 flow control we have advertised. + As we advertise incoming flow control window, this value increases. + As bytes are read, this value decreases. + Updated after parse. */ gpr_uint32 incoming_window; /** stream ops the transport user would like to send */ grpc_stream_op_buffer *outgoing_sopb; @@ -391,6 +403,8 @@ typedef struct { grpc_stream_op_buffer sopb; /** how strongly should we indicate closure with the next write */ grpc_chttp2_send_closed send_closed; + /** how much window should we announce? */ + gpr_uint32 announce_window; } grpc_chttp2_stream_writing; struct grpc_chttp2_stream_parsing { @@ -501,7 +515,9 @@ void grpc_chttp2_list_add_writable_window_update_stream( grpc_chttp2_stream_global *stream_global); int grpc_chttp2_list_pop_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global **stream_global); + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_global **stream_global, + grpc_chttp2_stream_writing **stream_writing); void grpc_chttp2_list_remove_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 9597395aab..aa32f2e44a 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -173,7 +173,14 @@ void grpc_chttp2_publish_reads( GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( "parsed", transport_parsing, stream_parsing, incoming_window_delta, -(gpr_int64)stream_parsing->incoming_window_delta); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "parsed", transport_parsing, stream_global, max_recv_bytes, + -(gpr_int64)stream_parsing->incoming_window_delta); stream_global->incoming_window -= stream_parsing->incoming_window_delta; + GPR_ASSERT(stream_global->max_recv_bytes >= + stream_parsing->incoming_window_delta); + stream_global->max_recv_bytes -= + stream_parsing->incoming_window_delta; stream_parsing->incoming_window_delta = 0; grpc_chttp2_list_add_writable_window_update_stream(transport_global, stream_global); @@ -594,7 +601,7 @@ static void on_header(void *tp, grpc_mdelem *md) { cached_timeout)) { gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", grpc_mdstr_as_c_string(md->value)); - *cached_timeout = gpr_inf_future; + *cached_timeout = gpr_inf_future(GPR_CLOCK_REALTIME); } grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index d553d80085..d7fc2da5d3 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -585,7 +585,8 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, l->md = hpack_enc(compressor, l->md, &st); need_unref |= l->md != NULL; } - if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) { + if (gpr_time_cmp(op->data.metadata.deadline, + gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { deadline_enc(compressor, op->data.metadata.deadline, &st); } curop++; diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 4fea058c19..590f6abfbc 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -139,6 +139,7 @@ static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { + GPR_ASSERT(stream_global->id != 0); stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); } @@ -204,6 +205,7 @@ int grpc_chttp2_list_pop_written_stream( void grpc_chttp2_list_add_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { + GPR_ASSERT(stream_global->id != 0); stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); @@ -211,11 +213,14 @@ void grpc_chttp2_list_add_writable_window_update_stream( int grpc_chttp2_list_pop_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global **stream_global) { + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_global **stream_global, + grpc_chttp2_stream_writing **stream_writing) { grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); *stream_global = &stream->global; + *stream_writing = &stream->writing; return r; } diff --git a/src/core/transport/chttp2/timeout_encoding.c b/src/core/transport/chttp2/timeout_encoding.c index 33915c4039..1dd768ada4 100644 --- a/src/core/transport/chttp2/timeout_encoding.c +++ b/src/core/transport/chttp2/timeout_encoding.c @@ -147,7 +147,7 @@ int grpc_chttp2_decode_timeout(const char *buffer, gpr_timespec *timeout) { gpr_uint32 xp = x * 10 + *p - '0'; have_digit = 1; if (xp < x) { - *timeout = gpr_inf_future; + *timeout = gpr_inf_future(GPR_CLOCK_REALTIME); return 1; } x = xp; @@ -159,22 +159,22 @@ int grpc_chttp2_decode_timeout(const char *buffer, gpr_timespec *timeout) { /* decode unit specifier */ switch (*p) { case 'n': - *timeout = gpr_time_from_nanos(x); + *timeout = gpr_time_from_nanos(x, GPR_TIMESPAN); break; case 'u': - *timeout = gpr_time_from_micros(x); + *timeout = gpr_time_from_micros(x, GPR_TIMESPAN); break; case 'm': - *timeout = gpr_time_from_millis(x); + *timeout = gpr_time_from_millis(x, GPR_TIMESPAN); break; case 'S': - *timeout = gpr_time_from_seconds(x); + *timeout = gpr_time_from_seconds(x, GPR_TIMESPAN); break; case 'M': - *timeout = gpr_time_from_minutes(x); + *timeout = gpr_time_from_minutes(x, GPR_TIMESPAN); break; case 'H': - *timeout = gpr_time_from_hours(x); + *timeout = gpr_time_from_hours(x, GPR_TIMESPAN); break; default: return 0; diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index a78654334e..d8ec117aa5 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -66,11 +66,9 @@ int grpc_chttp2_unlocking_check_writes( /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ - while (transport_global->outgoing_window && - grpc_chttp2_list_pop_writable_stream(transport_global, + while (grpc_chttp2_list_pop_writable_stream(transport_global, transport_writing, &stream_global, - &stream_writing) && - stream_global->outgoing_window > 0) { + &stream_writing)) { stream_writing->id = stream_global->id; window_delta = grpc_chttp2_preencode( stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops, @@ -106,20 +104,21 @@ int grpc_chttp2_unlocking_check_writes( /* for each grpc_chttp2_stream that wants to update its window, add that * window here */ while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, - &stream_global)) { - window_delta = - transport_global->settings[GRPC_LOCAL_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - - stream_global->incoming_window; - if (!stream_global->read_closed && window_delta > 0) { - gpr_slice_buffer_add( - &transport_writing->outbuf, - grpc_chttp2_window_update_create(stream_global->id, window_delta)); + transport_writing, + &stream_global, + &stream_writing)) { + stream_writing->id = stream_global->id; + if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) { + stream_writing->announce_window = stream_global->unannounced_incoming_window; GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, - incoming_window, window_delta); - stream_global->incoming_window += window_delta; + incoming_window, stream_global->unannounced_incoming_window); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, + unannounced_incoming_window, -(gpr_int64)stream_global->unannounced_incoming_window); + stream_global->incoming_window += stream_global->unannounced_incoming_window; + stream_global->unannounced_incoming_window = 0; grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } } @@ -169,10 +168,19 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { while ( grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { - grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, - stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, - stream_writing->id, &transport_writing->hpack_compressor, - &transport_writing->outbuf); + if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { + grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, + stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, + stream_writing->id, &transport_writing->hpack_compressor, + &transport_writing->outbuf); + } + if (stream_writing->announce_window > 0) { + gpr_slice_buffer_add( + &transport_writing->outbuf, + grpc_chttp2_window_update_create( + stream_writing->id, stream_writing->announce_window)); + stream_writing->announce_window = 0; + } stream_writing->sopb.nops = 0; if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) { gpr_slice_buffer_add(&transport_writing->outbuf, @@ -197,7 +205,8 @@ void grpc_chttp2_cleanup_writing( while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { - if (stream_global->outgoing_sopb->nops == 0) { + if (stream_global->outgoing_sopb != NULL && + stream_global->outgoing_sopb->nops == 0) { stream_global->outgoing_sopb = NULL; grpc_chttp2_schedule_closure(transport_global, stream_global->send_done_closure, 1); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index ac399e4a1d..c923d5e42f 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -358,7 +358,9 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, s->global.outgoing_window = t->global.settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->parsing.incoming_window = s->global.incoming_window = + s->global.max_recv_bytes = + s->parsing.incoming_window = + s->global.incoming_window = t->global.settings[GRPC_SENT_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; @@ -562,6 +564,8 @@ static void maybe_start_some_streams( stream_global->incoming_window = transport_global->settings[GRPC_SENT_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + stream_global->max_recv_bytes = + GPR_MAX(stream_global->incoming_window, stream_global->max_recv_bytes); grpc_chttp2_stream_map_add( &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map, stream_global->id, STREAM_FROM_GLOBAL(stream_global)); @@ -570,6 +574,9 @@ static void maybe_start_some_streams( grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_list_add_writable_window_update_stream(transport_global, + stream_global); + } /* cancel out streams that will never be started */ while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -620,12 +627,23 @@ static void perform_stream_op_locked( stream_global->publish_sopb = op->recv_ops; stream_global->publish_sopb->nops = 0; stream_global->publish_state = op->recv_state; + if (stream_global->max_recv_bytes < op->max_recv_bytes) { + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("op", transport_global, stream_global, + max_recv_bytes, op->max_recv_bytes - stream_global->max_recv_bytes); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "op", transport_global, stream_global, unannounced_incoming_window, + op->max_recv_bytes - stream_global->max_recv_bytes); + stream_global->unannounced_incoming_window += op->max_recv_bytes - stream_global->max_recv_bytes; + stream_global->max_recv_bytes = op->max_recv_bytes; + } grpc_chttp2_incoming_metadata_live_op_buffer_end( &stream_global->outstanding_metadata); - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); - grpc_chttp2_list_add_writable_window_update_stream(transport_global, - stream_global); + if (stream_global->id != 0) { + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); + grpc_chttp2_list_add_writable_window_update_stream(transport_global, + stream_global); + } } if (op->bind_pollset) { @@ -1038,7 +1056,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, identifier = gpr_strdup(context_scope); } gpr_log(GPR_INFO, - "FLOWCTL: %s %-10s %8s %-23s %8lld %c %8lld = %8lld %-10s [%s:%d]", + "FLOWCTL: %s %-10s %8s %-27s %8lld %c %8lld = %8lld %-10s [%s:%d]", is_client ? "client" : "server", identifier, context_thread, var, current_value, delta < 0 ? '-' : '+', delta < 0 ? -delta : delta, current_value + delta, reason, file, line); diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index fdb50c6b71..71061fe0c7 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -205,7 +205,7 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *batch) { void grpc_metadata_batch_init(grpc_metadata_batch *batch) { batch->list.head = batch->list.tail = batch->garbage.head = batch->garbage.tail = NULL; - batch->deadline = gpr_inf_future; + batch->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } void grpc_metadata_batch_destroy(grpc_metadata_batch *batch) { diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 1429737721..64503604ee 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -72,6 +72,10 @@ typedef struct grpc_transport_stream_op { grpc_stream_op_buffer *recv_ops; grpc_stream_state *recv_state; + /** The number of bytes this peer is currently prepared to receive. + These bytes will be eventually used to replenish per-stream flow control + windows. */ + gpr_uint32 max_recv_bytes; grpc_iomgr_closure *on_done_recv; grpc_pollset *bind_pollset; diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 0da396a320..9d127c5472 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -61,7 +61,7 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", ")); put_metadata(b, m->md); } - if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) { + if (gpr_time_cmp(md.deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { char *tmp; gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec, md.deadline.tv_nsec); @@ -128,7 +128,8 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { if (op->recv_ops) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_strvec_add(&b, gpr_strdup("RECV")); + gpr_asprintf(&tmp, "RECV:max_recv_bytes=%d", op->max_recv_bytes); + gpr_strvec_add(&b, tmp); } if (op->bind_pollset) { |