diff options
author | Craig Tiller <ctiller@google.com> | 2015-07-17 22:59:53 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-07-17 22:59:53 -0700 |
commit | b5980be9a08678212e5dbd6549b923f545d83539 (patch) | |
tree | e0318622c5a5576377537783608b60516b46c4cf /src/core | |
parent | a14215a67841ea7920260c655c01e4570595a3db (diff) | |
parent | f87a0984ab727e95b068237f3bb0689d9685c8ea (diff) |
Merge github.com:grpc/grpc into sometimes-its-good-just-to-check-in-with-each-other
Diffstat (limited to 'src/core')
42 files changed, 584 insertions, 327 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 835467d102..cc1302e415 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -145,7 +145,7 @@ static void handle_op_after_cancellation(grpc_call_element *elem, mdb.list.head = &calld->status; mdb.list.tail = &calld->details; mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future; + mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); grpc_sopb_add_metadata(op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); @@ -587,7 +587,7 @@ static void init_call_elem(grpc_call_element *elem, gpr_mu_init(&calld->mu_state); calld->elem = elem; calld->state = CALL_CREATED; - calld->deadline = gpr_inf_future; + calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } /* Destructor for call_data */ diff --git a/src/core/client_config/lb_policies/pick_first.h b/src/core/client_config/lb_policies/pick_first.h index 94c2a9f0c7..31394985e5 100644 --- a/src/core/client_config/lb_policies/pick_first.h +++ b/src/core/client_config/lb_policies/pick_first.h @@ -36,6 +36,8 @@ #include "src/core/client_config/lb_policy.h" +/** Returns a load balancing policy instance that picks up the first subchannel + * from \a subchannels to succesfully connect */ grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, size_t num_subchannels); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index a2f672df1c..5dd280a703 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -305,7 +305,7 @@ static void continue_connect(grpc_subchannel *c) { static void start_connect(grpc_subchannel *c) { gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); c->next_attempt = now; - c->backoff_delta = gpr_time_from_seconds(1); + c->backoff_delta = gpr_time_from_seconds(1, GPR_TIMESPAN); continue_connect(c); } diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index 43e2ce553a..5718bbfc1c 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -102,7 +102,8 @@ void grpc_alarm_list_init(gpr_timespec now) { void grpc_alarm_list_shutdown(void) { int i; - while (run_some_expired_alarms(NULL, gpr_inf_future, NULL, 0)) + while (run_some_expired_alarms(NULL, gpr_inf_future(GPR_CLOCK_REALTIME), NULL, + 0)) ; for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -127,6 +128,7 @@ static gpr_timespec dbl_to_ts(double d) { gpr_timespec ts; ts.tv_sec = d; ts.tv_nsec = 1e9 * (d - ts.tv_sec); + ts.clock_type = GPR_TIMESPAN; return ts; } diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 3d3a193d00..8741241fb8 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -157,7 +157,7 @@ void grpc_iocp_shutdown(void) { BOOL success; gpr_event_set(&g_shutdown_iocp, (void *)1); grpc_iocp_kick(); - gpr_event_wait(&g_iocp_done, gpr_inf_future); + gpr_event_wait(&g_iocp_done, gpr_inf_future(GPR_CLOCK_REALTIME)); success = CloseHandle(g_iocp); GPR_ASSERT(success); } diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index aebf89e35a..dbe464e270 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -57,9 +57,9 @@ static grpc_iomgr_object g_root_object; static void background_callback_executor(void *ignored) { gpr_mu_lock(&g_mu); while (!g_shutdown) { - gpr_timespec deadline = gpr_inf_future; - gpr_timespec short_deadline = - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100)); + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + gpr_timespec short_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN)); if (g_cbs_head) { grpc_iomgr_closure *closure = g_cbs_head; g_cbs_head = closure->next; @@ -110,8 +110,8 @@ static size_t count_objects(void) { void grpc_iomgr_shutdown(void) { grpc_iomgr_object *obj; grpc_iomgr_closure *closure; - gpr_timespec shutdown_deadline = - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10)); + gpr_timespec shutdown_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN)); gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME); gpr_mu_lock(&g_mu); @@ -119,7 +119,7 @@ void grpc_iomgr_shutdown(void) { while (g_cbs_head != NULL || g_root_object.next != &g_root_object) { if (gpr_time_cmp( gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_warning_time), - gpr_time_from_seconds(1)) >= 0) { + gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) { if (g_cbs_head != NULL && g_root_object.next != &g_root_object) { gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed and executing " @@ -145,13 +145,13 @@ void grpc_iomgr_shutdown(void) { } while (g_cbs_head); continue; } - if (grpc_alarm_check(&g_mu, gpr_inf_future, NULL)) { + if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_REALTIME), NULL)) { continue; } if (g_root_object.next != &g_root_object) { int timeout = 0; - gpr_timespec short_deadline = - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100)); + gpr_timespec short_deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN)); while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) { if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) { timeout = 1; @@ -173,7 +173,8 @@ void grpc_iomgr_shutdown(void) { gpr_mu_unlock(&g_mu); grpc_kick_poller(); - gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future); + gpr_event_wait(&g_background_callback_executor_done, + gpr_inf_future(GPR_CLOCK_REALTIME)); grpc_alarm_list_shutdown(); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 3746c8edaf..d697b59e4c 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -50,12 +50,17 @@ typedef struct { } pollset_hdr; static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, - grpc_fd *fd) { + grpc_fd *fd, + int and_unlock_pollset) { pollset_hdr *h = pollset->data.ptr; struct epoll_event ev; int err; grpc_fd_watcher watcher; + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } + /* We pretend to be polling whilst adding an fd to keep the fd from being closed during the add. This may result in a spurious wakeup being assigned to this pollset whilst adding, but that should be benign. */ @@ -76,9 +81,15 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, } static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, - grpc_fd *fd) { + grpc_fd *fd, + int and_unlock_pollset) { pollset_hdr *h = pollset->data.ptr; int err; + + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } + /* Note that this can race with concurrent poll, but that should be fine since * at worst it creates a spurious read event on a reused grpc_fd object. */ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); @@ -183,7 +194,7 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, abort(); } for (i = 0; i < nfds; i++) { - multipoll_with_epoll_pollset_add_fd(pollset, fds[i]); + multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0); } grpc_wakeup_fd_create(&h->wakeup_fd); diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 7b717bd159..0084e83953 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -66,12 +66,13 @@ typedef struct { } pollset_hdr; static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, - grpc_fd *fd) { + grpc_fd *fd, + int and_unlock_pollset) { size_t i; pollset_hdr *h = pollset->data.ptr; /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ for (i = 0; i < h->fd_count; i++) { - if (h->fds[i] == fd) return; + if (h->fds[i] == fd) goto exit; } if (h->fd_count == h->fd_capacity) { h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2); @@ -79,10 +80,15 @@ static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, } h->fds[h->fd_count++] = fd; GRPC_FD_REF(fd, "multipoller"); +exit: + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } } static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, - grpc_fd *fd) { + grpc_fd *fd, + int and_unlock_pollset) { /* will get removed next poll cycle */ pollset_hdr *h = pollset->data.ptr; if (h->del_count == h->del_capacity) { @@ -91,6 +97,9 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, } h->dels[h->del_count++] = fd; GRPC_FD_REF(fd, "multipoller_del"); + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } } static void end_polling(grpc_pollset *pollset) { diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 85101764d2..efb301d81c 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -105,14 +105,28 @@ void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); - pollset->vtable->add_fd(pollset, fd); + pollset->vtable->add_fd(pollset, fd, 1); + /* the following (enabled only in debug) will reacquire and then release + our lock - meaning that if the unlocking flag passed to del_fd above is + not respected, the code will deadlock (in a way that we have a chance of + debugging) */ +#ifndef NDEBUG + gpr_mu_lock(&pollset->mu); gpr_mu_unlock(&pollset->mu); +#endif } void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); - pollset->vtable->del_fd(pollset, fd); + pollset->vtable->del_fd(pollset, fd, 1); + /* the following (enabled only in debug) will reacquire and then release + our lock - meaning that if the unlocking flag passed to del_fd above is + not respected, the code will deadlock (in a way that we have a chance of + debugging) */ +#ifndef NDEBUG + gpr_mu_lock(&pollset->mu); gpr_mu_unlock(&pollset->mu); +#endif } static void finish_shutdown(grpc_pollset *pollset) { @@ -191,17 +205,17 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now) { gpr_timespec timeout; static const int max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { + if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) { return -1; } - if (gpr_time_cmp( - deadline, - gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) { + if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( + max_spin_polling_us, + GPR_TIMESPAN))) <= 0) { return 0; } timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis( - gpr_time_add(timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1))); + return gpr_time_to_millis(gpr_time_add( + timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN))); } /* @@ -257,7 +271,7 @@ static void basic_do_promote(void *args, int success) { } else if (grpc_fd_is_orphaned(fd)) { /* Don't try to add it to anything, we'll drop our ref on it below */ } else if (pollset->vtable != original_vtable) { - pollset->vtable->add_fd(pollset, fd); + pollset->vtable->add_fd(pollset, fd, 0); } else if (fd != pollset->data.ptr) { grpc_fd *fds[2]; fds[0] = pollset->data.ptr; @@ -287,10 +301,11 @@ static void basic_do_promote(void *args, int success) { GRPC_FD_UNREF(fd, "basicpoll_add"); } -static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { +static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, + int and_unlock_pollset) { grpc_unary_promote_args *up_args; GPR_ASSERT(fd); - if (fd == pollset->data.ptr) return; + if (fd == pollset->data.ptr) goto exit; if (!pollset->counter) { /* Fast path -- no in flight cbs */ @@ -313,7 +328,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { pollset->data.ptr = fd; GRPC_FD_REF(fd, "basicpoll"); } - return; + goto exit; } /* Now we need to promote. This needs to happen when we're not polling. Since @@ -329,14 +344,24 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { grpc_iomgr_add_callback(&up_args->promotion_closure); grpc_pollset_kick(pollset); + +exit: + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } } -static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { +static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, + int and_unlock_pollset) { GPR_ASSERT(fd); if (fd == pollset->data.ptr) { GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); pollset->data.ptr = NULL; } + + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } } static void basic_pollset_maybe_work(grpc_pollset *pollset, diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 53585a2886..37de1276d1 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -66,8 +66,10 @@ typedef struct grpc_pollset { } grpc_pollset; struct grpc_pollset_vtable { - void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd); - void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd); + void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd, + int and_unlock_pollset); + void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd, + int and_unlock_pollset); void (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, int allow_synchronous_callback); void (*kick)(grpc_pollset *pollset); diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 98e3b552a7..6d73951c70 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -38,7 +38,7 @@ /* A grpc_pollset_set is a set of pollsets that are interested in an action. Adding a pollset to a pollset_set automatically adds any - fd's (etc) that have been registered with the set_set with that pollset. + fd's (etc) that have been registered with the set_set to that pollset. Registering fd's automatically adds them to all current pollsets. */ #ifdef GPR_POSIX_SOCKET diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index e6e1d1499e..187009b2c8 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -116,7 +116,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, } /* This happens asynchronously. Wait while that happens. */ while (s->active_ports) { - gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); + gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } gpr_mu_unlock(&s->mu); diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index 230f0dfb85..fb59fa4b0e 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -324,7 +324,7 @@ static void jwt_reset_cache(grpc_jwt_credentials *c) { gpr_free(c->cached.service_url); c->cached.service_url = NULL; } - c->cached.jwt_expiration = gpr_inf_past; + c->cached.jwt_expiration = gpr_inf_past(GPR_CLOCK_REALTIME); } static void jwt_destroy(grpc_credentials *creds) { @@ -347,8 +347,8 @@ static void jwt_get_request_metadata(grpc_credentials *creds, grpc_credentials_metadata_cb cb, void *user_data) { grpc_jwt_credentials *c = (grpc_jwt_credentials *)creds; - gpr_timespec refresh_threshold = {GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, - 0}; + gpr_timespec refresh_threshold = gpr_time_from_seconds( + GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN); /* See if we can return a cached jwt. */ grpc_credentials_md_store *jwt_md = NULL; @@ -516,6 +516,7 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response( access_token->value); token_lifetime->tv_sec = strtol(expires_in->value, NULL, 10); token_lifetime->tv_nsec = 0; + token_lifetime->clock_type = GPR_TIMESPAN; if (*token_md != NULL) grpc_credentials_md_store_unref(*token_md); *token_md = grpc_credentials_md_store_create(1); grpc_credentials_md_store_add_cstrings( @@ -552,7 +553,7 @@ static void on_oauth2_token_fetcher_http_response( r->cb(r->user_data, c->access_token_md->entries, c->access_token_md->num_entries, status); } else { - c->token_expiration = gpr_inf_past; + c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME); r->cb(r->user_data, NULL, 0, status); } gpr_mu_unlock(&c->mu); @@ -564,8 +565,8 @@ static void oauth2_token_fetcher_get_request_metadata( grpc_credentials_metadata_cb cb, void *user_data) { grpc_oauth2_token_fetcher_credentials *c = (grpc_oauth2_token_fetcher_credentials *)creds; - gpr_timespec refresh_threshold = {GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, - 0}; + gpr_timespec refresh_threshold = gpr_time_from_seconds( + GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN); grpc_credentials_md_store *cached_access_token_md = NULL; { gpr_mu_lock(&c->mu); @@ -596,7 +597,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c, c->base.type = GRPC_CREDENTIALS_TYPE_OAUTH2; gpr_ref_init(&c->base.refcount, 1); gpr_mu_init(&c->mu); - c->token_expiration = gpr_inf_past; + c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME); c->fetch_func = fetch_func; grpc_httpcli_context_init(&c->httpcli_context); } diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index f622deff42..833484310f 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -91,7 +91,7 @@ static int is_stack_running_on_compute_engine(void) { /* The http call is local. If it takes more than one sec, it is for sure not on compute engine. */ - gpr_timespec max_detection_delay = {1, 0}; + gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN); grpc_pollset_init(&detector.pollset); detector.is_done = 0; @@ -112,7 +112,7 @@ static int is_stack_running_on_compute_engine(void) { called once for the lifetime of the process by the default credentials. */ gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset)); while (!detector.is_done) { - grpc_pollset_work(&detector.pollset, gpr_inf_future); + grpc_pollset_work(&detector.pollset, gpr_inf_future(GPR_CLOCK_REALTIME)); } gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset)); diff --git a/src/core/security/json_token.c b/src/core/security/json_token.c index 9b1ea255ae..021912f333 100644 --- a/src/core/security/json_token.c +++ b/src/core/security/json_token.c @@ -49,7 +49,7 @@ /* --- Constants. --- */ /* 1 hour max. */ -const gpr_timespec grpc_max_auth_token_lifetime = {3600, 0}; +const gpr_timespec grpc_max_auth_token_lifetime = {3600, 0, GPR_TIMESPAN}; #define GRPC_JWT_RSA_SHA256_ALGORITHM "RS256" #define GRPC_JWT_TYPE "JWT" diff --git a/src/core/security/jwt_verifier.c b/src/core/security/jwt_verifier.c index 9140eb2ef7..1276693da7 100644 --- a/src/core/security/jwt_verifier.c +++ b/src/core/security/jwt_verifier.c @@ -109,7 +109,7 @@ static const char *validate_string_field(const grpc_json *json, static gpr_timespec validate_time_field(const grpc_json *json, const char *key) { - gpr_timespec result = gpr_time_0; + gpr_timespec result = gpr_time_0(GPR_CLOCK_REALTIME); if (json->type != GRPC_JSON_NUMBER) { gpr_log(GPR_ERROR, "Invalid %s field [%s]", key, json->value); return result; @@ -221,17 +221,17 @@ const char *grpc_jwt_claims_audience(const grpc_jwt_claims *claims) { } gpr_timespec grpc_jwt_claims_issued_at(const grpc_jwt_claims *claims) { - if (claims == NULL) return gpr_inf_past; + if (claims == NULL) return gpr_inf_past(GPR_CLOCK_REALTIME); return claims->iat; } gpr_timespec grpc_jwt_claims_expires_at(const grpc_jwt_claims *claims) { - if (claims == NULL) return gpr_inf_future; + if (claims == NULL) return gpr_inf_future(GPR_CLOCK_REALTIME); return claims->exp; } gpr_timespec grpc_jwt_claims_not_before(const grpc_jwt_claims *claims) { - if (claims == NULL) return gpr_inf_past; + if (claims == NULL) return gpr_inf_past(GPR_CLOCK_REALTIME); return claims->nbf; } @@ -242,9 +242,9 @@ grpc_jwt_claims *grpc_jwt_claims_from_json(grpc_json *json, gpr_slice buffer) { memset(claims, 0, sizeof(grpc_jwt_claims)); claims->json = json; claims->buffer = buffer; - claims->iat = gpr_inf_past; - claims->nbf = gpr_inf_past; - claims->exp = gpr_inf_future; + claims->iat = gpr_inf_past(GPR_CLOCK_REALTIME); + claims->nbf = gpr_inf_past(GPR_CLOCK_REALTIME); + claims->exp = gpr_inf_future(GPR_CLOCK_REALTIME); /* Per the spec, all fields are optional. */ for (cur = json->child; cur != NULL; cur = cur->next) { @@ -262,13 +262,16 @@ grpc_jwt_claims *grpc_jwt_claims_from_json(grpc_json *json, gpr_slice buffer) { if (claims->jti == NULL) goto error; } else if (strcmp(cur->key, "iat") == 0) { claims->iat = validate_time_field(cur, "iat"); - if (gpr_time_cmp(claims->iat, gpr_time_0) == 0) goto error; + if (gpr_time_cmp(claims->iat, gpr_time_0(GPR_CLOCK_REALTIME)) == 0) + goto error; } else if (strcmp(cur->key, "exp") == 0) { claims->exp = validate_time_field(cur, "exp"); - if (gpr_time_cmp(claims->exp, gpr_time_0) == 0) goto error; + if (gpr_time_cmp(claims->exp, gpr_time_0(GPR_CLOCK_REALTIME)) == 0) + goto error; } else if (strcmp(cur->key, "nbf") == 0) { claims->nbf = validate_time_field(cur, "nbf"); - if (gpr_time_cmp(claims->nbf, gpr_time_0) == 0) goto error; + if (gpr_time_cmp(claims->nbf, gpr_time_0(GPR_CLOCK_REALTIME)) == 0) + goto error; } } return claims; @@ -359,10 +362,10 @@ void verifier_cb_ctx_destroy(verifier_cb_ctx *ctx) { /* --- grpc_jwt_verifier object. --- */ /* Clock skew defaults to one minute. */ -gpr_timespec grpc_jwt_verifier_clock_skew = {60, 0}; +gpr_timespec grpc_jwt_verifier_clock_skew = {60, 0, GPR_TIMESPAN}; /* Max delay defaults to one minute. */ -gpr_timespec grpc_jwt_verifier_max_delay = {60, 0}; +gpr_timespec grpc_jwt_verifier_max_delay = {60, 0, GPR_TIMESPAN}; typedef struct { char *email_domain; diff --git a/src/core/support/cancellable.c b/src/core/support/cancellable.c index 3cbb318ab6..4756f1e125 100644 --- a/src/core/support/cancellable.c +++ b/src/core/support/cancellable.c @@ -121,8 +121,9 @@ void gpr_cancellable_cancel(gpr_cancellable *c) { } else { gpr_event ev; gpr_event_init(&ev); - gpr_event_wait(&ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000))); + gpr_event_wait( + &ev, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000, GPR_TIMESPAN))); } } } while (failures != 0); diff --git a/src/core/support/stack_lockfree.c b/src/core/support/stack_lockfree.c index 9497efbfb5..2844330379 100644 --- a/src/core/support/stack_lockfree.c +++ b/src/core/support/stack_lockfree.c @@ -65,8 +65,9 @@ typedef union lockfree_node { } lockfree_node; #define ENTRY_ALIGNMENT_BITS 3 /* make sure that entries aligned to 8-bytes */ -#define INVALID_ENTRY_INDEX ((1 << 16) - 1) /* reserve this entry as invalid \ - */ +#define INVALID_ENTRY_INDEX \ + ((1 << 16) - 1) /* reserve this entry as invalid \ + */ struct gpr_stack_lockfree { lockfree_node *entries; @@ -96,7 +97,7 @@ void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack) { gpr_free(stack); } -void gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { +int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { lockfree_node head; lockfree_node newhead; @@ -112,6 +113,7 @@ void gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { stack->entries[entry].contents.index = head.contents.index; } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm)); /* Use rel_cas above to make sure that entry index is set properly */ + return head.contents.index == INVALID_ENTRY_INDEX; } int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) { diff --git a/src/core/support/stack_lockfree.h b/src/core/support/stack_lockfree.h index 0bcf73635d..eec960fbb0 100644 --- a/src/core/support/stack_lockfree.h +++ b/src/core/support/stack_lockfree.h @@ -42,7 +42,8 @@ gpr_stack_lockfree* gpr_stack_lockfree_create(int entries); void gpr_stack_lockfree_destroy(gpr_stack_lockfree* stack); /* Pass in a valid entry number for the next stack entry */ -void gpr_stack_lockfree_push(gpr_stack_lockfree* stack, int entry); +/* Returns 1 if this is the first element on the stack, 0 otherwise */ +int gpr_stack_lockfree_push(gpr_stack_lockfree*, int entry); /* Returns -1 on empty or the actual entry number */ int gpr_stack_lockfree_pop(gpr_stack_lockfree* stack); diff --git a/src/core/support/sync_posix.c b/src/core/support/sync_posix.c index 0ccbd4923f..41af8ceb0a 100644 --- a/src/core/support/sync_posix.c +++ b/src/core/support/sync_posix.c @@ -63,7 +63,7 @@ void gpr_cv_destroy(gpr_cv *cv) { GPR_ASSERT(pthread_cond_destroy(cv) == 0); } int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) { int err = 0; - if (gpr_time_cmp(abs_deadline, gpr_inf_future) == 0) { + if (gpr_time_cmp(abs_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) { err = pthread_cond_wait(cv, mu); } else { struct timespec abs_deadline_ts; diff --git a/src/core/support/sync_win32.c b/src/core/support/sync_win32.c index 29b77fc4c2..63196d10d3 100644 --- a/src/core/support/sync_win32.c +++ b/src/core/support/sync_win32.c @@ -83,7 +83,7 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) { int timeout = 0; DWORD timeout_max_ms; mu->locked = 0; - if (gpr_time_cmp(abs_deadline, gpr_inf_future) == 0) { + if (gpr_time_cmp(abs_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) { SleepConditionVariableCS(cv, &mu->cs, INFINITE); } else { gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); diff --git a/src/core/support/time.c b/src/core/support/time.c index d47b08b266..570f195bd1 100644 --- a/src/core/support/time.c +++ b/src/core/support/time.c @@ -41,6 +41,7 @@ int gpr_time_cmp(gpr_timespec a, gpr_timespec b) { int cmp = (a.tv_sec > b.tv_sec) - (a.tv_sec < b.tv_sec); + GPR_ASSERT(a.clock_type == b.clock_type); if (cmp == 0) { cmp = (a.tv_nsec > b.tv_nsec) - (a.tv_nsec < b.tv_nsec); } @@ -71,19 +72,40 @@ gpr_timespec gpr_time_max(gpr_timespec a, gpr_timespec b) { ((t)(TYPE_IS_SIGNED(t) ? (TOP_BIT_OF_TYPE(t) - 1) \ : ((TOP_BIT_OF_TYPE(t) - 1) << 1) + 1)) -const gpr_timespec gpr_time_0 = {0, 0}; -const gpr_timespec gpr_inf_future = {TYPE_MAX(time_t), 0}; -const gpr_timespec gpr_inf_past = {TYPE_MIN(time_t), 0}; +gpr_timespec gpr_time_0(gpr_clock_type type) { + gpr_timespec out; + out.tv_sec = 0; + out.tv_nsec = 0; + out.clock_type = type; + return out; +} + +gpr_timespec gpr_inf_future(gpr_clock_type type) { + gpr_timespec out; + out.tv_sec = TYPE_MAX(time_t); + out.tv_nsec = 0; + out.clock_type = type; + return out; +} + +gpr_timespec gpr_inf_past(gpr_clock_type type) { + gpr_timespec out; + out.tv_sec = TYPE_MIN(time_t); + out.tv_nsec = 0; + out.clock_type = type; + return out; +} /* TODO(ctiller): consider merging _nanos, _micros, _millis into a single function for maintainability. Similarly for _seconds, _minutes, and _hours */ -gpr_timespec gpr_time_from_nanos(long ns) { +gpr_timespec gpr_time_from_nanos(long ns, gpr_clock_type type) { gpr_timespec result; + result.clock_type = type; if (ns == LONG_MAX) { - result = gpr_inf_future; + result = gpr_inf_future(type); } else if (ns == LONG_MIN) { - result = gpr_inf_past; + result = gpr_inf_past(type); } else if (ns >= 0) { result.tv_sec = ns / GPR_NS_PER_SEC; result.tv_nsec = (int)(ns - result.tv_sec * GPR_NS_PER_SEC); @@ -95,12 +117,13 @@ gpr_timespec gpr_time_from_nanos(long ns) { return result; } -gpr_timespec gpr_time_from_micros(long us) { +gpr_timespec gpr_time_from_micros(long us, gpr_clock_type type) { gpr_timespec result; + result.clock_type = type; if (us == LONG_MAX) { - result = gpr_inf_future; + result = gpr_inf_future(type); } else if (us == LONG_MIN) { - result = gpr_inf_past; + result = gpr_inf_past(type); } else if (us >= 0) { result.tv_sec = us / 1000000; result.tv_nsec = (int)((us - result.tv_sec * 1000000) * 1000); @@ -112,12 +135,13 @@ gpr_timespec gpr_time_from_micros(long us) { return result; } -gpr_timespec gpr_time_from_millis(long ms) { +gpr_timespec gpr_time_from_millis(long ms, gpr_clock_type type) { gpr_timespec result; + result.clock_type = type; if (ms == LONG_MAX) { - result = gpr_inf_future; + result = gpr_inf_future(type); } else if (ms == LONG_MIN) { - result = gpr_inf_past; + result = gpr_inf_past(type); } else if (ms >= 0) { result.tv_sec = ms / 1000; result.tv_nsec = (int)((ms - result.tv_sec * 1000) * 1000000); @@ -129,12 +153,13 @@ gpr_timespec gpr_time_from_millis(long ms) { return result; } -gpr_timespec gpr_time_from_seconds(long s) { +gpr_timespec gpr_time_from_seconds(long s, gpr_clock_type type) { gpr_timespec result; + result.clock_type = type; if (s == LONG_MAX) { - result = gpr_inf_future; + result = gpr_inf_future(type); } else if (s == LONG_MIN) { - result = gpr_inf_past; + result = gpr_inf_past(type); } else { result.tv_sec = s; result.tv_nsec = 0; @@ -142,12 +167,13 @@ gpr_timespec gpr_time_from_seconds(long s) { return result; } -gpr_timespec gpr_time_from_minutes(long m) { +gpr_timespec gpr_time_from_minutes(long m, gpr_clock_type type) { gpr_timespec result; + result.clock_type = type; if (m >= LONG_MAX / 60) { - result = gpr_inf_future; + result = gpr_inf_future(type); } else if (m <= LONG_MIN / 60) { - result = gpr_inf_past; + result = gpr_inf_past(type); } else { result.tv_sec = m * 60; result.tv_nsec = 0; @@ -155,12 +181,13 @@ gpr_timespec gpr_time_from_minutes(long m) { return result; } -gpr_timespec gpr_time_from_hours(long h) { +gpr_timespec gpr_time_from_hours(long h, gpr_clock_type type) { gpr_timespec result; + result.clock_type = type; if (h >= LONG_MAX / 3600) { - result = gpr_inf_future; + result = gpr_inf_future(type); } else if (h <= LONG_MIN / 3600) { - result = gpr_inf_past; + result = gpr_inf_past(type); } else { result.tv_sec = h * 3600; result.tv_nsec = 0; @@ -171,6 +198,8 @@ gpr_timespec gpr_time_from_hours(long h) { gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) { gpr_timespec sum; int inc = 0; + GPR_ASSERT(b.clock_type == GPR_TIMESPAN); + sum.clock_type = a.clock_type; sum.tv_nsec = a.tv_nsec + b.tv_nsec; if (sum.tv_nsec >= GPR_NS_PER_SEC) { sum.tv_nsec -= GPR_NS_PER_SEC; @@ -180,14 +209,14 @@ gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) { sum = a; } else if (b.tv_sec == TYPE_MAX(time_t) || (b.tv_sec >= 0 && a.tv_sec >= TYPE_MAX(time_t) - b.tv_sec)) { - sum = gpr_inf_future; + sum = gpr_inf_future(sum.clock_type); } else if (b.tv_sec == TYPE_MIN(time_t) || (b.tv_sec <= 0 && a.tv_sec <= TYPE_MIN(time_t) - b.tv_sec)) { - sum = gpr_inf_past; + sum = gpr_inf_past(sum.clock_type); } else { sum.tv_sec = a.tv_sec + b.tv_sec; if (inc != 0 && sum.tv_sec == TYPE_MAX(time_t) - 1) { - sum = gpr_inf_future; + sum = gpr_inf_future(sum.clock_type); } else { sum.tv_sec += inc; } @@ -198,6 +227,12 @@ gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) { gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b) { gpr_timespec diff; int dec = 0; + if (b.clock_type == GPR_TIMESPAN) { + diff.clock_type = a.clock_type; + } else { + GPR_ASSERT(a.clock_type == b.clock_type); + diff.clock_type = GPR_TIMESPAN; + } diff.tv_nsec = a.tv_nsec - b.tv_nsec; if (diff.tv_nsec < 0) { diff.tv_nsec += GPR_NS_PER_SEC; @@ -207,14 +242,14 @@ gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b) { diff = a; } else if (b.tv_sec == TYPE_MIN(time_t) || (b.tv_sec <= 0 && a.tv_sec >= TYPE_MAX(time_t) + b.tv_sec)) { - diff = gpr_inf_future; + diff = gpr_inf_future(GPR_CLOCK_REALTIME); } else if (b.tv_sec == TYPE_MAX(time_t) || (b.tv_sec >= 0 && a.tv_sec <= TYPE_MIN(time_t) + b.tv_sec)) { - diff = gpr_inf_past; + diff = gpr_inf_past(GPR_CLOCK_REALTIME); } else { diff.tv_sec = a.tv_sec - b.tv_sec; if (dec != 0 && diff.tv_sec == TYPE_MIN(time_t) + 1) { - diff = gpr_inf_past; + diff = gpr_inf_past(GPR_CLOCK_REALTIME); } else { diff.tv_sec -= dec; } @@ -225,6 +260,9 @@ gpr_timespec gpr_time_sub(gpr_timespec a, gpr_timespec b) { int gpr_time_similar(gpr_timespec a, gpr_timespec b, gpr_timespec threshold) { int cmp_ab; + GPR_ASSERT(a.clock_type == b.clock_type); + GPR_ASSERT(threshold.clock_type == GPR_TIMESPAN); + cmp_ab = gpr_time_cmp(a, b); if (cmp_ab == 0) return 1; if (cmp_ab < 0) { diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c index f9b7958783..258b2e640e 100644 --- a/src/core/support/time_posix.c +++ b/src/core/support/time_posix.c @@ -38,6 +38,7 @@ #include <stdlib.h> #include <time.h> #include <unistd.h> +#include <grpc/support/log.h> #include <grpc/support/time.h> static struct timespec timespec_from_gpr(gpr_timespec gts) { @@ -48,10 +49,12 @@ static struct timespec timespec_from_gpr(gpr_timespec gts) { } #if _POSIX_TIMERS > 0 -static gpr_timespec gpr_from_timespec(struct timespec ts) { +static gpr_timespec gpr_from_timespec(struct timespec ts, + gpr_clock_type clock) { gpr_timespec rv; rv.tv_sec = ts.tv_sec; rv.tv_nsec = (int)ts.tv_nsec; + rv.clock_type = clock; return rv; } @@ -62,8 +65,9 @@ void gpr_time_init(void) {} gpr_timespec gpr_now(gpr_clock_type clock) { struct timespec now; + GPR_ASSERT(clock != GPR_TIMESPAN); clock_gettime(clockid_for_gpr_clock[clock], &now); - return gpr_from_timespec(now); + return gpr_from_timespec(now, clock); } #else /* For some reason Apple's OSes haven't implemented clock_gettime. */ @@ -88,6 +92,7 @@ gpr_timespec gpr_now(gpr_clock_type clock) { struct timeval now_tv; double now_dbl; + now.clock_type = clock; switch (clock) { case GPR_CLOCK_REALTIME: gettimeofday(&now_tv, NULL); @@ -99,6 +104,8 @@ gpr_timespec gpr_now(gpr_clock_type clock) { now.tv_sec = now_dbl * 1e-9; now.tv_nsec = now_dbl - now.tv_sec * 1e9; break; + case GPR_TIMESPAN: + abort(); } return now; diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c index fa77c74eeb..238cd07ebc 100644 --- a/src/core/support/time_win32.c +++ b/src/core/support/time_win32.c @@ -55,6 +55,7 @@ gpr_timespec gpr_now(gpr_clock_type clock) { struct _timeb now_tb; LARGE_INTEGER timestamp; double now_dbl; + now_tv.clock_type = clock; switch (clock) { case GPR_CLOCK_REALTIME: _ftime_s(&now_tb); diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c index 7c31bfe5da..e47dc4f4ce 100644 --- a/src/core/surface/byte_buffer_queue.c +++ b/src/core/surface/byte_buffer_queue.c @@ -62,6 +62,7 @@ int grpc_bbq_empty(grpc_byte_buffer_queue *q) { } void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) { + q->bytes += grpc_byte_buffer_length(buffer); bba_push(&q->filling, buffer); } @@ -72,8 +73,11 @@ void grpc_bbq_flush(grpc_byte_buffer_queue *q) { } } +size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q) { return q->bytes; } + grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { grpc_bbq_array temp_array; + grpc_byte_buffer *out; if (q->drain_pos == q->draining.count) { if (q->filling.count == 0) { @@ -87,5 +91,7 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { q->draining = temp_array; } - return q->draining.data[q->drain_pos++]; + out = q->draining.data[q->drain_pos++]; + q->bytes -= grpc_byte_buffer_length(out); + return out; } diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h index 32c57f8756..f01958984f 100644 --- a/src/core/surface/byte_buffer_queue.h +++ b/src/core/surface/byte_buffer_queue.h @@ -49,6 +49,7 @@ typedef struct { size_t drain_pos; grpc_bbq_array filling; grpc_bbq_array draining; + size_t bytes; } grpc_byte_buffer_queue; void grpc_bbq_destroy(grpc_byte_buffer_queue *q); @@ -56,5 +57,6 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q); void grpc_bbq_flush(grpc_byte_buffer_queue *q); int grpc_bbq_empty(grpc_byte_buffer_queue *q); void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb); +size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q); #endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 0a551ac47f..1146d83982 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -347,7 +347,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, } grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, CALL_STACK_FROM_CALL(call)); - if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) { + if (gpr_time_cmp(send_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { set_deadline_alarm(call, send_deadline); } return call; @@ -513,6 +513,8 @@ static void unlock(grpc_call *call) { int completing_requests = 0; int start_op = 0; int i; + const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536; + size_t buffered_bytes; int cancel_alarm = 0; memset(&op, 0, sizeof(op)); @@ -528,6 +530,17 @@ static void unlock(grpc_call *call) { op.recv_ops = &call->recv_ops; op.recv_state = &call->recv_state; op.on_done_recv = &call->on_done_recv; + if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) { + op.max_recv_bytes = call->incoming_message_length - + call->incoming_message.length + MAX_RECV_PEEK_AHEAD; + } else { + buffered_bytes = grpc_bbq_bytes(&call->incoming_queue); + if (buffered_bytes > MAX_RECV_PEEK_AHEAD) { + op.max_recv_bytes = 0; + } else { + op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes; + } + } call->receiving = 1; GRPC_CALL_INTERNAL_REF(call, "receiving"); start_op = 1; @@ -972,7 +985,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) { mdb.list = chain_metadata_from_app(call, data.send_metadata.count, data.send_metadata.metadata); mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future; + mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); /* send status */ /* TODO(ctiller): cache common status values */ data = call->request_data[GRPC_IOREQ_SEND_STATUS]; @@ -1325,7 +1338,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { l->md = 0; } } - if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) { + if (gpr_time_cmp(md->deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { set_deadline_alarm(call, md->deadline); } if (!is_trailing) { diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index c67f75fc5c..8484418247 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -116,7 +116,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) { void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, void (*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - int shutdown = gpr_unref(&cc->pending_events); + int shutdown; storage->tag = tag; storage->done = done; @@ -124,15 +124,15 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, storage->next = ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0)); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + shutdown = gpr_unref(&cc->pending_events); if (!shutdown) { - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); cc->completed_tail = storage; grpc_pollset_kick(&cc->pollset); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } else { - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); cc->completed_tail = storage; @@ -260,8 +260,9 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) { gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); grpc_pollset_kick(&cc->pollset); - grpc_pollset_work(&cc->pollset, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(100))); + grpc_pollset_work(&cc->pollset, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(100, GPR_TIMESPAN))); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 3dd56fe5a9..3f2bb5c8a9 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -72,7 +72,7 @@ static void lame_start_transport_stream_op(grpc_call_element *elem, mdb.list.head = &calld->status; mdb.list.tail = &calld->details; mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future; + mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); grpc_sopb_add_metadata(op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 4dc51bf031..fa120088e1 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -36,22 +36,22 @@ #include <stdlib.h> #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> + #include "src/core/channel/census_filter.h" #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" #include "src/core/iomgr/iomgr.h" +#include "src/core/support/stack_lockfree.h" #include "src/core/support/string.h" #include "src/core/surface/call.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" #include "src/core/surface/init.h" #include "src/core/transport/metadata.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/useful.h> - -typedef enum { PENDING_START, CALL_LIST_COUNT } call_list; typedef struct listener { void *arg; @@ -74,8 +74,8 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct requested_call { requested_call_type type; - struct requested_call *next; void *tag; + grpc_server *server; grpc_completion_queue *cq_bound_to_call; grpc_completion_queue *cq_for_notification; grpc_call **call; @@ -94,14 +94,6 @@ typedef struct requested_call { } data; } requested_call; -struct registered_method { - char *method; - char *host; - call_data *pending; - requested_call *requests; - registered_method *next; -}; - typedef struct channel_registered_method { registered_method *server_registered_method; grpc_mdstr *method; @@ -130,44 +122,6 @@ typedef struct shutdown_tag { grpc_cq_completion completion; } shutdown_tag; -struct grpc_server { - size_t channel_filter_count; - const grpc_channel_filter **channel_filters; - grpc_channel_args *channel_args; - - grpc_completion_queue **cqs; - grpc_pollset **pollsets; - size_t cq_count; - - /* The two following mutexes control access to server-state - mu_global controls access to non-call-related state (e.g., channel state) - mu_call controls access to call-related state (e.g., the call lists) - - If they are ever required to be nested, you must lock mu_global - before mu_call. This is currently used in shutdown processing - (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */ - gpr_mu mu_global; /* mutex for server and channel state */ - gpr_mu mu_call; /* mutex for call-specific state */ - - registered_method *registered_methods; - requested_call *requests; - - gpr_uint8 shutdown; - gpr_uint8 shutdown_published; - size_t num_shutdown_tags; - shutdown_tag *shutdown_tags; - - call_data *lists[CALL_LIST_COUNT]; - channel_data root_channel_data; - - listener *listeners; - int listeners_destroyed; - gpr_refcount internal_refcount; - - /** when did we print the last shutdown progress message */ - gpr_timespec last_shutdown_message_time; -}; - typedef enum { /* waiting for metadata */ NOT_STARTED, @@ -179,6 +133,8 @@ typedef enum { ZOMBIED } call_state; +typedef struct request_matcher request_matcher; + struct call_data { grpc_call *call; @@ -201,8 +157,20 @@ struct call_data { grpc_iomgr_closure server_on_recv; grpc_iomgr_closure kill_zombie_closure; - call_data **root[CALL_LIST_COUNT]; - call_link links[CALL_LIST_COUNT]; + call_data *pending_next; +}; + +struct request_matcher { + call_data *pending_head; + call_data *pending_tail; + gpr_stack_lockfree *requests; +}; + +struct registered_method { + char *method; + char *host; + request_matcher request_matcher; + registered_method *next; }; typedef struct { @@ -210,6 +178,48 @@ typedef struct { size_t num_channels; } channel_broadcaster; +struct grpc_server { + size_t channel_filter_count; + const grpc_channel_filter **channel_filters; + grpc_channel_args *channel_args; + + grpc_completion_queue **cqs; + grpc_pollset **pollsets; + size_t cq_count; + + /* The two following mutexes control access to server-state + mu_global controls access to non-call-related state (e.g., channel state) + mu_call controls access to call-related state (e.g., the call lists) + + If they are ever required to be nested, you must lock mu_global + before mu_call. This is currently used in shutdown processing + (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */ + gpr_mu mu_global; /* mutex for server and channel state */ + gpr_mu mu_call; /* mutex for call-specific state */ + + registered_method *registered_methods; + request_matcher unregistered_request_matcher; + /** free list of available requested_calls indices */ + gpr_stack_lockfree *request_freelist; + /** requested call backing data */ + requested_call *requested_calls; + int max_requested_calls; + + gpr_atm shutdown_flag; + gpr_uint8 shutdown_published; + size_t num_shutdown_tags; + shutdown_tag *shutdown_tags; + + channel_data root_channel_data; + + listener *listeners; + int listeners_destroyed; + gpr_refcount internal_refcount; + + /** when did we print the last shutdown progress message */ + gpr_timespec last_shutdown_message_time; +}; + #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data *)(elem)->channel_data)->server) @@ -220,7 +230,9 @@ static void fail_call(grpc_server *server, requested_call *rc); hold mu_call */ static void maybe_finish_shutdown(grpc_server *server); -/* channel broadcaster */ +/* + * channel broadcaster + */ /* assumes server locked */ static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { @@ -281,55 +293,44 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb, gpr_free(cb->channels); } -/* call list */ +/* + * request_matcher + */ -static int call_list_join(call_data **root, call_data *call, call_list list) { - GPR_ASSERT(!call->root[list]); - call->root[list] = root; - if (!*root) { - *root = call; - call->links[list].next = call->links[list].prev = call; - } else { - call->links[list].next = *root; - call->links[list].prev = (*root)->links[list].prev; - call->links[list].next->links[list].prev = - call->links[list].prev->links[list].next = call; - } - return 1; +static void request_matcher_init(request_matcher *request_matcher, + int entries) { + memset(request_matcher, 0, sizeof(*request_matcher)); + request_matcher->requests = gpr_stack_lockfree_create(entries); } -static call_data *call_list_remove_head(call_data **root, call_list list) { - call_data *out = *root; - if (out) { - out->root[list] = NULL; - if (out->links[list].next == out) { - *root = NULL; - } else { - *root = out->links[list].next; - out->links[list].next->links[list].prev = out->links[list].prev; - out->links[list].prev->links[list].next = out->links[list].next; - } - } - return out; +static void request_matcher_destroy(request_matcher *request_matcher) { + GPR_ASSERT(gpr_stack_lockfree_pop(request_matcher->requests) == -1); + gpr_stack_lockfree_destroy(request_matcher->requests); } -static int call_list_remove(call_data *call, call_list list) { - call_data **root = call->root[list]; - if (root == NULL) return 0; - call->root[list] = NULL; - if (*root == call) { - *root = call->links[list].next; - if (*root == call) { - *root = NULL; - return 1; - } +static void kill_zombie(void *elem, int success) { + grpc_call_destroy(grpc_call_from_top_element(elem)); +} + +static void request_matcher_zombify_all_pending_calls( + request_matcher *request_matcher) { + while (request_matcher->pending_head) { + call_data *calld = request_matcher->pending_head; + request_matcher->pending_head = calld->pending_next; + gpr_mu_lock(&calld->mu_state); + calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); + grpc_iomgr_closure_init( + &calld->kill_zombie_closure, kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); + grpc_iomgr_add_callback(&calld->kill_zombie_closure); } - GPR_ASSERT(*root != call); - call->links[list].next->links[list].prev = call->links[list].prev; - call->links[list].prev->links[list].next = call->links[list].next; - return 1; } +/* + * server proper + */ + static void server_ref(grpc_server *server) { gpr_ref(&server->internal_refcount); } @@ -343,6 +344,7 @@ static void server_delete(grpc_server *server) { gpr_free(server->channel_filters); while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; + request_matcher_destroy(&rm->request_matcher); gpr_free(rm->method); gpr_free(rm->host); gpr_free(rm); @@ -350,9 +352,12 @@ static void server_delete(grpc_server *server) { for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); } + request_matcher_destroy(&server->unregistered_request_matcher); + gpr_stack_lockfree_destroy(server->request_freelist); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); + gpr_free(server->requested_calls); gpr_free(server); } @@ -391,25 +396,29 @@ static void destroy_channel(channel_data *chand) { } static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, - call_data **pending_root, - requested_call **requests) { - requested_call *rc; + request_matcher *request_matcher) { call_data *calld = elem->call_data; - gpr_mu_lock(&server->mu_call); - rc = *requests; - if (rc == NULL) { + int request_id; + + request_id = gpr_stack_lockfree_pop(request_matcher->requests); + if (request_id == -1) { + gpr_mu_lock(&server->mu_call); gpr_mu_lock(&calld->mu_state); calld->state = PENDING; gpr_mu_unlock(&calld->mu_state); - call_list_join(pending_root, calld, PENDING_START); + if (request_matcher->pending_head == NULL) { + request_matcher->pending_tail = request_matcher->pending_head = calld; + } else { + request_matcher->pending_tail->pending_next = calld; + request_matcher->pending_tail = calld; + } + calld->pending_next = NULL; gpr_mu_unlock(&server->mu_call); } else { - *requests = rc->next; gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - gpr_mu_unlock(&server->mu_call); - begin_call(server, calld, rc); + begin_call(server, calld, &server->requested_calls[request_id]); } } @@ -431,8 +440,8 @@ static void start_new_rpc(grpc_call_element *elem) { if (!rm) break; if (rm->host != calld->host) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc(server, elem, &rm->server_registered_method->pending, - &rm->server_registered_method->requests); + finish_start_new_rpc(server, elem, + &rm->server_registered_method->request_matcher); return; } /* check for a wildcard method definition (no host set) */ @@ -443,17 +452,12 @@ static void start_new_rpc(grpc_call_element *elem) { if (!rm) break; if (rm->host != NULL) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc(server, elem, &rm->server_registered_method->pending, - &rm->server_registered_method->requests); + finish_start_new_rpc(server, elem, + &rm->server_registered_method->request_matcher); return; } } - finish_start_new_rpc(server, elem, &server->lists[PENDING_START], - &server->requests); -} - -static void kill_zombie(void *elem, int success) { - grpc_call_destroy(grpc_call_from_top_element(elem)); + finish_start_new_rpc(server, elem, &server->unregistered_request_matcher); } static int num_listeners(grpc_server *server) { @@ -481,15 +485,15 @@ static int num_channels(grpc_server *server) { static void maybe_finish_shutdown(grpc_server *server) { size_t i; - if (!server->shutdown || server->shutdown_published) { + if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) { return; } if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners(server)) { - if (gpr_time_cmp( - gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), server->last_shutdown_message_time), - gpr_time_from_seconds(1)) >= 0) { + if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), + server->last_shutdown_message_time), + gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) { server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME); gpr_log(GPR_DEBUG, "Waiting for %d channels and %d/%d listeners to be destroyed" @@ -526,7 +530,6 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { static void server_on_recv(void *ptr, int success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; if (success && !calld->got_initial_metadata) { size_t i; @@ -536,7 +539,8 @@ static void server_on_recv(void *ptr, int success) { grpc_stream_op *op = &ops[i]; if (op->type != GRPC_OP_METADATA) continue; grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); - if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) { + if (0 != gpr_time_cmp(op->data.metadata.deadline, + gpr_inf_future(GPR_CLOCK_REALTIME))) { calld->deadline = op->data.metadata.deadline; } calld->got_initial_metadata = 1; @@ -571,11 +575,8 @@ static void server_on_recv(void *ptr, int success) { } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - gpr_mu_lock(&chand->server->mu_call); - call_list_remove(calld, PENDING_START); - gpr_mu_unlock(&chand->server->mu_call); - grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); + /* zombied call will be destroyed when it's removed from the pending + queue... later */ } else { gpr_mu_unlock(&calld->mu_state); } @@ -610,7 +611,7 @@ static void accept_stream(void *cd, grpc_transport *transport, channel_data *chand = cd; /* create a call */ grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0, - gpr_inf_future); + gpr_inf_future(GPR_CLOCK_REALTIME)); } static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) { @@ -638,7 +639,7 @@ static void init_call_elem(grpc_call_element *elem, call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); - calld->deadline = gpr_inf_future; + calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); calld->call = grpc_call_from_top_element(elem); gpr_mu_init(&calld->mu_state); @@ -653,11 +654,7 @@ static void destroy_call_elem(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - if (calld->state == PENDING) { - gpr_mu_lock(&chand->server->mu_call); - call_list_remove(elem->call_data, PENDING_START); - gpr_mu_unlock(&chand->server->mu_call); - } + GPR_ASSERT(calld->state != PENDING); if (calld->host) { GRPC_MDSTR_UNREF(calld->host); @@ -764,6 +761,18 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; + /* TODO(ctiller): expose a channel_arg for this */ + server->max_requested_calls = 32768; + server->request_freelist = + gpr_stack_lockfree_create(server->max_requested_calls); + for (i = 0; i < (size_t)server->max_requested_calls; i++) { + gpr_stack_lockfree_push(server->request_freelist, i); + } + request_matcher_init(&server->unregistered_request_matcher, + server->max_requested_calls); + server->requested_calls = gpr_malloc(server->max_requested_calls * + sizeof(*server->requested_calls)); + /* Server filter stack is: server_surface_filter - for making surface API calls @@ -811,6 +820,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method, } m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); + request_matcher_init(&m->request_matcher, server->max_requested_calls); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; @@ -926,13 +936,49 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_transport_perform_op(transport, &op); } +typedef struct { + requested_call **requests; + size_t count; + size_t capacity; +} request_killer; + +static void request_killer_init(request_killer *rk) { + memset(rk, 0, sizeof(*rk)); +} + +static void request_killer_add(request_killer *rk, requested_call *rc) { + if (rk->capacity == rk->count) { + rk->capacity = GPR_MAX(8, rk->capacity * 2); + rk->requests = + gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests)); + } + rk->requests[rk->count++] = rc; +} + +static void request_killer_add_request_matcher(request_killer *rk, + grpc_server *server, + request_matcher *rm) { + int request_id; + while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { + request_killer_add(rk, &server->requested_calls[request_id]); + } +} + +static void request_killer_run(request_killer *rk, grpc_server *server) { + size_t i; + for (i = 0; i < rk->count; i++) { + fail_call(server, rk->requests[i]); + } + gpr_free(rk->requests); +} + void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; - requested_call *requests = NULL; registered_method *rm; shutdown_tag *sdt; channel_broadcaster broadcaster; + request_killer reqkill; /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); @@ -943,7 +989,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, sdt = &server->shutdown_tags[server->num_shutdown_tags++]; sdt->tag = tag; sdt->cq = cq; - if (server->shutdown) { + if (gpr_atm_acq_load(&server->shutdown_flag)) { gpr_mu_unlock(&server->mu_global); return; } @@ -951,31 +997,26 @@ void grpc_server_shutdown_and_notify(grpc_server *server, server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME); channel_broadcaster_init(server, &broadcaster); + request_killer_init(&reqkill); /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - requests = server->requests; - server->requests = NULL; + request_killer_add_request_matcher(&reqkill, server, + &server->unregistered_request_matcher); + request_matcher_zombify_all_pending_calls( + &server->unregistered_request_matcher); for (rm = server->registered_methods; rm; rm = rm->next) { - while (rm->requests != NULL) { - requested_call *c = rm->requests; - rm->requests = c->next; - c->next = requests; - requests = c; - } + request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher); + request_matcher_zombify_all_pending_calls(&rm->request_matcher); } gpr_mu_unlock(&server->mu_call); - server->shutdown = 1; + gpr_atm_rel_store(&server->shutdown_flag, 1); maybe_finish_shutdown(server); gpr_mu_unlock(&server->mu_global); /* terminate all the requested calls */ - while (requests != NULL) { - requested_call *next = requests->next; - fail_call(server, requests); - requests = next; - } + request_killer_run(&reqkill, server); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { @@ -1007,7 +1048,7 @@ void grpc_server_destroy(grpc_server *server) { listener *l; gpr_mu_lock(&server->mu_global); - GPR_ASSERT(server->shutdown || !server->listeners); + GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners); GPR_ASSERT(server->listeners_destroyed == num_listeners(server)); while (server->listeners) { @@ -1037,39 +1078,55 @@ void grpc_server_add_listener(grpc_server *server, void *arg, static grpc_call_error queue_call_request(grpc_server *server, requested_call *rc) { call_data *calld = NULL; - requested_call **requests = NULL; - gpr_mu_lock(&server->mu_call); - if (server->shutdown) { - gpr_mu_unlock(&server->mu_call); + request_matcher *request_matcher = NULL; + int request_id; + if (gpr_atm_acq_load(&server->shutdown_flag)) { + fail_call(server, rc); + return GRPC_CALL_OK; + } + request_id = gpr_stack_lockfree_pop(server->request_freelist); + if (request_id == -1) { + /* out of request ids: just fail this one */ fail_call(server, rc); return GRPC_CALL_OK; } switch (rc->type) { case BATCH_CALL: - calld = - call_list_remove_head(&server->lists[PENDING_START], PENDING_START); - requests = &server->requests; + request_matcher = &server->unregistered_request_matcher; break; case REGISTERED_CALL: - calld = call_list_remove_head( - &rc->data.registered.registered_method->pending, PENDING_START); - requests = &rc->data.registered.registered_method->requests; + request_matcher = &rc->data.registered.registered_method->request_matcher; break; } - if (calld != NULL) { - gpr_mu_unlock(&server->mu_call); - gpr_mu_lock(&calld->mu_state); - GPR_ASSERT(calld->state == PENDING); - calld->state = ACTIVATED; - gpr_mu_unlock(&calld->mu_state); - begin_call(server, calld, rc); - return GRPC_CALL_OK; - } else { - rc->next = *requests; - *requests = rc; + server->requested_calls[request_id] = *rc; + gpr_free(rc); + if (gpr_stack_lockfree_push(request_matcher->requests, request_id)) { + /* this was the first queued request: we need to lock and start + matching calls */ + gpr_mu_lock(&server->mu_call); + while ((calld = request_matcher->pending_head) != NULL) { + request_id = gpr_stack_lockfree_pop(request_matcher->requests); + if (request_id == -1) break; + request_matcher->pending_head = calld->pending_next; + gpr_mu_unlock(&server->mu_call); + gpr_mu_lock(&calld->mu_state); + if (calld->state == ZOMBIED) { + gpr_mu_unlock(&calld->mu_state); + grpc_iomgr_closure_init( + &calld->kill_zombie_closure, kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); + grpc_iomgr_add_callback(&calld->kill_zombie_closure); + } else { + GPR_ASSERT(calld->state == PENDING); + calld->state = ACTIVATED; + gpr_mu_unlock(&calld->mu_state); + begin_call(server, calld, &server->requested_calls[request_id]); + } + gpr_mu_lock(&server->mu_call); + } gpr_mu_unlock(&server->mu_call); - return GRPC_CALL_OK; } + return GRPC_CALL_OK; } grpc_call_error grpc_server_request_call( @@ -1087,6 +1144,7 @@ grpc_call_error grpc_server_request_call( } grpc_cq_begin_op(cq_for_notification); rc->type = BATCH_CALL; + rc->server = server; rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; rc->cq_for_notification = cq_for_notification; @@ -1109,6 +1167,7 @@ grpc_call_error grpc_server_request_registered_call( } grpc_cq_begin_op(cq_for_notification); rc->type = REGISTERED_CALL; + rc->server = server; rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; rc->cq_for_notification = cq_for_notification; @@ -1188,7 +1247,16 @@ static void begin_call(grpc_server *server, call_data *calld, } static void done_request_event(void *req, grpc_cq_completion *c) { - gpr_free(req); + requested_call *rc = req; + grpc_server *server = rc->server; + + if (rc >= server->requested_calls && + rc < server->requested_calls + server->max_requested_calls) { + gpr_stack_lockfree_push(server->request_freelist, + rc - server->requested_calls); + } else { + gpr_free(req); + } } static void fail_call(grpc_server *server, requested_call *rc) { diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c index b817df7745..d624298ad2 100644 --- a/src/core/transport/chttp2/frame_window_update.c +++ b/src/core/transport/chttp2/frame_window_update.c @@ -94,8 +94,8 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( } GPR_ASSERT(is_last); - if (transport_parsing->incoming_stream_id) { - if (stream_parsing) { + if (transport_parsing->incoming_stream_id != 0) { + if (stream_parsing != NULL) { GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("update", transport_parsing, stream_parsing, outgoing_window_update, p->amount); diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c index 77162a6864..974b864ffb 100644 --- a/src/core/transport/chttp2/incoming_metadata.c +++ b/src/core/transport/chttp2/incoming_metadata.c @@ -42,7 +42,7 @@ void grpc_chttp2_incoming_metadata_buffer_init( grpc_chttp2_incoming_metadata_buffer *buffer) { - buffer->deadline = gpr_inf_future; + buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } void grpc_chttp2_incoming_metadata_buffer_destroy( @@ -87,7 +87,7 @@ void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( b.list.tail = (void *)(gpr_intptr)buffer->count; b.garbage.head = b.garbage.tail = NULL; b.deadline = buffer->deadline; - buffer->deadline = gpr_inf_future; + buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); grpc_sopb_add_metadata(sopb, b); } diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index bdd4b432eb..e5e6f445b7 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -353,7 +353,19 @@ typedef struct { /** window available for us to send to peer */ gpr_int64 outgoing_window; - /** window available for peer to send to us - updated after parse */ + /** The number of bytes the upper layers have offered to receive. + As the upper layer offers more bytes, this value increases. + As bytes are read, this value decreases. */ + gpr_uint32 max_recv_bytes; + /** The number of bytes the upper layer has offered to read but we have + not yet announced to HTTP2 flow control. + As the upper layers offer to read more bytes, this value increases. + As we advertise incoming flow control window, this value decreases. */ + gpr_uint32 unannounced_incoming_window; + /** The number of bytes of HTTP2 flow control we have advertised. + As we advertise incoming flow control window, this value increases. + As bytes are read, this value decreases. + Updated after parse. */ gpr_uint32 incoming_window; /** stream ops the transport user would like to send */ grpc_stream_op_buffer *outgoing_sopb; @@ -391,6 +403,8 @@ typedef struct { grpc_stream_op_buffer sopb; /** how strongly should we indicate closure with the next write */ grpc_chttp2_send_closed send_closed; + /** how much window should we announce? */ + gpr_uint32 announce_window; } grpc_chttp2_stream_writing; struct grpc_chttp2_stream_parsing { @@ -501,7 +515,9 @@ void grpc_chttp2_list_add_writable_window_update_stream( grpc_chttp2_stream_global *stream_global); int grpc_chttp2_list_pop_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global **stream_global); + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_global **stream_global, + grpc_chttp2_stream_writing **stream_writing); void grpc_chttp2_list_remove_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 9597395aab..aa32f2e44a 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -173,7 +173,14 @@ void grpc_chttp2_publish_reads( GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( "parsed", transport_parsing, stream_parsing, incoming_window_delta, -(gpr_int64)stream_parsing->incoming_window_delta); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "parsed", transport_parsing, stream_global, max_recv_bytes, + -(gpr_int64)stream_parsing->incoming_window_delta); stream_global->incoming_window -= stream_parsing->incoming_window_delta; + GPR_ASSERT(stream_global->max_recv_bytes >= + stream_parsing->incoming_window_delta); + stream_global->max_recv_bytes -= + stream_parsing->incoming_window_delta; stream_parsing->incoming_window_delta = 0; grpc_chttp2_list_add_writable_window_update_stream(transport_global, stream_global); @@ -594,7 +601,7 @@ static void on_header(void *tp, grpc_mdelem *md) { cached_timeout)) { gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", grpc_mdstr_as_c_string(md->value)); - *cached_timeout = gpr_inf_future; + *cached_timeout = gpr_inf_future(GPR_CLOCK_REALTIME); } grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index d553d80085..d7fc2da5d3 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -585,7 +585,8 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, l->md = hpack_enc(compressor, l->md, &st); need_unref |= l->md != NULL; } - if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) { + if (gpr_time_cmp(op->data.metadata.deadline, + gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { deadline_enc(compressor, op->data.metadata.deadline, &st); } curop++; diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 4fea058c19..590f6abfbc 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -139,6 +139,7 @@ static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, void grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { + GPR_ASSERT(stream_global->id != 0); stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); } @@ -204,6 +205,7 @@ int grpc_chttp2_list_pop_written_stream( void grpc_chttp2_list_add_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { + GPR_ASSERT(stream_global->id != 0); stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); @@ -211,11 +213,14 @@ void grpc_chttp2_list_add_writable_window_update_stream( int grpc_chttp2_list_pop_writable_window_update_stream( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global **stream_global) { + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_global **stream_global, + grpc_chttp2_stream_writing **stream_writing) { grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); *stream_global = &stream->global; + *stream_writing = &stream->writing; return r; } diff --git a/src/core/transport/chttp2/timeout_encoding.c b/src/core/transport/chttp2/timeout_encoding.c index 33915c4039..1dd768ada4 100644 --- a/src/core/transport/chttp2/timeout_encoding.c +++ b/src/core/transport/chttp2/timeout_encoding.c @@ -147,7 +147,7 @@ int grpc_chttp2_decode_timeout(const char *buffer, gpr_timespec *timeout) { gpr_uint32 xp = x * 10 + *p - '0'; have_digit = 1; if (xp < x) { - *timeout = gpr_inf_future; + *timeout = gpr_inf_future(GPR_CLOCK_REALTIME); return 1; } x = xp; @@ -159,22 +159,22 @@ int grpc_chttp2_decode_timeout(const char *buffer, gpr_timespec *timeout) { /* decode unit specifier */ switch (*p) { case 'n': - *timeout = gpr_time_from_nanos(x); + *timeout = gpr_time_from_nanos(x, GPR_TIMESPAN); break; case 'u': - *timeout = gpr_time_from_micros(x); + *timeout = gpr_time_from_micros(x, GPR_TIMESPAN); break; case 'm': - *timeout = gpr_time_from_millis(x); + *timeout = gpr_time_from_millis(x, GPR_TIMESPAN); break; case 'S': - *timeout = gpr_time_from_seconds(x); + *timeout = gpr_time_from_seconds(x, GPR_TIMESPAN); break; case 'M': - *timeout = gpr_time_from_minutes(x); + *timeout = gpr_time_from_minutes(x, GPR_TIMESPAN); break; case 'H': - *timeout = gpr_time_from_hours(x); + *timeout = gpr_time_from_hours(x, GPR_TIMESPAN); break; default: return 0; diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index a78654334e..d8ec117aa5 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -66,11 +66,9 @@ int grpc_chttp2_unlocking_check_writes( /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ - while (transport_global->outgoing_window && - grpc_chttp2_list_pop_writable_stream(transport_global, + while (grpc_chttp2_list_pop_writable_stream(transport_global, transport_writing, &stream_global, - &stream_writing) && - stream_global->outgoing_window > 0) { + &stream_writing)) { stream_writing->id = stream_global->id; window_delta = grpc_chttp2_preencode( stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops, @@ -106,20 +104,21 @@ int grpc_chttp2_unlocking_check_writes( /* for each grpc_chttp2_stream that wants to update its window, add that * window here */ while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, - &stream_global)) { - window_delta = - transport_global->settings[GRPC_LOCAL_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - - stream_global->incoming_window; - if (!stream_global->read_closed && window_delta > 0) { - gpr_slice_buffer_add( - &transport_writing->outbuf, - grpc_chttp2_window_update_create(stream_global->id, window_delta)); + transport_writing, + &stream_global, + &stream_writing)) { + stream_writing->id = stream_global->id; + if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) { + stream_writing->announce_window = stream_global->unannounced_incoming_window; GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, - incoming_window, window_delta); - stream_global->incoming_window += window_delta; + incoming_window, stream_global->unannounced_incoming_window); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, + unannounced_incoming_window, -(gpr_int64)stream_global->unannounced_incoming_window); + stream_global->incoming_window += stream_global->unannounced_incoming_window; + stream_global->unannounced_incoming_window = 0; grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } } @@ -169,10 +168,19 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { while ( grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { - grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, - stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, - stream_writing->id, &transport_writing->hpack_compressor, - &transport_writing->outbuf); + if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { + grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, + stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, + stream_writing->id, &transport_writing->hpack_compressor, + &transport_writing->outbuf); + } + if (stream_writing->announce_window > 0) { + gpr_slice_buffer_add( + &transport_writing->outbuf, + grpc_chttp2_window_update_create( + stream_writing->id, stream_writing->announce_window)); + stream_writing->announce_window = 0; + } stream_writing->sopb.nops = 0; if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) { gpr_slice_buffer_add(&transport_writing->outbuf, @@ -197,7 +205,8 @@ void grpc_chttp2_cleanup_writing( while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { - if (stream_global->outgoing_sopb->nops == 0) { + if (stream_global->outgoing_sopb != NULL && + stream_global->outgoing_sopb->nops == 0) { stream_global->outgoing_sopb = NULL; grpc_chttp2_schedule_closure(transport_global, stream_global->send_done_closure, 1); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 1ad5066f27..ac8a4665db 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -360,7 +360,9 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, s->global.outgoing_window = t->global.settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->parsing.incoming_window = s->global.incoming_window = + s->global.max_recv_bytes = + s->parsing.incoming_window = + s->global.incoming_window = t->global.settings[GRPC_SENT_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; @@ -564,6 +566,8 @@ static void maybe_start_some_streams( stream_global->incoming_window = transport_global->settings[GRPC_SENT_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + stream_global->max_recv_bytes = + GPR_MAX(stream_global->incoming_window, stream_global->max_recv_bytes); grpc_chttp2_stream_map_add( &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map, stream_global->id, STREAM_FROM_GLOBAL(stream_global)); @@ -572,6 +576,9 @@ static void maybe_start_some_streams( grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_list_add_writable_window_update_stream(transport_global, + stream_global); + } /* cancel out streams that will never be started */ while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -622,12 +629,23 @@ static void perform_stream_op_locked( stream_global->publish_sopb = op->recv_ops; stream_global->publish_sopb->nops = 0; stream_global->publish_state = op->recv_state; + if (stream_global->max_recv_bytes < op->max_recv_bytes) { + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("op", transport_global, stream_global, + max_recv_bytes, op->max_recv_bytes - stream_global->max_recv_bytes); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "op", transport_global, stream_global, unannounced_incoming_window, + op->max_recv_bytes - stream_global->max_recv_bytes); + stream_global->unannounced_incoming_window += op->max_recv_bytes - stream_global->max_recv_bytes; + stream_global->max_recv_bytes = op->max_recv_bytes; + } grpc_chttp2_incoming_metadata_live_op_buffer_end( &stream_global->outstanding_metadata); - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); - grpc_chttp2_list_add_writable_window_update_stream(transport_global, - stream_global); + if (stream_global->id != 0) { + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); + grpc_chttp2_list_add_writable_window_update_stream(transport_global, + stream_global); + } } if (op->bind_pollset) { @@ -1056,7 +1074,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, identifier = gpr_strdup(context_scope); } gpr_log(GPR_INFO, - "FLOWCTL: %s %-10s %8s %-23s %8lld %c %8lld = %8lld %-10s [%s:%d]", + "FLOWCTL: %s %-10s %8s %-27s %8lld %c %8lld = %8lld %-10s [%s:%d]", is_client ? "client" : "server", identifier, context_thread, var, current_value, delta < 0 ? '-' : '+', delta < 0 ? -delta : delta, current_value + delta, reason, file, line); diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index fdb50c6b71..71061fe0c7 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -205,7 +205,7 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *batch) { void grpc_metadata_batch_init(grpc_metadata_batch *batch) { batch->list.head = batch->list.tail = batch->garbage.head = batch->garbage.tail = NULL; - batch->deadline = gpr_inf_future; + batch->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } void grpc_metadata_batch_destroy(grpc_metadata_batch *batch) { diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 5af81232ac..aac42303a9 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -72,6 +72,10 @@ typedef struct grpc_transport_stream_op { grpc_stream_op_buffer *recv_ops; grpc_stream_state *recv_state; + /** The number of bytes this peer is currently prepared to receive. + These bytes will be eventually used to replenish per-stream flow control + windows. */ + gpr_uint32 max_recv_bytes; grpc_iomgr_closure *on_done_recv; grpc_pollset *bind_pollset; diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 0da396a320..9d127c5472 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -61,7 +61,7 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", ")); put_metadata(b, m->md); } - if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) { + if (gpr_time_cmp(md.deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { char *tmp; gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec, md.deadline.tv_nsec); @@ -128,7 +128,8 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { if (op->recv_ops) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_strvec_add(&b, gpr_strdup("RECV")); + gpr_asprintf(&tmp, "RECV:max_recv_bytes=%d", op->max_recv_bytes); + gpr_strvec_add(&b, tmp); } if (op->bind_pollset) { |