diff options
Diffstat (limited to 'src/core')
24 files changed, 210 insertions, 100 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 93d54fdcfe..8a98a6bcbe 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -205,7 +205,11 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&chand->mu_config); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; - if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { + if (lb_policy != NULL) { + grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, + NULL); + } else if (chand->resolver == NULL /* disconnected */) { + grpc_closure_list_fail_all(&chand->waiting_for_config_closures); grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, NULL); } @@ -293,6 +297,11 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_resolver_shutdown(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; + if (!chand->started_resolving) { + grpc_closure_list_fail_all(&chand->waiting_for_config_closures); + grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, + NULL); + } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, chand->lb_policy->interested_parties, @@ -321,10 +330,10 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) { continue_picking_args *cpa = arg; - if (!success) { - grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); - } else if (cpa->connected_subchannel == NULL) { + if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ + } else if (!success) { + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, cpa->connected_subchannel, cpa->on_ready)) { @@ -381,14 +390,19 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, &chand->incoming_configuration, &chand->on_config_changed); } - cpa = gpr_malloc(sizeof(*cpa)); - cpa->initial_metadata = initial_metadata; - cpa->initial_metadata_flags = initial_metadata_flags; - cpa->connected_subchannel = connected_subchannel; - cpa->on_ready = on_ready; - cpa->elem = elem; - grpc_closure_init(&cpa->closure, continue_picking, cpa); - grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, 1); + if (chand->resolver != NULL) { + cpa = gpr_malloc(sizeof(*cpa)); + cpa->initial_metadata = initial_metadata; + cpa->initial_metadata_flags = initial_metadata_flags; + cpa->connected_subchannel = connected_subchannel; + cpa->on_ready = on_ready; + cpa->elem = elem; + grpc_closure_init(&cpa->closure, continue_picking, cpa); + grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, + 1); + } else { + grpc_exec_ctx_enqueue(exec_ctx, on_ready, false, NULL); + } gpr_mu_unlock(&chand->mu_config); return 0; } diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 125a291f21..c925c28c67 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -135,8 +135,6 @@ struct grpc_subchannel { int have_alarm; /** our alarm */ grpc_timer alarm; - /** current random value */ - uint32_t random; }; struct grpc_subchannel_call { @@ -297,10 +295,6 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, } } -static uint32_t random_seed() { - return (uint32_t)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); -} - grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_connector *connector, grpc_subchannel_args *args) { @@ -332,7 +326,6 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_set_initial_connect_string(&c->addr, &c->addr_len, &c->initial_connect_string); c->args = grpc_channel_args_copy(args->args); - c->random = random_seed(); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c); diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c index 3db462b246..9918fbdcb4 100644 --- a/src/core/ext/client_config/subchannel_call_holder.c +++ b/src/core/ext/client_config/subchannel_call_holder.c @@ -252,9 +252,9 @@ char *grpc_subchannel_call_holder_get_peer( grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) { grpc_subchannel_call *subchannel_call = GET_CALL(holder); - if (subchannel_call) { - return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); - } else { + if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) { return NULL; + } else { + return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); } } diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 2749b0ca01..620ba4e2aa 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -86,7 +86,8 @@ typedef struct { static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_start_resolving_locked(dns_resolver *r); +static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, + dns_resolver *r); static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, dns_resolver *r); @@ -119,7 +120,7 @@ static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&r->mu); if (!r->resolving) { gpr_backoff_reset(&r->backoff_state); - dns_start_resolving_locked(r); + dns_start_resolving_locked(exec_ctx, r); } gpr_mu_unlock(&r->mu); } @@ -134,7 +135,7 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, r->target_config = target_config; if (r->resolved_version == 0 && !r->resolving) { gpr_backoff_reset(&r->backoff_state); - dns_start_resolving_locked(r); + dns_start_resolving_locked(exec_ctx, r); } else { dns_maybe_finish_next_locked(exec_ctx, r); } @@ -149,7 +150,7 @@ static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, r->have_retry_timer = false; if (success) { if (!r->resolving) { - dns_start_resolving_locked(r); + dns_start_resolving_locked(exec_ctx, r); } } gpr_mu_unlock(&r->mu); @@ -201,11 +202,12 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); } -static void dns_start_resolving_locked(dns_resolver *r) { +static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, + dns_resolver *r) { GRPC_RESOLVER_REF(&r->base, "dns-resolving"); GPR_ASSERT(!r->resolving); r->resolving = 1; - grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r); + grpc_resolve_address(exec_ctx, r->name, r->default_port, dns_on_resolved, r); } static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c index 898632c3cd..deb4b9b1ef 100644 --- a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c +++ b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c @@ -299,7 +299,7 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, address = zookeeper_parse_address(value, (size_t)value_len); if (address != NULL) { /** Further resolves address by DNS */ - grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + grpc_resolve_address(&exec_ctx, address, NULL, zookeeper_dns_resolved, r); gpr_free(address); } else { gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name); @@ -375,8 +375,10 @@ static void zookeeper_get_node_completion(int rc, const char *value, r->resolved_addrs->naddrs = 0; r->resolved_total = 1; /** Further resolves address by DNS */ - grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resolve_address(&exec_ctx, address, NULL, zookeeper_dns_resolved, r); gpr_free(address); + grpc_exec_ctx_finish(&exec_ctx); return; } diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 0ed115793b..c5d3d8d9cc 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -235,5 +235,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, grpc_exec_ctx_finish(&exec_ctx); - return channel; /* may be NULL */ + return channel != NULL ? channel : grpc_lame_client_channel_create( + target, GRPC_STATUS_INTERNAL, + "Failed to create client channel"); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 93c3e6d8b4..687936bfd3 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -639,7 +639,7 @@ static int on_hdr(grpc_chttp2_hpack_parser *p, grpc_mdelem *md, } } if (p->on_header == NULL) { - grpc_mdelem_unref(md); + GRPC_MDELEM_UNREF(md); return 0; } p->on_header(p->on_header_user_data, md); diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 229fdb5ef6..3d42d0e616 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -268,8 +268,14 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, channeld->default_compression_algorithm = grpc_channel_args_get_compression_algorithm(args->channel_args); /* Make sure the default isn't disabled. */ - GPR_ASSERT(grpc_compression_options_is_algorithm_enabled( - &channeld->compression_options, channeld->default_compression_algorithm)); + if (!grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, + channeld->default_compression_algorithm)) { + gpr_log(GPR_DEBUG, + "compression algorithm %d not enabled: switching to none", + channeld->default_compression_algorithm); + channeld->default_compression_algorithm = GRPC_COMPRESS_NONE; + } channeld->compression_options.default_compression_algorithm = channeld->default_compression_algorithm; diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 76bd1b64dc..f22721ac8f 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -246,7 +246,7 @@ static void internal_request_begin( grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set, req->pollset); - grpc_resolve_address(request->host, req->handshaker->default_port, + grpc_resolve_address(exec_ctx, request->host, req->handshaker->default_port, on_resolved, req); } diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c index 921c772453..a7efb5e73e 100644 --- a/src/core/lib/http/parser.c +++ b/src/core/lib/http/parser.c @@ -172,8 +172,8 @@ static int add_header(grpc_http_parser *parser) { while (cur != end && (*cur == ' ' || *cur == '\t')) { cur++; } - GPR_ASSERT(end - cur >= 2); - hdr.value = buf2str(cur, (size_t)(end - cur) - 2); + GPR_ASSERT((size_t)(end - cur) >= parser->cur_line_end_length); + hdr.value = buf2str(cur, (size_t)(end - cur) - parser->cur_line_end_length); if (parser->type == GRPC_HTTP_RESPONSE) { hdr_count = &parser->http.response.hdr_count; @@ -208,7 +208,7 @@ static int finish_line(grpc_http_parser *parser) { parser->state = GRPC_HTTP_HEADERS; break; case GRPC_HTTP_HEADERS: - if (parser->cur_line_length == 2) { + if (parser->cur_line_length == parser->cur_line_end_length) { parser->state = GRPC_HTTP_BODY; break; } @@ -248,6 +248,30 @@ static int addbyte_body(grpc_http_parser *parser, uint8_t byte) { return 1; } +static int check_line(grpc_http_parser *parser) { + if (parser->cur_line_length >= 2 && + parser->cur_line[parser->cur_line_length - 2] == '\r' && + parser->cur_line[parser->cur_line_length - 1] == '\n') { + return 1; + } + + // HTTP request with \n\r line termiantors. + else if (parser->cur_line_length >= 2 && + parser->cur_line[parser->cur_line_length - 2] == '\n' && + parser->cur_line[parser->cur_line_length - 1] == '\r') { + return 1; + } + + // HTTP request with only \n line terminators. + else if (parser->cur_line_length >= 1 && + parser->cur_line[parser->cur_line_length - 1] == '\n') { + parser->cur_line_end_length = 1; + return 1; + } + + return 0; +} + static int addbyte(grpc_http_parser *parser, uint8_t byte) { switch (parser->state) { case GRPC_HTTP_FIRST_LINE: @@ -260,9 +284,7 @@ static int addbyte(grpc_http_parser *parser, uint8_t byte) { } parser->cur_line[parser->cur_line_length] = byte; parser->cur_line_length++; - if (parser->cur_line_length >= 2 && - parser->cur_line[parser->cur_line_length - 2] == '\r' && - parser->cur_line[parser->cur_line_length - 1] == '\n') { + if (check_line(parser)) { return finish_line(parser); } else { return 1; @@ -278,6 +300,7 @@ void grpc_http_parser_init(grpc_http_parser *parser) { memset(parser, 0, sizeof(*parser)); parser->state = GRPC_HTTP_FIRST_LINE; parser->type = GRPC_HTTP_UNKNOWN; + parser->cur_line_end_length = 2; } void grpc_http_parser_destroy(grpc_http_parser *parser) { diff --git a/src/core/lib/http/parser.h b/src/core/lib/http/parser.h index 42fa5181b8..536637e9a2 100644 --- a/src/core/lib/http/parser.h +++ b/src/core/lib/http/parser.h @@ -105,6 +105,7 @@ typedef struct { uint8_t cur_line[GRPC_HTTP_PARSER_MAX_HEADER_LENGTH]; size_t cur_line_length; + size_t cur_line_end_length; } grpc_http_parser; void grpc_http_parser_init(grpc_http_parser *parser); diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index d6f073fc9d..27793c32e4 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -54,6 +54,12 @@ void grpc_closure_list_add(grpc_closure_list *closure_list, closure_list->tail = closure; } +void grpc_closure_list_fail_all(grpc_closure_list *list) { + for (grpc_closure *c = list->head; c != NULL; c = grpc_closure_next(c)) { + c->final_data &= ~(uintptr_t)1; + } +} + bool grpc_closure_list_empty(grpc_closure_list closure_list) { return closure_list.head == NULL; } diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 8652b53a8b..fdc2daed9d 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -86,6 +86,9 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg); void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure, bool success); +/** force all success bits in \a list to false */ +void grpc_closure_list_fail_all(grpc_closure_list *list); + /** append all closures from \a src to \a dst and empty \a src. */ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index e09ef02400..976cc40347 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -93,6 +93,8 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, grpc_workqueue *offload_target_or_null); void grpc_exec_ctx_global_init(void); + +void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_shutdown(void); #endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */ diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index ecc06340a3..ef198fe0f6 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -59,8 +59,9 @@ typedef void (*grpc_resolve_cb)(grpc_exec_ctx *exec_ctx, void *arg, /* Asynchronously resolve addr. Use default_port if a port isn't designated in addr, otherwise use the port in addr. */ /* TODO(ctiller): add a timeout here */ -void grpc_resolve_address(const char *addr, const char *default_port, - grpc_resolve_cb cb, void *arg); +extern void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *addr, + const char *default_port, + grpc_resolve_cb cb, void *arg); /* Destroy resolved addresses */ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses); diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index b9d3bbdb89..cae91eec20 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -164,8 +164,9 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { gpr_free(addrs); } -void grpc_resolve_address(const char *name, const char *default_port, - grpc_resolve_cb cb, void *arg) { +static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, + const char *default_port, grpc_resolve_cb cb, + void *arg) { request *r = gpr_malloc(sizeof(request)); grpc_closure_init(&r->request_closure, do_request_thread, r); r->name = gpr_strdup(name); @@ -175,4 +176,8 @@ void grpc_resolve_address(const char *name, const char *default_port, grpc_executor_enqueue(&r->request_closure, 1); } +void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name, + const char *default_port, grpc_resolve_cb cb, + void *arg) = resolve_address_impl; + #endif diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 82763d11f4..914736234d 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -155,8 +155,9 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { gpr_free(addrs); } -void grpc_resolve_address(const char *name, const char *default_port, - grpc_resolve_cb cb, void *arg) { +static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, + const char *default_port, grpc_resolve_cb cb, + void *arg) { request *r = gpr_malloc(sizeof(request)); grpc_closure_init(&r->request_closure, do_request_thread, r); r->name = gpr_strdup(name); @@ -166,4 +167,8 @@ void grpc_resolve_address(const char *name, const char *default_port, grpc_executor_enqueue(&r->request_closure, 1); } +void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name, + const char *default_port, grpc_resolve_cb cb, + void *arg) = resolve_address_impl; + #endif diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 6430cb629f..e93d5734a0 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -211,11 +211,11 @@ finish: grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL, NULL); } -void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_endpoint **ep, - grpc_pollset_set *interested_parties, - const struct sockaddr *addr, size_t addr_len, - gpr_timespec deadline) { +static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, + grpc_closure *closure, grpc_endpoint **ep, + grpc_pollset_set *interested_parties, + const struct sockaddr *addr, + size_t addr_len, gpr_timespec deadline) { int fd; grpc_dualstack_mode dsmode; int err; @@ -303,4 +303,19 @@ done: gpr_free(addr_str); } +// overridden by api_fuzzer.c +void (*grpc_tcp_client_connect_impl)( + grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, + grpc_pollset_set *interested_parties, const struct sockaddr *addr, + size_t addr_len, gpr_timespec deadline) = tcp_client_connect_impl; + +void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_endpoint **ep, + grpc_pollset_set *interested_parties, + const struct sockaddr *addr, size_t addr_len, + gpr_timespec deadline) { + grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr, + addr_len, deadline); +} + #endif diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer.c index 713f15b69e..acb5b26c87 100644 --- a/src/core/lib/iomgr/timer.c +++ b/src/core/lib/iomgr/timer.c @@ -70,6 +70,7 @@ static gpr_clock_type g_clock_type; static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ static shard_type *g_shard_queue[NUM_SHARDS]; +static bool g_initialized = false; static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next, int success); @@ -83,6 +84,7 @@ static gpr_timespec compute_min_deadline(shard_type *shard) { void grpc_timer_list_init(gpr_timespec now) { uint32_t i; + g_initialized = true; gpr_mu_init(&g_mu); gpr_mu_init(&g_checker_mu); g_clock_type = now.clock_type; @@ -111,6 +113,7 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { } gpr_mu_destroy(&g_mu); gpr_mu_destroy(&g_checker_mu); + g_initialized = false; } /* This is a cheap, but good enough, pointer hash for sharding the tasks: */ @@ -180,6 +183,18 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, timer->deadline = deadline; timer->triggered = 0; + if (!g_initialized) { + timer->triggered = 1; + grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL); + return; + } + + if (gpr_time_cmp(deadline, now) <= 0) { + timer->triggered = 1; + grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, true, NULL); + return; + } + /* TODO(ctiller): check deadline expired */ gpr_mu_lock(&shard->mu); diff --git a/src/core/lib/support/time_posix.c b/src/core/lib/support/time_posix.c index f5f62dadc6..11542072fe 100644 --- a/src/core/lib/support/time_posix.c +++ b/src/core/lib/support/time_posix.c @@ -78,7 +78,7 @@ static const clockid_t clockid_for_gpr_clock[] = {CLOCK_MONOTONIC, void gpr_time_init(void) { gpr_precise_clock_init(); } -gpr_timespec gpr_now(gpr_clock_type clock_type) { +static gpr_timespec now_impl(gpr_clock_type clock_type) { struct timespec now; GPR_ASSERT(clock_type != GPR_TIMESPAN); if (clock_type == GPR_CLOCK_PRECISE) { @@ -114,7 +114,7 @@ void gpr_time_init(void) { g_time_start = mach_absolute_time(); } -gpr_timespec gpr_now(gpr_clock_type clock) { +static gpr_timespec now_impl(gpr_clock_type clock) { gpr_timespec now; struct timeval now_tv; double now_dbl; @@ -142,6 +142,12 @@ gpr_timespec gpr_now(gpr_clock_type clock) { } #endif +gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type) = now_impl; + +gpr_timespec gpr_now(gpr_clock_type clock_type) { + return gpr_now_impl(clock_type); +} + void gpr_sleep_until(gpr_timespec until) { gpr_timespec now; gpr_timespec delta; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 6581bbd3d1..fa12b6ea61 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -142,22 +142,23 @@ struct grpc_call { gpr_mu mu; /* client or server call */ - uint8_t is_client; + bool is_client; /* is the alarm set */ - uint8_t have_alarm; + bool have_alarm; /** has grpc_call_destroy been called */ - uint8_t destroy_called; + bool destroy_called; /** flag indicating that cancellation is inherited */ - uint8_t cancellation_is_inherited; + bool cancellation_is_inherited; /** bitmask of live batches */ uint8_t used_batches; /** which ops are in-flight */ - uint8_t sent_initial_metadata; - uint8_t sending_message; - uint8_t sent_final_op; - uint8_t received_initial_metadata; - uint8_t receiving_message; - uint8_t received_final_op; + bool sent_initial_metadata; + bool sending_message; + bool sent_final_op; + bool received_initial_metadata; + bool receiving_message; + bool requested_final_op; + bool received_final_op; /* have we received initial metadata */ bool has_initial_md_been_received; @@ -220,10 +221,7 @@ struct grpc_call { } server; } final_op; - struct { - void *bctlp; - bool success; - } saved_receiving_stream_ready_ctx; + void *saved_receiving_stream_ready_bctlp; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -554,21 +552,6 @@ static int prepare_application_metadata(grpc_call *call, int count, int i; grpc_metadata_batch *batch = &call->metadata_batch[0 /* is_receiving */][is_trailing]; - if (prepend_extra_metadata) { - if (call->send_extra_metadata_count == 0) { - prepend_extra_metadata = 0; - } else { - for (i = 0; i < call->send_extra_metadata_count; i++) { - GRPC_MDELEM_REF(call->send_extra_metadata[i].md); - } - for (i = 1; i < call->send_extra_metadata_count; i++) { - call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1]; - } - for (i = 0; i < call->send_extra_metadata_count - 1; i++) { - call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1]; - } - } - } for (i = 0; i < count; i++) { grpc_metadata *md = &metadata[i]; grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; @@ -579,14 +562,37 @@ static int prepare_application_metadata(grpc_call *call, int count, GRPC_MDSTR_LENGTH(l->md->key))) { gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s", grpc_mdstr_as_c_string(l->md->key)); - return 0; + break; } else if (!grpc_is_binary_header(grpc_mdstr_as_c_string(l->md->key), GRPC_MDSTR_LENGTH(l->md->key)) && !grpc_header_nonbin_value_is_legal( grpc_mdstr_as_c_string(l->md->value), GRPC_MDSTR_LENGTH(l->md->value))) { gpr_log(GPR_ERROR, "attempt to send invalid metadata value"); - return 0; + break; + } + } + if (i != count) { + for (int j = 0; j <= i; j++) { + grpc_metadata *md = &metadata[j]; + grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + GRPC_MDELEM_UNREF(l->md); + } + return 0; + } + if (prepend_extra_metadata) { + if (call->send_extra_metadata_count == 0) { + prepend_extra_metadata = 0; + } else { + for (i = 0; i < call->send_extra_metadata_count; i++) { + GRPC_MDELEM_REF(call->send_extra_metadata[i].md); + } + for (i = 1; i < call->send_extra_metadata_count; i++) { + call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1]; + } + for (i = 0; i < call->send_extra_metadata_count - 1; i++) { + call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1]; + } } } for (i = 1; i < count; i++) { @@ -1057,12 +1063,12 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_call *call = bctl->call; gpr_mu_lock(&bctl->call->mu); - if (bctl->call->has_initial_md_been_received) { + if (bctl->call->has_initial_md_been_received || !success || + call->receiving_stream == NULL) { gpr_mu_unlock(&bctl->call->mu); process_data_after_md(exec_ctx, bctlp, success); } else { - call->saved_receiving_stream_ready_ctx.bctlp = bctlp; - call->saved_receiving_stream_ready_ctx.success = success; + call->saved_receiving_stream_ready_bctlp = bctlp; gpr_mu_unlock(&bctl->call->mu); } } @@ -1091,13 +1097,11 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, } call->has_initial_md_been_received = true; - if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) { + if (call->saved_receiving_stream_ready_bctlp != NULL) { grpc_closure *saved_rsr_closure = grpc_closure_create( - receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp); - grpc_exec_ctx_enqueue( - exec_ctx, saved_rsr_closure, - call->saved_receiving_stream_ready_ctx.success && success, NULL); - call->saved_receiving_stream_ready_ctx.bctlp = NULL; + receiving_stream_ready, call->saved_receiving_stream_ready_bctlp); + call->saved_receiving_stream_ready_bctlp = NULL; + grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, success, NULL); } gpr_mu_unlock(&call->mu); @@ -1133,6 +1137,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; grpc_metadata_batch_filter(md, recv_trailing_filter, call); + call->received_final_op = true; if (call->have_alarm) { grpc_timer_cancel(exec_ctx, &call->alarm); } @@ -1377,11 +1382,11 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_NOT_ON_SERVER; goto done_with_error; } - if (call->received_final_op) { + if (call->requested_final_op) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->received_final_op = 1; + call->requested_final_op = 1; call->buffered_metadata[1] = op->data.recv_status_on_client.trailing_metadata; call->final_op.client.status = op->data.recv_status_on_client.status; @@ -1404,11 +1409,11 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_NOT_ON_CLIENT; goto done_with_error; } - if (call->received_final_op) { + if (call->requested_final_op) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->received_final_op = 1; + call->requested_final_op = 1; call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; bctl->recv_final_op = 1; @@ -1457,7 +1462,7 @@ done_with_error: call->receiving_message = 0; } if (bctl->recv_final_op) { - call->received_final_op = 0; + call->requested_final_op = 0; } gpr_mu_unlock(&call->mu); goto done; diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index c1f6812c4e..80bd95df68 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -99,6 +99,9 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->on_consumed != NULL) { op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 1); } + if (op->send_ping != NULL) { + op->send_ping->cb(exec_ctx, op->send_ping->cb_arg, 0); + } } static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, diff --git a/src/core/lib/surface/validate_metadata.c b/src/core/lib/surface/validate_metadata.c index bf4126867f..84f0a083bc 100644 --- a/src/core/lib/surface/validate_metadata.c +++ b/src/core/lib/surface/validate_metadata.c @@ -40,7 +40,7 @@ static int conforms_to(const char *s, size_t len, const uint8_t *legal_bits) { const char *p = s; const char *e = s + len; for (; p != e; p++) { - int idx = *p; + int idx = (uint8_t)*p; int byte = idx / 8; int bit = idx % 8; if ((legal_bits[byte] & (1 << bit)) == 0) return 0; diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h index e29e8df2c9..713d9e6782 100644 --- a/src/core/lib/transport/metadata.h +++ b/src/core/lib/transport/metadata.h @@ -120,6 +120,7 @@ void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *), void *user_data); /* Reference counting */ +//#define GRPC_METADATA_REFCOUNT_DEBUG #ifdef GRPC_METADATA_REFCOUNT_DEBUG #define GRPC_MDSTR_REF(s) grpc_mdstr_ref((s), __FILE__, __LINE__) #define GRPC_MDSTR_UNREF(s) grpc_mdstr_unref((s), __FILE__, __LINE__) |