aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/client_config/client_channel.c38
-rw-r--r--src/core/ext/client_config/subchannel.c7
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c6
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c14
-rw-r--r--src/core/ext/resolver/zookeeper/zookeeper_resolver.c6
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c4
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c2
-rw-r--r--src/core/lib/channel/compress_filter.c10
-rw-r--r--src/core/lib/http/httpcli.c2
-rw-r--r--src/core/lib/iomgr/closure.c6
-rw-r--r--src/core/lib/iomgr/closure.h3
-rw-r--r--src/core/lib/iomgr/exec_ctx.h2
-rw-r--r--src/core/lib/iomgr/resolve_address.h5
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.c9
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.c9
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c25
-rw-r--r--src/core/lib/iomgr/timer.c15
-rw-r--r--src/core/lib/support/time_posix.c10
-rw-r--r--src/core/lib/surface/call.c95
-rw-r--r--src/core/lib/surface/lame_client.c3
-rw-r--r--src/core/lib/surface/validate_metadata.c2
-rw-r--r--src/core/lib/transport/metadata.h1
22 files changed, 180 insertions, 94 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 93d54fdcfe..8a98a6bcbe 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -205,7 +205,11 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_lock(&chand->mu_config);
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
- if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
+ if (lb_policy != NULL) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
+ NULL);
+ } else if (chand->resolver == NULL /* disconnected */) {
+ grpc_closure_list_fail_all(&chand->waiting_for_config_closures);
grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
NULL);
}
@@ -293,6 +297,11 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
+ if (!chand->started_resolving) {
+ grpc_closure_list_fail_all(&chand->waiting_for_config_closures);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
+ NULL);
+ }
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
chand->lb_policy->interested_parties,
@@ -321,10 +330,10 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
continue_picking_args *cpa = arg;
- if (!success) {
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
- } else if (cpa->connected_subchannel == NULL) {
+ if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
+ } else if (!success) {
+ grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready)) {
@@ -381,14 +390,19 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
&chand->incoming_configuration,
&chand->on_config_changed);
}
- cpa = gpr_malloc(sizeof(*cpa));
- cpa->initial_metadata = initial_metadata;
- cpa->initial_metadata_flags = initial_metadata_flags;
- cpa->connected_subchannel = connected_subchannel;
- cpa->on_ready = on_ready;
- cpa->elem = elem;
- grpc_closure_init(&cpa->closure, continue_picking, cpa);
- grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, 1);
+ if (chand->resolver != NULL) {
+ cpa = gpr_malloc(sizeof(*cpa));
+ cpa->initial_metadata = initial_metadata;
+ cpa->initial_metadata_flags = initial_metadata_flags;
+ cpa->connected_subchannel = connected_subchannel;
+ cpa->on_ready = on_ready;
+ cpa->elem = elem;
+ grpc_closure_init(&cpa->closure, continue_picking, cpa);
+ grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure,
+ 1);
+ } else {
+ grpc_exec_ctx_enqueue(exec_ctx, on_ready, false, NULL);
+ }
gpr_mu_unlock(&chand->mu_config);
return 0;
}
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 125a291f21..c925c28c67 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -135,8 +135,6 @@ struct grpc_subchannel {
int have_alarm;
/** our alarm */
grpc_timer alarm;
- /** current random value */
- uint32_t random;
};
struct grpc_subchannel_call {
@@ -297,10 +295,6 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
}
}
-static uint32_t random_seed() {
- return (uint32_t)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC)));
-}
-
grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_connector *connector,
grpc_subchannel_args *args) {
@@ -332,7 +326,6 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_set_initial_connect_string(&c->addr, &c->addr_len,
&c->initial_connect_string);
c->args = grpc_channel_args_copy(args->args);
- c->random = random_seed();
c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
&c->root_external_state_watcher;
grpc_closure_init(&c->connected, subchannel_connected, c);
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index 3db462b246..9918fbdcb4 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -252,9 +252,9 @@ char *grpc_subchannel_call_holder_get_peer(
grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) {
grpc_subchannel_call *subchannel_call = GET_CALL(holder);
- if (subchannel_call) {
- return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
- } else {
+ if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) {
return NULL;
+ } else {
+ return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
}
}
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 2749b0ca01..620ba4e2aa 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -86,7 +86,8 @@ typedef struct {
static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
-static void dns_start_resolving_locked(dns_resolver *r);
+static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
+ dns_resolver *r);
static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
dns_resolver *r);
@@ -119,7 +120,7 @@ static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&r->mu);
if (!r->resolving) {
gpr_backoff_reset(&r->backoff_state);
- dns_start_resolving_locked(r);
+ dns_start_resolving_locked(exec_ctx, r);
}
gpr_mu_unlock(&r->mu);
}
@@ -134,7 +135,7 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
r->target_config = target_config;
if (r->resolved_version == 0 && !r->resolving) {
gpr_backoff_reset(&r->backoff_state);
- dns_start_resolving_locked(r);
+ dns_start_resolving_locked(exec_ctx, r);
} else {
dns_maybe_finish_next_locked(exec_ctx, r);
}
@@ -149,7 +150,7 @@ static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
r->have_retry_timer = false;
if (success) {
if (!r->resolving) {
- dns_start_resolving_locked(r);
+ dns_start_resolving_locked(exec_ctx, r);
}
}
gpr_mu_unlock(&r->mu);
@@ -201,11 +202,12 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving");
}
-static void dns_start_resolving_locked(dns_resolver *r) {
+static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
+ dns_resolver *r) {
GRPC_RESOLVER_REF(&r->base, "dns-resolving");
GPR_ASSERT(!r->resolving);
r->resolving = 1;
- grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r);
+ grpc_resolve_address(exec_ctx, r->name, r->default_port, dns_on_resolved, r);
}
static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
index 898632c3cd..deb4b9b1ef 100644
--- a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
+++ b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
@@ -299,7 +299,7 @@ static void zookeeper_get_children_node_completion(int rc, const char *value,
address = zookeeper_parse_address(value, (size_t)value_len);
if (address != NULL) {
/** Further resolves address by DNS */
- grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
+ grpc_resolve_address(&exec_ctx, address, NULL, zookeeper_dns_resolved, r);
gpr_free(address);
} else {
gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name);
@@ -375,8 +375,10 @@ static void zookeeper_get_node_completion(int rc, const char *value,
r->resolved_addrs->naddrs = 0;
r->resolved_total = 1;
/** Further resolves address by DNS */
- grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_resolve_address(&exec_ctx, address, NULL, zookeeper_dns_resolved, r);
gpr_free(address);
+ grpc_exec_ctx_finish(&exec_ctx);
return;
}
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 0ed115793b..c5d3d8d9cc 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -235,5 +235,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
grpc_exec_ctx_finish(&exec_ctx);
- return channel; /* may be NULL */
+ return channel != NULL ? channel : grpc_lame_client_channel_create(
+ target, GRPC_STATUS_INTERNAL,
+ "Failed to create client channel");
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index 93c3e6d8b4..687936bfd3 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -639,7 +639,7 @@ static int on_hdr(grpc_chttp2_hpack_parser *p, grpc_mdelem *md,
}
}
if (p->on_header == NULL) {
- grpc_mdelem_unref(md);
+ GRPC_MDELEM_UNREF(md);
return 0;
}
p->on_header(p->on_header_user_data, md);
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 229fdb5ef6..3d42d0e616 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -268,8 +268,14 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
channeld->default_compression_algorithm =
grpc_channel_args_get_compression_algorithm(args->channel_args);
/* Make sure the default isn't disabled. */
- GPR_ASSERT(grpc_compression_options_is_algorithm_enabled(
- &channeld->compression_options, channeld->default_compression_algorithm));
+ if (!grpc_compression_options_is_algorithm_enabled(
+ &channeld->compression_options,
+ channeld->default_compression_algorithm)) {
+ gpr_log(GPR_DEBUG,
+ "compression algorithm %d not enabled: switching to none",
+ channeld->default_compression_algorithm);
+ channeld->default_compression_algorithm = GRPC_COMPRESS_NONE;
+ }
channeld->compression_options.default_compression_algorithm =
channeld->default_compression_algorithm;
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c
index 76bd1b64dc..f22721ac8f 100644
--- a/src/core/lib/http/httpcli.c
+++ b/src/core/lib/http/httpcli.c
@@ -246,7 +246,7 @@ static void internal_request_begin(
grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set,
req->pollset);
- grpc_resolve_address(request->host, req->handshaker->default_port,
+ grpc_resolve_address(exec_ctx, request->host, req->handshaker->default_port,
on_resolved, req);
}
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index d6f073fc9d..27793c32e4 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -54,6 +54,12 @@ void grpc_closure_list_add(grpc_closure_list *closure_list,
closure_list->tail = closure;
}
+void grpc_closure_list_fail_all(grpc_closure_list *list) {
+ for (grpc_closure *c = list->head; c != NULL; c = grpc_closure_next(c)) {
+ c->final_data &= ~(uintptr_t)1;
+ }
+}
+
bool grpc_closure_list_empty(grpc_closure_list closure_list) {
return closure_list.head == NULL;
}
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 8652b53a8b..fdc2daed9d 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -86,6 +86,9 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure,
bool success);
+/** force all success bits in \a list to false */
+void grpc_closure_list_fail_all(grpc_closure_list *list);
+
/** append all closures from \a src to \a dst and empty \a src. */
void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index e09ef02400..976cc40347 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -93,6 +93,8 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
grpc_workqueue *offload_target_or_null);
void grpc_exec_ctx_global_init(void);
+
+void grpc_exec_ctx_global_init(void);
void grpc_exec_ctx_global_shutdown(void);
#endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */
diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h
index ecc06340a3..ef198fe0f6 100644
--- a/src/core/lib/iomgr/resolve_address.h
+++ b/src/core/lib/iomgr/resolve_address.h
@@ -59,8 +59,9 @@ typedef void (*grpc_resolve_cb)(grpc_exec_ctx *exec_ctx, void *arg,
/* Asynchronously resolve addr. Use default_port if a port isn't designated
in addr, otherwise use the port in addr. */
/* TODO(ctiller): add a timeout here */
-void grpc_resolve_address(const char *addr, const char *default_port,
- grpc_resolve_cb cb, void *arg);
+extern void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *addr,
+ const char *default_port,
+ grpc_resolve_cb cb, void *arg);
/* Destroy resolved addresses */
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses);
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index b9d3bbdb89..cae91eec20 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -164,8 +164,9 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
gpr_free(addrs);
}
-void grpc_resolve_address(const char *name, const char *default_port,
- grpc_resolve_cb cb, void *arg) {
+static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port, grpc_resolve_cb cb,
+ void *arg) {
request *r = gpr_malloc(sizeof(request));
grpc_closure_init(&r->request_closure, do_request_thread, r);
r->name = gpr_strdup(name);
@@ -175,4 +176,8 @@ void grpc_resolve_address(const char *name, const char *default_port,
grpc_executor_enqueue(&r->request_closure, 1);
}
+void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port, grpc_resolve_cb cb,
+ void *arg) = resolve_address_impl;
+
#endif
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index 82763d11f4..914736234d 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -155,8 +155,9 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
gpr_free(addrs);
}
-void grpc_resolve_address(const char *name, const char *default_port,
- grpc_resolve_cb cb, void *arg) {
+static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port, grpc_resolve_cb cb,
+ void *arg) {
request *r = gpr_malloc(sizeof(request));
grpc_closure_init(&r->request_closure, do_request_thread, r);
r->name = gpr_strdup(name);
@@ -166,4 +167,8 @@ void grpc_resolve_address(const char *name, const char *default_port,
grpc_executor_enqueue(&r->request_closure, 1);
}
+void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port, grpc_resolve_cb cb,
+ void *arg) = resolve_address_impl;
+
#endif
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 6430cb629f..e93d5734a0 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -211,11 +211,11 @@ finish:
grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL, NULL);
}
-void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_endpoint **ep,
- grpc_pollset_set *interested_parties,
- const struct sockaddr *addr, size_t addr_len,
- gpr_timespec deadline) {
+static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_endpoint **ep,
+ grpc_pollset_set *interested_parties,
+ const struct sockaddr *addr,
+ size_t addr_len, gpr_timespec deadline) {
int fd;
grpc_dualstack_mode dsmode;
int err;
@@ -303,4 +303,19 @@ done:
gpr_free(addr_str);
}
+// overridden by api_fuzzer.c
+void (*grpc_tcp_client_connect_impl)(
+ grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep,
+ grpc_pollset_set *interested_parties, const struct sockaddr *addr,
+ size_t addr_len, gpr_timespec deadline) = tcp_client_connect_impl;
+
+void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_endpoint **ep,
+ grpc_pollset_set *interested_parties,
+ const struct sockaddr *addr, size_t addr_len,
+ gpr_timespec deadline) {
+ grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr,
+ addr_len, deadline);
+}
+
#endif
diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer.c
index 713f15b69e..acb5b26c87 100644
--- a/src/core/lib/iomgr/timer.c
+++ b/src/core/lib/iomgr/timer.c
@@ -70,6 +70,7 @@ static gpr_clock_type g_clock_type;
static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
static shard_type *g_shard_queue[NUM_SHARDS];
+static bool g_initialized = false;
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next, int success);
@@ -83,6 +84,7 @@ static gpr_timespec compute_min_deadline(shard_type *shard) {
void grpc_timer_list_init(gpr_timespec now) {
uint32_t i;
+ g_initialized = true;
gpr_mu_init(&g_mu);
gpr_mu_init(&g_checker_mu);
g_clock_type = now.clock_type;
@@ -111,6 +113,7 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
}
gpr_mu_destroy(&g_mu);
gpr_mu_destroy(&g_checker_mu);
+ g_initialized = false;
}
/* This is a cheap, but good enough, pointer hash for sharding the tasks: */
@@ -180,6 +183,18 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
timer->deadline = deadline;
timer->triggered = 0;
+ if (!g_initialized) {
+ timer->triggered = 1;
+ grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL);
+ return;
+ }
+
+ if (gpr_time_cmp(deadline, now) <= 0) {
+ timer->triggered = 1;
+ grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, true, NULL);
+ return;
+ }
+
/* TODO(ctiller): check deadline expired */
gpr_mu_lock(&shard->mu);
diff --git a/src/core/lib/support/time_posix.c b/src/core/lib/support/time_posix.c
index f5f62dadc6..11542072fe 100644
--- a/src/core/lib/support/time_posix.c
+++ b/src/core/lib/support/time_posix.c
@@ -78,7 +78,7 @@ static const clockid_t clockid_for_gpr_clock[] = {CLOCK_MONOTONIC,
void gpr_time_init(void) { gpr_precise_clock_init(); }
-gpr_timespec gpr_now(gpr_clock_type clock_type) {
+static gpr_timespec now_impl(gpr_clock_type clock_type) {
struct timespec now;
GPR_ASSERT(clock_type != GPR_TIMESPAN);
if (clock_type == GPR_CLOCK_PRECISE) {
@@ -114,7 +114,7 @@ void gpr_time_init(void) {
g_time_start = mach_absolute_time();
}
-gpr_timespec gpr_now(gpr_clock_type clock) {
+static gpr_timespec now_impl(gpr_clock_type clock) {
gpr_timespec now;
struct timeval now_tv;
double now_dbl;
@@ -142,6 +142,12 @@ gpr_timespec gpr_now(gpr_clock_type clock) {
}
#endif
+gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type) = now_impl;
+
+gpr_timespec gpr_now(gpr_clock_type clock_type) {
+ return gpr_now_impl(clock_type);
+}
+
void gpr_sleep_until(gpr_timespec until) {
gpr_timespec now;
gpr_timespec delta;
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 6581bbd3d1..fa12b6ea61 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -142,22 +142,23 @@ struct grpc_call {
gpr_mu mu;
/* client or server call */
- uint8_t is_client;
+ bool is_client;
/* is the alarm set */
- uint8_t have_alarm;
+ bool have_alarm;
/** has grpc_call_destroy been called */
- uint8_t destroy_called;
+ bool destroy_called;
/** flag indicating that cancellation is inherited */
- uint8_t cancellation_is_inherited;
+ bool cancellation_is_inherited;
/** bitmask of live batches */
uint8_t used_batches;
/** which ops are in-flight */
- uint8_t sent_initial_metadata;
- uint8_t sending_message;
- uint8_t sent_final_op;
- uint8_t received_initial_metadata;
- uint8_t receiving_message;
- uint8_t received_final_op;
+ bool sent_initial_metadata;
+ bool sending_message;
+ bool sent_final_op;
+ bool received_initial_metadata;
+ bool receiving_message;
+ bool requested_final_op;
+ bool received_final_op;
/* have we received initial metadata */
bool has_initial_md_been_received;
@@ -220,10 +221,7 @@ struct grpc_call {
} server;
} final_op;
- struct {
- void *bctlp;
- bool success;
- } saved_receiving_stream_ready_ctx;
+ void *saved_receiving_stream_ready_bctlp;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -554,21 +552,6 @@ static int prepare_application_metadata(grpc_call *call, int count,
int i;
grpc_metadata_batch *batch =
&call->metadata_batch[0 /* is_receiving */][is_trailing];
- if (prepend_extra_metadata) {
- if (call->send_extra_metadata_count == 0) {
- prepend_extra_metadata = 0;
- } else {
- for (i = 0; i < call->send_extra_metadata_count; i++) {
- GRPC_MDELEM_REF(call->send_extra_metadata[i].md);
- }
- for (i = 1; i < call->send_extra_metadata_count; i++) {
- call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1];
- }
- for (i = 0; i < call->send_extra_metadata_count - 1; i++) {
- call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1];
- }
- }
- }
for (i = 0; i < count; i++) {
grpc_metadata *md = &metadata[i];
grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
@@ -579,14 +562,37 @@ static int prepare_application_metadata(grpc_call *call, int count,
GRPC_MDSTR_LENGTH(l->md->key))) {
gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s",
grpc_mdstr_as_c_string(l->md->key));
- return 0;
+ break;
} else if (!grpc_is_binary_header(grpc_mdstr_as_c_string(l->md->key),
GRPC_MDSTR_LENGTH(l->md->key)) &&
!grpc_header_nonbin_value_is_legal(
grpc_mdstr_as_c_string(l->md->value),
GRPC_MDSTR_LENGTH(l->md->value))) {
gpr_log(GPR_ERROR, "attempt to send invalid metadata value");
- return 0;
+ break;
+ }
+ }
+ if (i != count) {
+ for (int j = 0; j <= i; j++) {
+ grpc_metadata *md = &metadata[j];
+ grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ GRPC_MDELEM_UNREF(l->md);
+ }
+ return 0;
+ }
+ if (prepend_extra_metadata) {
+ if (call->send_extra_metadata_count == 0) {
+ prepend_extra_metadata = 0;
+ } else {
+ for (i = 0; i < call->send_extra_metadata_count; i++) {
+ GRPC_MDELEM_REF(call->send_extra_metadata[i].md);
+ }
+ for (i = 1; i < call->send_extra_metadata_count; i++) {
+ call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1];
+ }
+ for (i = 0; i < call->send_extra_metadata_count - 1; i++) {
+ call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1];
+ }
}
}
for (i = 1; i < count; i++) {
@@ -1057,12 +1063,12 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_call *call = bctl->call;
gpr_mu_lock(&bctl->call->mu);
- if (bctl->call->has_initial_md_been_received) {
+ if (bctl->call->has_initial_md_been_received || !success ||
+ call->receiving_stream == NULL) {
gpr_mu_unlock(&bctl->call->mu);
process_data_after_md(exec_ctx, bctlp, success);
} else {
- call->saved_receiving_stream_ready_ctx.bctlp = bctlp;
- call->saved_receiving_stream_ready_ctx.success = success;
+ call->saved_receiving_stream_ready_bctlp = bctlp;
gpr_mu_unlock(&bctl->call->mu);
}
}
@@ -1091,13 +1097,11 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
}
call->has_initial_md_been_received = true;
- if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) {
+ if (call->saved_receiving_stream_ready_bctlp != NULL) {
grpc_closure *saved_rsr_closure = grpc_closure_create(
- receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp);
- grpc_exec_ctx_enqueue(
- exec_ctx, saved_rsr_closure,
- call->saved_receiving_stream_ready_ctx.success && success, NULL);
- call->saved_receiving_stream_ready_ctx.bctlp = NULL;
+ receiving_stream_ready, call->saved_receiving_stream_ready_bctlp);
+ call->saved_receiving_stream_ready_bctlp = NULL;
+ grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, success, NULL);
}
gpr_mu_unlock(&call->mu);
@@ -1133,6 +1137,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
grpc_metadata_batch_filter(md, recv_trailing_filter, call);
+ call->received_final_op = true;
if (call->have_alarm) {
grpc_timer_cancel(exec_ctx, &call->alarm);
}
@@ -1377,11 +1382,11 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_NOT_ON_SERVER;
goto done_with_error;
}
- if (call->received_final_op) {
+ if (call->requested_final_op) {
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- call->received_final_op = 1;
+ call->requested_final_op = 1;
call->buffered_metadata[1] =
op->data.recv_status_on_client.trailing_metadata;
call->final_op.client.status = op->data.recv_status_on_client.status;
@@ -1404,11 +1409,11 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
goto done_with_error;
}
- if (call->received_final_op) {
+ if (call->requested_final_op) {
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- call->received_final_op = 1;
+ call->requested_final_op = 1;
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
bctl->recv_final_op = 1;
@@ -1457,7 +1462,7 @@ done_with_error:
call->receiving_message = 0;
}
if (bctl->recv_final_op) {
- call->received_final_op = 0;
+ call->requested_final_op = 0;
}
gpr_mu_unlock(&call->mu);
goto done;
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index c1f6812c4e..80bd95df68 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -99,6 +99,9 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
if (op->on_consumed != NULL) {
op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 1);
}
+ if (op->send_ping != NULL) {
+ op->send_ping->cb(exec_ctx, op->send_ping->cb_arg, 0);
+ }
}
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
diff --git a/src/core/lib/surface/validate_metadata.c b/src/core/lib/surface/validate_metadata.c
index bf4126867f..84f0a083bc 100644
--- a/src/core/lib/surface/validate_metadata.c
+++ b/src/core/lib/surface/validate_metadata.c
@@ -40,7 +40,7 @@ static int conforms_to(const char *s, size_t len, const uint8_t *legal_bits) {
const char *p = s;
const char *e = s + len;
for (; p != e; p++) {
- int idx = *p;
+ int idx = (uint8_t)*p;
int byte = idx / 8;
int bit = idx % 8;
if ((legal_bits[byte] & (1 << bit)) == 0) return 0;
diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h
index e29e8df2c9..713d9e6782 100644
--- a/src/core/lib/transport/metadata.h
+++ b/src/core/lib/transport/metadata.h
@@ -120,6 +120,7 @@ void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *),
void *user_data);
/* Reference counting */
+//#define GRPC_METADATA_REFCOUNT_DEBUG
#ifdef GRPC_METADATA_REFCOUNT_DEBUG
#define GRPC_MDSTR_REF(s) grpc_mdstr_ref((s), __FILE__, __LINE__)
#define GRPC_MDSTR_UNREF(s) grpc_mdstr_unref((s), __FILE__, __LINE__)