aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/client_config/client_channel.c65
-rw-r--r--src/core/ext/client_config/client_config_plugin.c6
-rw-r--r--src/core/ext/client_config/lb_policy.c10
-rw-r--r--src/core/ext/client_config/lb_policy.h10
-rw-r--r--src/core/ext/client_config/resolver_registry.c45
-rw-r--r--src/core/ext/client_config/resolver_registry.h5
-rw-r--r--src/core/ext/client_config/subchannel.c61
-rw-r--r--src/core/ext/client_config/subchannel.h3
-rw-r--r--src/core/ext/client_config/subchannel_factory.c49
-rw-r--r--src/core/ext/client_config/subchannel_factory.h66
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c37
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c20
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c20
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c10
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c19
-rw-r--r--src/core/lib/channel/channel_stack.c32
-rw-r--r--src/core/lib/channel/channel_stack.h18
-rw-r--r--src/core/lib/channel/deadline_filter.c302
-rw-r--r--src/core/lib/channel/deadline_filter.h79
-rw-r--r--src/core/lib/channel/http_client_filter.c4
-rw-r--r--src/core/lib/channel/message_size_filter.c46
-rw-r--r--src/core/lib/iomgr/error.c58
-rw-r--r--src/core/lib/iomgr/error.h8
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c2
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c96
-rw-r--r--src/core/lib/iomgr/tcp_windows.c1
-rw-r--r--src/core/lib/surface/call.c143
-rw-r--r--src/core/lib/surface/completion_queue.h2
-rw-r--r--src/core/lib/surface/init.c7
-rw-r--r--src/core/lib/surface/server.c6
-rw-r--r--src/core/lib/surface/server.h5
-rw-r--r--src/core/lib/transport/transport.c6
32 files changed, 820 insertions, 421 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index b2b4fea83c..feb4cbde7b 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -46,6 +46,7 @@
#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
@@ -114,7 +115,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_cancel_picks(
exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
- /* check= */ 0);
+ /* check= */ 0, GRPC_ERROR_REF(error));
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
@@ -391,6 +392,17 @@ typedef enum {
for initial metadata before trying to create a call object,
and handling cancellation gracefully. */
typedef struct client_channel_call_data {
+ // State for handling deadlines.
+ // The code in deadline_filter.c requires this to be the first field.
+ // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
+ // and this struct both independently store a pointer to the call
+ // stack and each has its own mutex. If/when we have time, find a way
+ // to avoid this without breaking the grpc_deadline_state abstraction.
+ grpc_deadline_state deadline_state;
+ gpr_timespec deadline;
+
+ grpc_error *cancel_error;
+
/** either 0 for no call, 1 for cancelled, or a pointer to a
grpc_subchannel_call */
gpr_atm subchannel_call;
@@ -485,7 +497,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING(
"Failed to create subchannel", &error, 1));
- } else if (1 == gpr_atm_acq_load(&calld->subchannel_call)) {
+ } else if (GET_CALL(calld) == CANCELLED_CALL) {
/* already cancelled before subchannel became ready */
fail_locked(exec_ctx, calld,
GRPC_ERROR_CREATE_REFERENCING(
@@ -493,7 +505,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
} else {
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *new_error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent,
+ exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
&subchannel_call);
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
@@ -531,7 +543,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
- grpc_closure *on_ready);
+ grpc_closure *on_ready, grpc_error *error);
static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
@@ -542,7 +554,8 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
} else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
- cpa->connected_subchannel, cpa->on_ready)) {
+ cpa->connected_subchannel, cpa->on_ready,
+ GRPC_ERROR_NONE)) {
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
}
gpr_free(cpa);
@@ -552,7 +565,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
- grpc_closure *on_ready) {
+ grpc_closure *on_ready, grpc_error *error) {
GPR_TIMER_BEGIN("pick_subchannel", 0);
channel_data *chand = elem->channel_data;
@@ -566,21 +579,24 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
if (initial_metadata == NULL) {
if (chand->lb_policy != NULL) {
grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
- connected_subchannel);
+ connected_subchannel, GRPC_ERROR_REF(error));
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
closure = closure->next_data.next) {
cpa = closure->cb_arg;
if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL;
- grpc_exec_ctx_sched(exec_ctx, cpa->on_ready,
- GRPC_ERROR_CREATE("Pick cancelled"), NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, cpa->on_ready,
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
}
}
gpr_mu_unlock(&chand->mu);
GPR_TIMER_END("pick_subchannel", 0);
+ GRPC_ERROR_UNREF(error);
return true;
}
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
if (chand->lb_policy != NULL) {
grpc_lb_policy *lb_policy = chand->lb_policy;
int r;
@@ -631,12 +647,13 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
/* try to (atomically) get the call */
grpc_subchannel_call *call = GET_CALL(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
if (call == CANCELLED_CALL) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
- GRPC_ERROR_CANCELLED);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@@ -652,8 +669,8 @@ retry:
call = GET_CALL(calld);
if (call == CANCELLED_CALL) {
gpr_mu_unlock(&calld->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
- GRPC_ERROR_CANCELLED);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@@ -669,18 +686,24 @@ retry:
(gpr_atm)(uintptr_t)CANCELLED_CALL)) {
goto retry;
} else {
+ // Stash a copy of cancel_error in our call data, so that we can use
+ // it for subsequent operations. This ensures that if the call is
+ // cancelled before any ops are passed down (e.g., if the deadline
+ // is in the past when the call starts), we can return the right
+ // error to the caller when the first op does get passed down.
+ calld->cancel_error = GRPC_ERROR_REF(op->cancel_error);
switch (calld->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel,
- NULL);
+ NULL, GRPC_ERROR_REF(op->cancel_error));
break;
}
gpr_mu_unlock(&calld->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
- GRPC_ERROR_CANCELLED);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(op->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@@ -694,7 +717,8 @@ retry:
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
op->send_initial_metadata_flags,
- &calld->connected_subchannel, &calld->next_step)) {
+ &calld->connected_subchannel, &calld->next_step,
+ GRPC_ERROR_NONE)) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
}
@@ -704,7 +728,7 @@ retry:
calld->connected_subchannel != NULL) {
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent,
+ exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
&subchannel_call);
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
@@ -727,6 +751,9 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
+ grpc_deadline_state_init(exec_ctx, elem, args);
+ calld->deadline = args->deadline;
+ calld->cancel_error = GRPC_ERROR_NONE;
gpr_atm_rel_store(&calld->subchannel_call, 0);
gpr_mu_init(&calld->mu);
calld->connected_subchannel = NULL;
@@ -745,6 +772,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_final_info *final_info,
void *and_free_memory) {
call_data *calld = elem->call_data;
+ grpc_deadline_state_destroy(exec_ctx, elem);
+ GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
diff --git a/src/core/ext/client_config/client_config_plugin.c b/src/core/ext/client_config/client_config_plugin.c
index 5e31613420..dc3629d383 100644
--- a/src/core/ext/client_config/client_config_plugin.c
+++ b/src/core/ext/client_config/client_config_plugin.c
@@ -43,10 +43,6 @@
#include "src/core/ext/client_config/subchannel_index.h"
#include "src/core/lib/surface/channel_init.h"
-#ifndef GRPC_DEFAULT_NAME_PREFIX
-#define GRPC_DEFAULT_NAME_PREFIX "dns:///"
-#endif
-
static bool append_filter(grpc_channel_stack_builder *builder, void *arg) {
return grpc_channel_stack_builder_append_filter(
builder, (const grpc_channel_filter *)arg, NULL, NULL);
@@ -79,7 +75,7 @@ static bool set_default_host_if_unset(grpc_channel_stack_builder *builder,
void grpc_client_config_init(void) {
grpc_lb_policy_registry_init();
- grpc_resolver_registry_init(GRPC_DEFAULT_NAME_PREFIX);
+ grpc_resolver_registry_init();
grpc_subchannel_index_init();
grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MIN,
set_default_host_if_unset, NULL);
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index 903563ef6b..46391272a6 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -108,16 +108,18 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_connected_subchannel **target) {
- policy->vtable->cancel_pick(exec_ctx, policy, target);
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
+ policy->vtable->cancel_pick(exec_ctx, policy, target, error);
}
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq) {
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
- initial_metadata_flags_eq);
+ initial_metadata_flags_eq, error);
}
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index 37c93d707c..7fb3e08cb3 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -77,12 +77,12 @@ struct grpc_lb_policy_vtable {
/** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_connected_subchannel **target);
+ grpc_connected_subchannel **target, grpc_error *error);
/** \see grpc_lb_policy_cancel_picks */
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq);
+ uint32_t initial_metadata_flags_eq, grpc_error *error);
/** \see grpc_lb_policy_ping_one */
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
@@ -161,7 +161,8 @@ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
The \a on_complete callback of the pending picks will be invoked with \a
*target set to NULL. */
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_connected_subchannel **target);
+ grpc_connected_subchannel **target,
+ grpc_error *error);
/** Cancel all pending picks for which their \a initial_metadata_flags (as given
in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
@@ -169,7 +170,8 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq);
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error);
/** Try to enter a READY connectivity state */
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
diff --git a/src/core/ext/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c
index b5308a140c..bd5c683878 100644
--- a/src/core/ext/client_config/resolver_registry.c
+++ b/src/core/ext/client_config/resolver_registry.c
@@ -40,22 +40,20 @@
#include <grpc/support/string_util.h>
#define MAX_RESOLVERS 10
+#define DEFAULT_RESOLVER_PREFIX_MAX_LENGTH 32
static grpc_resolver_factory *g_all_of_the_resolvers[MAX_RESOLVERS];
static int g_number_of_resolvers = 0;
-static char *g_default_resolver_prefix;
+static char g_default_resolver_prefix[DEFAULT_RESOLVER_PREFIX_MAX_LENGTH] =
+ "dns:///";
-void grpc_resolver_registry_init(const char *default_resolver_prefix) {
- g_default_resolver_prefix = gpr_strdup(default_resolver_prefix);
-}
+void grpc_resolver_registry_init() {}
void grpc_resolver_registry_shutdown(void) {
- int i;
- for (i = 0; i < g_number_of_resolvers; i++) {
+ for (int i = 0; i < g_number_of_resolvers; i++) {
grpc_resolver_factory_unref(g_all_of_the_resolvers[i]);
}
- gpr_free(g_default_resolver_prefix);
// FIXME(ctiller): this should live in grpc_resolver_registry_init,
// however that would have the client_config plugin call this AFTER we start
// registering resolvers from third party plugins, and so they'd never show
@@ -65,6 +63,17 @@ void grpc_resolver_registry_shutdown(void) {
g_number_of_resolvers = 0;
}
+void grpc_resolver_registry_set_default_prefix(
+ const char *default_resolver_prefix) {
+ const size_t len = strlen(default_resolver_prefix);
+ GPR_ASSERT(len < DEFAULT_RESOLVER_PREFIX_MAX_LENGTH &&
+ "default resolver prefix too long");
+ GPR_ASSERT(len > 0 && "default resolver prefix can't be empty");
+ // By the previous assert, default_resolver_prefix is safe to be copied with a
+ // plain strcpy.
+ strcpy(g_default_resolver_prefix, default_resolver_prefix);
+}
+
void grpc_register_resolver_type(grpc_resolver_factory *factory) {
int i;
for (i = 0; i < g_number_of_resolvers; i++) {
@@ -108,22 +117,16 @@ static grpc_resolver_factory *resolve_factory(const char *target,
*uri = grpc_uri_parse(target, 1);
factory = lookup_factory_by_uri(*uri);
if (factory == NULL) {
- if (g_default_resolver_prefix != NULL) {
- grpc_uri_destroy(*uri);
- gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target);
- *uri = grpc_uri_parse(tmp, 1);
- factory = lookup_factory_by_uri(*uri);
- if (factory == NULL) {
- grpc_uri_destroy(grpc_uri_parse(target, 0));
- grpc_uri_destroy(grpc_uri_parse(tmp, 0));
- gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target,
- tmp);
- }
- gpr_free(tmp);
- } else {
+ grpc_uri_destroy(*uri);
+ gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target);
+ *uri = grpc_uri_parse(tmp, 1);
+ factory = lookup_factory_by_uri(*uri);
+ if (factory == NULL) {
grpc_uri_destroy(grpc_uri_parse(target, 0));
- gpr_log(GPR_ERROR, "don't know how to resolve '%s'", target);
+ grpc_uri_destroy(grpc_uri_parse(tmp, 0));
+ gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, tmp);
}
+ gpr_free(tmp);
}
return factory;
}
diff --git a/src/core/ext/client_config/resolver_registry.h b/src/core/ext/client_config/resolver_registry.h
index 92e248d548..4c6279b978 100644
--- a/src/core/ext/client_config/resolver_registry.h
+++ b/src/core/ext/client_config/resolver_registry.h
@@ -36,9 +36,12 @@
#include "src/core/ext/client_config/resolver_factory.h"
-void grpc_resolver_registry_init(const char *default_prefix);
+void grpc_resolver_registry_init();
void grpc_resolver_registry_shutdown(void);
+/** Set the default URI prefix to \a default_prefix. */
+void grpc_resolver_registry_set_default_prefix(const char *default_prefix);
+
/** Register a resolver type.
URI's of \a scheme will be resolved with the given resolver.
If \a priority is greater than zero, then the resolver will be eligible
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 6818e92404..e3c1b86a14 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -219,8 +219,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
: gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val,
- old_val + delta, reason);
+ "SUBCHANNEL: %p %12s 0x%08d -> 0x%08d [%s]", c, purpose, (int)old_val,
+ (int)(old_val + delta), reason);
#endif
return old_val;
}
@@ -330,36 +330,40 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&c->connected, subchannel_connected, c);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
- gpr_backoff_init(&c->backoff_state,
- GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
- GRPC_SUBCHANNEL_RECONNECT_JITTER,
- GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
- GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ int initial_backoff_ms =
+ GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
+ int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
+ bool fixed_reconnect_backoff = false;
if (c->args) {
for (size_t i = 0; i < c->args->num_args; i++) {
if (0 == strcmp(c->args->args[i].key,
"grpc.testing.fixed_reconnect_backoff")) {
GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER);
- gpr_backoff_init(&c->backoff_state, 1.0, 0.0,
- c->args->args[i].value.integer,
- c->args->args[i].value.integer);
- }
- if (0 ==
- strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
- const grpc_integer_options options = {-1, 0, INT_MAX};
- const int value =
- grpc_channel_arg_get_integer(&c->args->args[i], options);
- if (value >= 0) {
- gpr_backoff_init(
- &c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
- GRPC_SUBCHANNEL_RECONNECT_JITTER,
- GPR_MIN(value,
- GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000),
- value);
- }
+ fixed_reconnect_backoff = true;
+ initial_backoff_ms = max_backoff_ms = grpc_channel_arg_get_integer(
+ &c->args->args[i],
+ (grpc_integer_options){initial_backoff_ms, 100, INT_MAX});
+ } else if (0 == strcmp(c->args->args[i].key,
+ GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ max_backoff_ms = grpc_channel_arg_get_integer(
+ &c->args->args[i],
+ (grpc_integer_options){max_backoff_ms, 100, INT_MAX});
+ } else if (0 == strcmp(c->args->args[i].key,
+ GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ initial_backoff_ms = grpc_channel_arg_get_integer(
+ &c->args->args[i],
+ (grpc_integer_options){initial_backoff_ms, 100, INT_MAX});
}
}
}
+ gpr_backoff_init(
+ &c->backoff_state,
+ fixed_reconnect_backoff ? 1.0
+ : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
+ fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER,
+ initial_backoff_ms, max_backoff_ms);
gpr_mu_init(&c->mu);
return grpc_subchannel_index_register(exec_ctx, key, c);
@@ -626,7 +630,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
c->have_alarm = 1;
grpc_connectivity_state_set(
exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
"connect_failed");
gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
const char *errmsg = grpc_error_string(error);
@@ -695,14 +701,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_polling_entity *pollent, grpc_subchannel_call **call) {
+ grpc_polling_entity *pollent, gpr_timespec deadline,
+ grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
*call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = con; // Ref is added below.
grpc_error *error =
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
- NULL, NULL, callstk);
+ NULL, NULL, deadline, callstk);
if (error != GRPC_ERROR_NONE) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);
diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h
index e4805b6722..5b1910213c 100644
--- a/src/core/ext/client_config/subchannel.h
+++ b/src/core/ext/client_config/subchannel.h
@@ -110,7 +110,8 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
/** construct a subchannel call */
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
- grpc_polling_entity *pollent, grpc_subchannel_call **subchannel_call);
+ grpc_polling_entity *pollent, gpr_timespec deadline,
+ grpc_subchannel_call **subchannel_call);
/** process a transport level op */
void grpc_connected_subchannel_process_transport_op(
diff --git a/src/core/ext/client_config/subchannel_factory.c b/src/core/ext/client_config/subchannel_factory.c
deleted file mode 100644
index d1e4d75a02..0000000000
--- a/src/core/ext/client_config/subchannel_factory.c
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/ext/client_config/subchannel_factory.h"
-
-void grpc_subchannel_factory_ref(grpc_subchannel_factory* factory) {
- factory->vtable->ref(factory);
-}
-
-void grpc_subchannel_factory_unref(grpc_exec_ctx* exec_ctx,
- grpc_subchannel_factory* factory) {
- factory->vtable->unref(exec_ctx, factory);
-}
-
-grpc_subchannel* grpc_subchannel_factory_create_subchannel(
- grpc_exec_ctx* exec_ctx, grpc_subchannel_factory* factory,
- grpc_subchannel_args* args) {
- return factory->vtable->create_subchannel(exec_ctx, factory, args);
-}
diff --git a/src/core/ext/client_config/subchannel_factory.h b/src/core/ext/client_config/subchannel_factory.h
deleted file mode 100644
index 0fb806d081..0000000000
--- a/src/core/ext/client_config/subchannel_factory.h
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
-#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
-
-#include "src/core/ext/client_config/subchannel.h"
-#include "src/core/lib/channel/channel_stack.h"
-
-typedef struct grpc_subchannel_factory grpc_subchannel_factory;
-typedef struct grpc_subchannel_factory_vtable grpc_subchannel_factory_vtable;
-
-/** Constructor for new configured channels.
- Creating decorators around this type is encouraged to adapt behavior. */
-struct grpc_subchannel_factory {
- const grpc_subchannel_factory_vtable *vtable;
-};
-
-struct grpc_subchannel_factory_vtable {
- void (*ref)(grpc_subchannel_factory *factory);
- void (*unref)(grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory);
- grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *factory,
- grpc_subchannel_args *args);
-};
-
-void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory);
-void grpc_subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *factory);
-
-/** Create a new grpc_subchannel */
-grpc_subchannel *grpc_subchannel_factory_create_subchannel(
- grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory,
- grpc_subchannel_args *args);
-
-#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 8f8eaffcb4..2d218def52 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -686,7 +686,8 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_connected_subchannel **target) {
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
pending_pick *pp = glb_policy->pending_picks;
@@ -697,8 +698,10 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(
exec_ctx, pp->pollent, glb_policy->base.interested_parties);
*target = NULL;
- grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, &pp->wrapped_on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -706,12 +709,14 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&glb_policy->mu);
+ GRPC_ERROR_UNREF(error);
}
static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client);
static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq) {
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
if (glb_policy->lb_client != NULL) {
@@ -726,8 +731,10 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(
exec_ctx, pp->pollent, glb_policy->base.interested_parties);
- grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, &pp->wrapped_on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -735,6 +742,7 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&glb_policy->mu);
+ GRPC_ERROR_UNREF(error);
}
static void query_for_backends(grpc_exec_ctx *exec_ctx,
@@ -913,6 +921,9 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
+ GPR_ASSERT(glb_policy->server_name != NULL);
+ GPR_ASSERT(glb_policy->server_name[0] != '\0');
+
lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data));
memset(lb_client, 0, sizeof(lb_client_data));
@@ -924,10 +935,8 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client);
- /* TODO(dgq): get the deadline from the client config instead of fabricating
- * one here. */
- lb_client->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_seconds(3, GPR_TIMESPAN));
+ /* TODO(dgq): get the deadline from the parent channel. */
+ lb_client->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
/* Note the following LB call progresses every time there's activity in \a
* glb_policy->base.interested_parties, which is comprised of the polling
@@ -935,14 +944,14 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
lb_client->lb_call = grpc_channel_create_pollset_set_call(
glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
glb_policy->base.interested_parties,
- "/grpc.lb.v1.LoadBalancer/BalanceLoad", NULL, lb_client->deadline, NULL);
+ "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name,
+ lb_client->deadline, NULL);
grpc_metadata_array_init(&lb_client->initial_metadata_recv);
grpc_metadata_array_init(&lb_client->trailing_metadata_recv);
- grpc_grpclb_request *request = grpc_grpclb_request_create(
- "load.balanced.service.name"); /* FIXME(dgq): get the name of the load
- balanced service from the resolver */
+ grpc_grpclb_request *request =
+ grpc_grpclb_request_create(glb_policy->server_name);
gpr_slice request_payload_slice = grpc_grpclb_request_encode(request);
lb_client->request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 892b5feda9..515b9fe0bc 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -128,7 +128,8 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_connected_subchannel **target) {
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -140,8 +141,9 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -150,11 +152,13 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
+ GRPC_ERROR_UNREF(error);
}
static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq) {
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -166,8 +170,9 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -176,6 +181,7 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
+ GRPC_ERROR_UNREF(error);
}
static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
@@ -466,6 +472,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
}
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
+ /* server_name will be copied as part of the subchannel creation. This makes
+ * the copying of args->server_name (a borrowed pointer) OK. */
sc_args.server_name = args->server_name;
sc_args.addr = &args->addresses->addresses[i].address;
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index dc23f12015..d1e1728a38 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -308,7 +308,8 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_connected_subchannel **target) {
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -320,8 +321,9 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED,
- NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -330,11 +332,13 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
+ GRPC_ERROR_UNREF(error);
}
static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq) {
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -347,8 +351,9 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED,
- NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -357,6 +362,7 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
+ GRPC_ERROR_UNREF(error);
}
static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
@@ -629,6 +635,8 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (args->addresses->addresses[i].is_balancer) continue;
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
+ /* server_name will be copied as part of the subchannel creation. This makes
+ * the copying of args->server_name (a borrowed pointer) OK. */
sc_args.server_name = args->server_name;
sc_args.addr = &args->addresses->addresses[i].address;
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index a58dab10fc..17fd05f550 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -53,10 +53,10 @@ typedef struct {
gpr_refcount refs;
/** load balancing policy name */
char *lb_policy_name;
-
+ /** the path component of the uri passed in */
+ char *target_name;
/** the addresses that we've 'resolved' */
grpc_lb_addresses *addresses;
-
/** mutex guarding the rest of the state */
gpr_mu mu;
/** have we published? */
@@ -121,7 +121,8 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
if (r->next_completion != NULL && !r->published) {
r->published = true;
*r->target_result = grpc_resolver_result_create(
- "", grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
+ r->target_name,
+ grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
r->lb_policy_name, NULL);
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
@@ -133,6 +134,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
gpr_mu_destroy(&r->mu);
grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
gpr_free(r->lb_policy_name);
+ gpr_free(r->target_name);
gpr_free(r);
}
@@ -219,10 +221,12 @@ static grpc_resolver *sockaddr_create(grpc_resolver_args *args,
if (errors_found) break;
}
+ r->target_name = gpr_strdup(args->uri->path);
gpr_slice_buffer_destroy(&path_parts);
gpr_slice_unref(path_slice);
if (errors_found) {
gpr_free(r->lb_policy_name);
+ gpr_free(r->target_name);
grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
gpr_free(r);
return NULL;
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
index 4350543c27..9af17fb5ae 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
@@ -50,10 +50,10 @@
#include "src/core/lib/surface/server.h"
void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
- grpc_completion_queue *cq,
- int fd) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ void *reserved, int fd) {
+ GPR_ASSERT(reserved == NULL);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
char *name;
gpr_asprintf(&name, "fd:%d", fd);
@@ -65,7 +65,15 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
const grpc_channel_args *server_args = grpc_server_get_channel_args(server);
grpc_transport *transport = grpc_create_chttp2_transport(
&exec_ctx, server_args, server_endpoint, 0 /* is_client */);
- grpc_endpoint_add_to_pollset(&exec_ctx, server_endpoint, grpc_cq_pollset(cq));
+
+ grpc_pollset **pollsets;
+ size_t num_pollsets = 0;
+ grpc_server_get_pollsets(server, &pollsets, &num_pollsets);
+
+ for (size_t i = 0; i < num_pollsets; i++) {
+ grpc_endpoint_add_to_pollset(&exec_ctx, server_endpoint, pollsets[i]);
+ }
+
grpc_server_setup_transport(&exec_ctx, server, transport, NULL, server_args);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL);
grpc_exec_ctx_finish(&exec_ctx);
@@ -74,8 +82,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
#else // !GPR_SUPPORT_CHANNELS_FROM_FD
void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
- grpc_completion_queue *cq,
- int fd) {
+ void *reserved, int fd) {
GPR_ASSERT(0);
}
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 0655b9353f..57d34d9e9a 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -158,13 +158,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
}
}
-grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
- int initial_refs, grpc_iomgr_cb_func destroy,
- void *destroy_arg,
- grpc_call_context_element *context,
- const void *transport_server_data,
- grpc_call_stack *call_stack) {
+grpc_error *grpc_call_stack_init(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
+ grpc_call_context_element *context, const void *transport_server_data,
+ gpr_timespec deadline, grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
@@ -185,6 +183,7 @@ grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
args.context = context;
+ args.deadline = deadline;
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
@@ -276,16 +275,16 @@ static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) {
}
void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
- grpc_call_element *cur_elem) {
+ grpc_call_element *elem) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
op->cancel_error = GRPC_ERROR_CANCELLED;
op->on_complete = grpc_closure_create(destroy_op, op);
- grpc_call_next_op(exec_ctx, cur_elem, op);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *cur_elem,
+ grpc_call_element *elem,
grpc_status_code status,
gpr_slice *optional_message) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
@@ -293,5 +292,16 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
op->on_complete = grpc_closure_create(destroy_op, op);
grpc_transport_stream_op_add_cancellation_with_message(op, status,
optional_message);
- grpc_call_next_op(exec_ctx, cur_elem, op);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
+}
+
+void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_status_code status,
+ gpr_slice *optional_message) {
+ grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ memset(op, 0, sizeof(*op));
+ op->on_complete = grpc_closure_create(destroy_op, op);
+ grpc_transport_stream_op_add_close(op, status, optional_message);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 6b73cce380..1cfe2885d8 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -74,6 +74,7 @@ typedef struct {
grpc_call_stack *call_stack;
const void *server_transport_data;
grpc_call_context_element *context;
+ gpr_timespec deadline;
} grpc_call_element_args;
typedef struct {
@@ -220,13 +221,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
/* Initialize a call stack given a channel stack. transport_server_data is
expected to be NULL on a client, or an opaque transport owned pointer on the
server. */
-grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
- int initial_refs, grpc_iomgr_cb_func destroy,
- void *destroy_arg,
- grpc_call_context_element *context,
- const void *transport_server_data,
- grpc_call_stack *call_stack);
+grpc_error *grpc_call_stack_init(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
+ grpc_call_context_element *context, const void *transport_server_data,
+ gpr_timespec deadline, grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
@@ -290,6 +289,11 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
grpc_status_code status,
gpr_slice *optional_message);
+void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *cur_elem,
+ grpc_status_code status,
+ gpr_slice *optional_message);
+
extern int grpc_trace_channel;
#define GRPC_CALL_LOG_OP(sev, elem, op) \
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
new file mode 100644
index 0000000000..079b98a2f8
--- /dev/null
+++ b/src/core/lib/channel/deadline_filter.c
@@ -0,0 +1,302 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#include "src/core/lib/channel/deadline_filter.h"
+
+#include <stdbool.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/timer.h"
+
+//
+// grpc_deadline_state
+//
+
+// Timer callback.
+static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = arg;
+ grpc_deadline_state* deadline_state = elem->call_data;
+ gpr_mu_lock(&deadline_state->timer_mu);
+ deadline_state->timer_pending = false;
+ gpr_mu_unlock(&deadline_state->timer_mu);
+ if (error != GRPC_ERROR_CANCELLED) {
+ gpr_slice msg = gpr_slice_from_static_string("Deadline Exceeded");
+ grpc_call_element_send_cancel_with_message(
+ exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &msg);
+ gpr_slice_unref(msg);
+ }
+ GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
+}
+
+// Starts the deadline timer.
+static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ gpr_timespec deadline) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+ if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
+ // Take a reference to the call stack, to be owned by the timer.
+ GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
+ gpr_mu_lock(&deadline_state->timer_mu);
+ deadline_state->timer_pending = true;
+ grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback,
+ elem, gpr_now(GPR_CLOCK_MONOTONIC));
+ gpr_mu_unlock(&deadline_state->timer_mu);
+ }
+}
+
+// Cancels the deadline timer.
+static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
+ grpc_deadline_state* deadline_state) {
+ gpr_mu_lock(&deadline_state->timer_mu);
+ if (deadline_state->timer_pending) {
+ grpc_timer_cancel(exec_ctx, &deadline_state->timer);
+ deadline_state->timer_pending = false;
+ }
+ gpr_mu_unlock(&deadline_state->timer_mu);
+}
+
+// Callback run when the call is complete.
+static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ grpc_deadline_state* deadline_state = arg;
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ // Invoke the next callback.
+ deadline_state->next_on_complete->cb(
+ exec_ctx, deadline_state->next_on_complete->cb_arg, error);
+}
+
+// Inject our own on_complete callback into op.
+static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
+ grpc_transport_stream_op* op) {
+ deadline_state->next_on_complete = op->on_complete;
+ grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state);
+ op->on_complete = &deadline_state->on_complete;
+}
+
+// Callback and associated state for starting the timer after call stack
+// initialization has been completed.
+struct start_timer_after_init_state {
+ grpc_call_element* elem;
+ gpr_timespec deadline;
+ grpc_closure closure;
+};
+static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ struct start_timer_after_init_state* state = arg;
+ start_timer_if_needed(exec_ctx, state->elem, state->deadline);
+ gpr_free(state);
+}
+
+void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_call_element_args* args) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ memset(deadline_state, 0, sizeof(*deadline_state));
+ deadline_state->call_stack = args->call_stack;
+ gpr_mu_init(&deadline_state->timer_mu);
+ // Deadline will always be infinite on servers, so the timer will only be
+ // set on clients with a finite deadline.
+ const gpr_timespec deadline =
+ gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
+ if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
+ // When the deadline passes, we indicate the failure by sending down
+ // an op with cancel_error set. However, we can't send down any ops
+ // until after the call stack is fully initialized. If we start the
+ // timer here, we have no guarantee that the timer won't pop before
+ // call stack initialization is finished. To avoid that problem, we
+ // create a closure to start the timer, and we schedule that closure
+ // to be run after call stack initialization is done.
+ struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
+ state->elem = elem;
+ state->deadline = deadline;
+ grpc_closure_init(&state->closure, start_timer_after_init, state);
+ grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL);
+ }
+}
+
+void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ gpr_mu_destroy(&deadline_state->timer_mu);
+}
+
+void grpc_deadline_state_client_start_transport_stream_op(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op* op) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ if (op->cancel_error != GRPC_ERROR_NONE ||
+ op->close_error != GRPC_ERROR_NONE) {
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ } else {
+ // Make sure we know when the call is complete, so that we can cancel
+ // the timer.
+ if (op->recv_trailing_metadata != NULL) {
+ inject_on_complete_cb(deadline_state, op);
+ }
+ }
+}
+
+//
+// filter code
+//
+
+// Constructor for channel_data. Used for both client and server filters.
+static void init_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ GPR_ASSERT(!args->is_last);
+}
+
+// Destructor for channel_data. Used for both client and server filters.
+static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem) {}
+
+// Call data used for both client and server filter.
+typedef struct base_call_data {
+ grpc_deadline_state deadline_state;
+} base_call_data;
+
+// Additional call data used only for the server filter.
+typedef struct server_call_data {
+ base_call_data base; // Must be first.
+ // The closure for receiving initial metadata.
+ grpc_closure recv_initial_metadata_ready;
+ // Received initial metadata batch.
+ grpc_metadata_batch* recv_initial_metadata;
+ // The original recv_initial_metadata_ready closure, which we chain to
+ // after our own closure is invoked.
+ grpc_closure* next_recv_initial_metadata_ready;
+} server_call_data;
+
+// Constructor for call_data. Used for both client and server filters.
+static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ grpc_call_element_args* args) {
+ // Note: size of call data is different between client and server.
+ memset(elem->call_data, 0, elem->filter->sizeof_call_data);
+ grpc_deadline_state_init(exec_ctx, elem, args);
+ return GRPC_ERROR_NONE;
+}
+
+// Destructor for call_data. Used for both client and server filters.
+static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ void* and_free_memory) {
+ grpc_deadline_state_destroy(exec_ctx, elem);
+}
+
+// Method for starting a call op for client filter.
+static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ grpc_transport_stream_op* op) {
+ grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
+ // Chain to next filter.
+ grpc_call_next_op(exec_ctx, elem, op);
+}
+
+// Callback for receiving initial metadata on the server.
+static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = arg;
+ server_call_data* calld = elem->call_data;
+ // Get deadline from metadata and start the timer if needed.
+ start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline);
+ // Invoke the next callback.
+ calld->next_recv_initial_metadata_ready->cb(
+ exec_ctx, calld->next_recv_initial_metadata_ready->cb_arg, error);
+}
+
+// Method for starting a call op for server filter.
+static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ grpc_transport_stream_op* op) {
+ server_call_data* calld = elem->call_data;
+ if (op->cancel_error != GRPC_ERROR_NONE ||
+ op->close_error != GRPC_ERROR_NONE) {
+ cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
+ } else {
+ // If we're receiving initial metadata, we need to get the deadline
+ // from the recv_initial_metadata_ready callback. So we inject our
+ // own callback into that hook.
+ if (op->recv_initial_metadata_ready != NULL) {
+ calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
+ calld->recv_initial_metadata = op->recv_initial_metadata;
+ grpc_closure_init(&calld->recv_initial_metadata_ready,
+ recv_initial_metadata_ready, elem);
+ op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready;
+ }
+ // Make sure we know when the call is complete, so that we can cancel
+ // the timer.
+ // Note that we trigger this on recv_trailing_metadata, even though
+ // the client never sends trailing metadata, because this is the
+ // hook that tells us when the call is complete on the server side.
+ if (op->recv_trailing_metadata != NULL) {
+ inject_on_complete_cb(&calld->base.deadline_state, op);
+ }
+ }
+ // Chain to next filter.
+ grpc_call_next_op(exec_ctx, elem, op);
+}
+
+const grpc_channel_filter grpc_client_deadline_filter = {
+ client_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(base_call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ 0, // sizeof(channel_data)
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "deadline",
+};
+
+const grpc_channel_filter grpc_server_deadline_filter = {
+ server_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(server_call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ 0, // sizeof(channel_data)
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "deadline",
+};
diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h
new file mode 100644
index 0000000000..685df87761
--- /dev/null
+++ b/src/core/lib/channel/deadline_filter.h
@@ -0,0 +1,79 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#ifndef GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H
+#define GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/iomgr/timer.h"
+
+// State used for filters that enforce call deadlines.
+// Must be the first field in the filter's call_data.
+typedef struct grpc_deadline_state {
+ // We take a reference to the call stack for the timer callback.
+ grpc_call_stack* call_stack;
+ // Guards access to timer_pending and timer.
+ gpr_mu timer_mu;
+ // True if the timer callback is currently pending.
+ bool timer_pending;
+ // The deadline timer.
+ grpc_timer timer;
+ // Closure to invoke when the call is complete.
+ // We use this to cancel the timer.
+ grpc_closure on_complete;
+ // The original on_complete closure, which we chain to after our own
+ // closure is invoked.
+ grpc_closure* next_on_complete;
+} grpc_deadline_state;
+
+// To be used in a filter's init_call_elem(), destroy_call_elem(), and
+// start_transport_stream_op() methods to enforce call deadlines.
+//
+// REQUIRES: The first field in elem->call_data is a grpc_deadline_state.
+//
+// For grpc_deadline_state_client_start_transport_stream_op(), it is the
+// caller's responsibility to chain to the next filter if necessary
+// after the function returns.
+void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_call_element_args* args);
+void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem);
+void grpc_deadline_state_client_start_transport_stream_op(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op* op);
+
+// Deadline filters for direct client channels and server channels.
+// Note: Deadlines for non-direct client channels are handled by the
+// client_channel filter.
+extern const grpc_channel_filter grpc_client_deadline_filter;
+extern const grpc_channel_filter grpc_server_deadline_filter;
+
+#endif /* GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H */
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index edcc741ff6..1dc05fb20d 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -103,8 +103,8 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
grpc_mdstr_as_c_string(md->value));
gpr_slice message = gpr_slice_from_copied_string(message_string);
gpr_free(message_string);
- grpc_call_element_send_cancel_with_message(a->exec_ctx, a->elem,
- GRPC_STATUS_CANCELLED, &message);
+ grpc_call_element_send_close_with_message(a->exec_ctx, a->elem,
+ GRPC_STATUS_CANCELLED, &message);
return NULL;
} else if (md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) {
return NULL;
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 6613785dea..02fc68fc3a 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -40,8 +40,9 @@
#include "src/core/lib/channel/channel_args.h"
+#define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited.
// The protobuf library will (by default) start warning at 100 megs.
-#define DEFAULT_MAX_MESSAGE_LENGTH (4 * 1024 * 1024)
+#define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024)
typedef struct call_data {
// Receive closures are chained: we inject this closure as the
@@ -55,8 +56,8 @@ typedef struct call_data {
} call_data;
typedef struct channel_data {
- size_t max_send_size;
- size_t max_recv_size;
+ int max_send_size;
+ int max_recv_size;
} channel_data;
// Callback invoked when we receive a message. Here we check the max
@@ -66,15 +67,15 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
grpc_call_element* elem = user_data;
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
- if (*calld->recv_message != NULL &&
- (*calld->recv_message)->length > chand->max_recv_size) {
+ if (*calld->recv_message != NULL && chand->max_recv_size >= 0 &&
+ (*calld->recv_message)->length > (size_t)chand->max_recv_size) {
char* message_string;
- gpr_asprintf(
- &message_string, "Received message larger than max (%u vs. %lu)",
- (*calld->recv_message)->length, (unsigned long)chand->max_recv_size);
+ gpr_asprintf(&message_string,
+ "Received message larger than max (%u vs. %d)",
+ (*calld->recv_message)->length, chand->max_recv_size);
gpr_slice message = gpr_slice_from_copied_string(message_string);
gpr_free(message_string);
- grpc_call_element_send_cancel_with_message(
+ grpc_call_element_send_close_with_message(
exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
}
// Invoke the next callback.
@@ -88,14 +89,14 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
// Check max send message size.
- if (op->send_message != NULL &&
- op->send_message->length > chand->max_send_size) {
+ if (op->send_message != NULL && chand->max_send_size >= 0 &&
+ op->send_message->length > (size_t)chand->max_send_size) {
char* message_string;
- gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %lu)",
- op->send_message->length, (unsigned long)chand->max_send_size);
+ gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
+ op->send_message->length, chand->max_send_size);
gpr_slice message = gpr_slice_from_copied_string(message_string);
gpr_free(message_string);
- grpc_call_element_send_cancel_with_message(
+ grpc_call_element_send_close_with_message(
exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
}
// Inject callback for receiving a message.
@@ -130,19 +131,22 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx,
GPR_ASSERT(!args->is_last);
channel_data* chand = elem->channel_data;
memset(chand, 0, sizeof(*chand));
- chand->max_send_size = DEFAULT_MAX_MESSAGE_LENGTH;
- chand->max_recv_size = DEFAULT_MAX_MESSAGE_LENGTH;
- const grpc_integer_options options = {DEFAULT_MAX_MESSAGE_LENGTH, 0, INT_MAX};
+ chand->max_send_size = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
+ chand->max_recv_size = DEFAULT_MAX_RECV_MESSAGE_LENGTH;
for (size_t i = 0; i < args->channel_args->num_args; ++i) {
if (strcmp(args->channel_args->args[i].key,
GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) {
- chand->max_send_size = (size_t)grpc_channel_arg_get_integer(
- &args->channel_args->args[i], options);
+ const grpc_integer_options options = {DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0,
+ INT_MAX};
+ chand->max_send_size =
+ grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
}
if (strcmp(args->channel_args->args[i].key,
GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) {
- chand->max_recv_size = (size_t)grpc_channel_arg_get_integer(
- &args->channel_args->args[i], options);
+ const grpc_integer_options options = {DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0,
+ INT_MAX};
+ chand->max_recv_size =
+ grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
}
}
}
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index e366961936..31c80260f8 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -324,6 +324,64 @@ const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) {
return gpr_avl_get(err->strs, (void *)(uintptr_t)which);
}
+typedef struct {
+ grpc_error *error;
+ grpc_status_code code;
+ const char *msg;
+} special_error_status_map;
+static special_error_status_map error_status_map[] = {
+ {GRPC_ERROR_NONE, GRPC_STATUS_OK, ""},
+ {GRPC_ERROR_CANCELLED, GRPC_STATUS_CANCELLED, "RPC cancelled"},
+ {GRPC_ERROR_OOM, GRPC_STATUS_RESOURCE_EXHAUSTED, "Out of memory"},
+};
+
+static grpc_error *recursively_find_error_with_status(grpc_error *error,
+ intptr_t *status) {
+ // If the error itself has a status code, return it.
+ if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, status)) {
+ return error;
+ }
+ // Otherwise, search through its children.
+ intptr_t key = 0;
+ while (true) {
+ grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++);
+ if (child_error == NULL) break;
+ grpc_error *result =
+ recursively_find_error_with_status(child_error, status);
+ if (result != NULL) return result;
+ }
+ return NULL;
+}
+
+void grpc_error_get_status(grpc_error *error, grpc_status_code *code,
+ const char **msg) {
+ // Handle special errors via the static map.
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(error_status_map); ++i) {
+ if (error == error_status_map[i].error) {
+ *code = error_status_map[i].code;
+ *msg = error_status_map[i].msg;
+ return;
+ }
+ }
+ // Populate code.
+ // Start with the parent error and recurse through the tree of children
+ // until we find the first one that has a status code.
+ intptr_t status = GRPC_STATUS_UNKNOWN; // Default in case we don't find one.
+ grpc_error *found_error = recursively_find_error_with_status(error, &status);
+ *code = (grpc_status_code)status;
+ // Now populate msg.
+ // If we found an error with a status code above, use that; otherwise,
+ // fall back to using the parent error.
+ if (found_error == NULL) found_error = error;
+ // If the error has a status message, use it. Otherwise, fall back to
+ // the error description.
+ *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_GRPC_MESSAGE);
+ if (*msg == NULL) {
+ *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_DESCRIPTION);
+ if (*msg == NULL) *msg = "uknown error"; // Just in case.
+ }
+}
+
grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) {
GPR_TIMER_BEGIN("grpc_error_add_child", 0);
grpc_error *new = copy_error_and_unref(src);
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 6c769accdb..7e2fd7a3bd 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -37,6 +37,7 @@
#include <stdbool.h>
#include <stdint.h>
+#include <grpc/status.h>
#include <grpc/support/time.h>
/// Opaque representation of an error.
@@ -175,6 +176,13 @@ grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
/// Returns NULL if the specified string is not set.
/// Caller does NOT own return value.
const char *grpc_error_get_str(grpc_error *error, grpc_error_strs which);
+
+/// A utility function to get the status code and message to be returned
+/// to the application. If not set in the top-level message, looks
+/// through child errors until it finds the first one with these attributes.
+void grpc_error_get_status(grpc_error *error, grpc_status_code *code,
+ const char **msg);
+
/// Add a child error: an error that is believed to have contributed to this
/// error occurring. Allows root causing high level errors from lower level
/// errors that contributed to them.
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 3de199d35a..5430d8836d 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -1897,7 +1897,7 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
}
if (!is_grpc_wakeup_signal_initialized) {
- grpc_use_signal(SIGRTMIN + 2);
+ grpc_use_signal(SIGRTMIN + 6);
}
fd_global_init();
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 0c04e23bca..e7ae1ef695 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -146,61 +146,57 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
grpc_timer_cancel(exec_ctx, &ac->alarm);
gpr_mu_lock(&ac->mu);
- if (error == GRPC_ERROR_NONE) {
- do {
- so_error_size = sizeof(so_error);
- err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
- &so_error_size);
- } while (err < 0 && errno == EINTR);
- if (err < 0) {
- error = GRPC_OS_ERROR(errno, "getsockopt");
- goto finish;
- } else if (so_error != 0) {
- if (so_error == ENOBUFS) {
- /* We will get one of these errors if we have run out of
- memory in the kernel for the data structures allocated
- when you connect a socket. If this happens it is very
- likely that if we wait a little bit then try again the
- connection will work (since other programs or this
- program will close their network connections and free up
- memory). This does _not_ indicate that there is anything
- wrong with the server we are connecting to, this is a
- local problem.
-
- If you are looking at this code, then chances are that
- your program or another program on the same computer
- opened too many network connections. The "easy" fix:
- don't do that! */
- gpr_log(GPR_ERROR, "kernel out of buffers");
- gpr_mu_unlock(&ac->mu);
- grpc_fd_notify_on_write(exec_ctx, fd, &ac->write_closure);
- return;
- } else {
- switch (so_error) {
- case ECONNREFUSED:
- error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, errno);
- error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
- "Connection refused");
- break;
- default:
- error = GRPC_OS_ERROR(errno, "getsockopt(SO_ERROR)");
- break;
- }
- goto finish;
- }
- } else {
- grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
- *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
- fd = NULL;
- goto finish;
- }
- } else {
+ if (error != GRPC_ERROR_NONE) {
error =
grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred");
goto finish;
}
- GPR_UNREACHABLE_CODE(return );
+ do {
+ so_error_size = sizeof(so_error);
+ err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
+ &so_error_size);
+ } while (err < 0 && errno == EINTR);
+ if (err < 0) {
+ error = GRPC_OS_ERROR(errno, "getsockopt");
+ goto finish;
+ }
+
+ switch (so_error) {
+ case 0:
+ grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
+ *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
+ fd = NULL;
+ break;
+ case ENOBUFS:
+ /* We will get one of these errors if we have run out of
+ memory in the kernel for the data structures allocated
+ when you connect a socket. If this happens it is very
+ likely that if we wait a little bit then try again the
+ connection will work (since other programs or this
+ program will close their network connections and free up
+ memory). This does _not_ indicate that there is anything
+ wrong with the server we are connecting to, this is a
+ local problem.
+
+ If you are looking at this code, then chances are that
+ your program or another program on the same computer
+ opened too many network connections. The "easy" fix:
+ don't do that! */
+ gpr_log(GPR_ERROR, "kernel out of buffers");
+ gpr_mu_unlock(&ac->mu);
+ grpc_fd_notify_on_write(exec_ctx, fd, &ac->write_closure);
+ return;
+ case ECONNREFUSED:
+ /* This error shouldn't happen for anything other than connect(). */
+ error = GRPC_OS_ERROR(so_error, "connect");
+ break;
+ default:
+ /* We don't really know which syscall triggered the problem here,
+ so punt by reporting getsockopt(). */
+ error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
+ break;
+ }
finish:
if (fd != NULL) {
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 533f07abc0..a5f508a2c3 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -319,6 +319,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
? GRPC_ERROR_NONE
: GRPC_WSA_ERROR(info->wsa_error, "WSASend");
grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ if (allocated) gpr_free(allocated);
return;
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index e18bae1ed6..0362940922 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -127,8 +127,6 @@ struct grpc_call {
/* client or server call */
bool is_client;
- /* is the alarm set */
- bool have_alarm;
/** has grpc_call_destroy been called */
bool destroy_called;
/** flag indicating that cancellation is inherited */
@@ -171,9 +169,6 @@ struct grpc_call {
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
- /* Deadline alarm - if have_alarm is non-zero */
- grpc_timer alarm;
-
/* for the client, extra metadata is initial metadata; for the
server, it's trailing metadata */
grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
@@ -216,8 +211,6 @@ struct grpc_call {
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
- gpr_timespec deadline);
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_transport_stream_op *op);
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
@@ -265,39 +258,8 @@ grpc_call *grpc_call_create(
call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
}
}
- call->send_deadline =
- gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC);
- GRPC_CHANNEL_INTERNAL_REF(channel, "call");
- /* initial refcount dropped by grpc_call_destroy */
- grpc_error *error = grpc_call_stack_init(
- &exec_ctx, channel_stack, 1, destroy_call, call, call->context,
- server_transport_data, CALL_STACK_FROM_CALL(call));
- if (error != GRPC_ERROR_NONE) {
- intptr_t status;
- if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status))
- status = GRPC_STATUS_UNKNOWN;
- const char *error_str =
- grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION);
- close_with_status(&exec_ctx, call, (grpc_status_code)status,
- error_str == NULL ? "unknown error" : error_str);
- GRPC_ERROR_UNREF(error);
- }
- if (cq != NULL) {
- GPR_ASSERT(
- pollset_set_alternative == NULL &&
- "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
- GRPC_CQ_INTERNAL_REF(cq, "bind");
- call->pollent =
- grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
- }
- if (pollset_set_alternative != NULL) {
- call->pollent =
- grpc_polling_entity_create_from_pollset_set(pollset_set_alternative);
- }
- if (!grpc_polling_entity_is_empty(&call->pollent)) {
- grpc_call_stack_set_pollset_or_pollset_set(
- &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
- }
+ send_deadline = gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC);
+
if (parent_call != NULL) {
GRPC_CALL_INTERNAL_REF(parent_call, "child");
GPR_ASSERT(call->is_client);
@@ -339,10 +301,38 @@ grpc_call *grpc_call_create(
gpr_mu_unlock(&parent_call->mu);
}
- if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
- 0) {
- set_deadline_alarm(&exec_ctx, call, send_deadline);
+
+ call->send_deadline = send_deadline;
+
+ GRPC_CHANNEL_INTERNAL_REF(channel, "call");
+ /* initial refcount dropped by grpc_call_destroy */
+ grpc_error *error = grpc_call_stack_init(
+ &exec_ctx, channel_stack, 1, destroy_call, call, call->context,
+ server_transport_data, send_deadline, CALL_STACK_FROM_CALL(call));
+ if (error != GRPC_ERROR_NONE) {
+ grpc_status_code status;
+ const char *error_str;
+ grpc_error_get_status(error, &status, &error_str);
+ close_with_status(&exec_ctx, call, status, error_str);
+ GRPC_ERROR_UNREF(error);
}
+ if (cq != NULL) {
+ GPR_ASSERT(
+ pollset_set_alternative == NULL &&
+ "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
+ GRPC_CQ_INTERNAL_REF(cq, "bind");
+ call->pollent =
+ grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
+ }
+ if (pollset_set_alternative != NULL) {
+ call->pollent =
+ grpc_polling_entity_create_from_pollset_set(pollset_set_alternative);
+ }
+ if (!grpc_polling_entity_is_empty(&call->pollent)) {
+ grpc_call_stack_set_pollset_or_pollset_set(
+ &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
+ }
+
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_call_create", 0);
return call;
@@ -455,20 +445,11 @@ static void set_status_details(grpc_call *call, status_source source,
static void set_status_from_error(grpc_call *call, status_source source,
grpc_error *error) {
- intptr_t status;
- if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
- set_status_code(call, source, (uint32_t)status);
- } else {
- set_status_code(call, source, GRPC_STATUS_INTERNAL);
- }
- const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
- bool free_msg = false;
- if (msg == NULL) {
- free_msg = true;
- msg = grpc_error_string(error);
- }
+ grpc_status_code status;
+ const char *msg;
+ grpc_error_get_status(error, &status, &msg);
+ set_status_code(call, source, (uint32_t)status);
set_status_details(call, source, grpc_mdstr_from_string(msg));
- if (free_msg) grpc_error_free_string(msg);
}
static void set_incoming_compression_algorithm(
@@ -741,9 +722,6 @@ void grpc_call_destroy(grpc_call *c) {
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->destroy_called);
c->destroy_called = 1;
- if (c->have_alarm) {
- grpc_timer_cancel(&exec_ctx, &c->alarm);
- }
cancel = !c->received_final_op;
gpr_mu_unlock(&c->mu);
if (cancel) grpc_call_cancel(c, NULL);
@@ -781,7 +759,6 @@ typedef struct termination_closure {
grpc_closure closure;
grpc_call *call;
grpc_error *error;
- grpc_closure *op_closure;
enum { TC_CANCEL, TC_CLOSE } type;
grpc_transport_stream_op op;
} termination_closure;
@@ -798,7 +775,6 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
break;
}
GRPC_ERROR_UNREF(tc->error);
- grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL);
gpr_free(tc);
}
@@ -818,7 +794,6 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
tc->op.close_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
- tc->op_closure = tc->op.on_complete;
tc->op.on_complete = &tc->closure;
execute_op(exec_ctx, tc->call, &tc->op);
}
@@ -901,32 +876,6 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
-static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_call *call = arg;
- gpr_mu_lock(&call->mu);
- call->have_alarm = 0;
- if (error != GRPC_ERROR_CANCELLED) {
- cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED,
- "Deadline Exceeded");
- }
- gpr_mu_unlock(&call->mu);
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm");
-}
-
-static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
- gpr_timespec deadline) {
- if (call->have_alarm) {
- gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
- assert(0);
- return;
- }
- GRPC_CALL_INTERNAL_REF(call, "alarm");
- call->have_alarm = 1;
- call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- grpc_timer_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call,
- gpr_now(GPR_CLOCK_MONOTONIC));
-}
-
/* we offset status by a small amount when storing it into transport metadata
as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
*/
@@ -1268,9 +1217,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
0 &&
!call->is_client) {
- GPR_TIMER_BEGIN("set_deadline_alarm", 0);
- set_deadline_alarm(exec_ctx, call, md->deadline);
- GPR_TIMER_END("set_deadline_alarm", 0);
+ call->send_deadline =
+ gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC);
}
}
@@ -1299,9 +1247,17 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
GRPC_ERROR_REF(error);
gpr_mu_lock(&call->mu);
+
+ // If the error has an associated status code, set the call's status.
+ intptr_t status;
+ if (error != GRPC_ERROR_NONE &&
+ grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
+ set_status_from_error(call, STATUS_FROM_CORE, error);
+ }
+
if (bctl->send_initial_metadata) {
if (error != GRPC_ERROR_NONE) {
- set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE);
+ set_status_from_error(call, STATUS_FROM_CORE, error);
}
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
@@ -1319,9 +1275,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_metadata_batch_filter(md, recv_trailing_filter, call);
call->received_final_op = true;
- if (call->have_alarm) {
- grpc_timer_cancel(exec_ctx, &call->alarm);
- }
/* propagate cancellation to any interested children */
child_call = call->first_child;
if (child_call != NULL) {
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 3049284f68..4dbf3aae63 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -57,6 +57,8 @@ typedef struct grpc_cq_completion {
uintptr_t next;
} grpc_cq_completion;
+//#define GRPC_CQ_REF_COUNT_DEBUG
+
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line);
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 64bdfc3446..289f4ce8e8 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -43,6 +43,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/channel/message_size_filter.h"
@@ -100,6 +101,12 @@ static bool maybe_add_http_filter(grpc_channel_stack_builder *builder,
static void register_builtin_channel_init() {
grpc_channel_init_register_stage(
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ prepend_filter, (void *)&grpc_client_deadline_filter);
+ grpc_channel_init_register_stage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
+ (void *)&grpc_server_deadline_filter);
+ grpc_channel_init_register_stage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
prepend_filter, (void *)&grpc_message_size_filter);
grpc_channel_init_register_stage(
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 56fb80e92e..7300d79b9f 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1106,6 +1106,12 @@ void grpc_server_start(grpc_server *server) {
grpc_exec_ctx_finish(&exec_ctx);
}
+void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets,
+ size_t *pollset_count) {
+ *pollset_count = server->cq_count;
+ *pollsets = server->pollsets;
+}
+
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
grpc_transport *transport,
grpc_pollset *accepting_pollset,
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index fb6e4d60c5..551a40a4ff 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -60,4 +60,9 @@ const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
int grpc_server_has_open_connections(grpc_server *server);
+/* Do not call this before grpc_server_start. Returns the pollsets and the
+ * number of pollsets via 'pollsets' and 'pollset_count'. */
+void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets,
+ size_t *pollset_count);
+
#endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 08f9d7e8d9..82fc605218 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -47,7 +47,7 @@
void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
gpr_log(GPR_DEBUG, "%s %p:%p REF %d->%d %s", refcount->object_type,
- refcount, refcount->destroy.cb_arg, val, val + 1, reason);
+ refcount, refcount->destroy.cb_arg, (int)val, (int)val + 1, reason);
#else
void grpc_stream_ref(grpc_stream_refcount *refcount) {
#endif
@@ -59,7 +59,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount,
const char *reason) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
gpr_log(GPR_DEBUG, "%s %p:%p UNREF %d->%d %s", refcount->object_type,
- refcount, refcount->destroy.cb_arg, val, val - 1, reason);
+ refcount, refcount->destroy.cb_arg, (int)val, (int)val - 1, reason);
#else
void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_stream_refcount *refcount) {
@@ -224,7 +224,7 @@ void grpc_transport_stream_op_add_cancellation_with_message(
error = GRPC_ERROR_CREATE("Call cancelled");
}
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
- add_error(op, &op->close_error, error);
+ add_error(op, &op->cancel_error, error);
}
void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,