aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-06-20 22:04:48 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2016-06-20 22:05:32 -0700
commit280fd2a1dcc4b8873fe6b2f1df57d50c92c27e1e (patch)
treebb2ddbda53db7f8418cd8504bc18dbeb7f91bebd /src/core/ext
parent8782d1b7e104b74e0bf1ecba386420088140c76c (diff)
parent5988716d9d6e33cd59631865527d73d3caa87387 (diff)
Merge branch 'master' of github.com:grpc/grpc into grpclb_v0
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/census/grpc_filter.c6
-rw-r--r--src/core/ext/client_config/channel_connectivity.c34
-rw-r--r--src/core/ext/client_config/client_channel.c111
-rw-r--r--src/core/ext/client_config/connector.h2
-rw-r--r--src/core/ext/client_config/lb_policy.c6
-rw-r--r--src/core/ext/client_config/lb_policy.h10
-rw-r--r--src/core/ext/client_config/subchannel.c89
-rw-r--r--src/core/ext/client_config/subchannel.h2
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c47
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c135
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c78
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c61
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c31
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c4
-rw-r--r--src/core/ext/resolver/zookeeper/zookeeper_resolver.c513
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c8
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c26
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c63
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c91
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c373
-rw-r--r--src/core/ext/transport/chttp2/transport/frame.h8
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.c71
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.h11
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_goaway.c37
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_goaway.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.c19
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.h6
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_rst_stream.c30
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_rst_stream.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_settings.c44
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_settings.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_window_update.c26
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_window_update.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c650
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.h14
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_table.c44
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_table.h9
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h39
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c437
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c17
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c4
41 files changed, 1549 insertions, 1623 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 86ec2ae87a..72e4e5427e 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -91,14 +91,14 @@ static void client_start_transport_op(grpc_exec_ctx *exec_ctx,
}
static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
- bool success) {
+ grpc_error *error) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (success) {
+ if (error == GRPC_ERROR_NONE) {
extract_and_annotate_method_tag(calld->recv_initial_metadata, calld, chand);
}
- calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
+ calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, error);
}
static void server_mutate_op(grpc_call_element *elem,
diff --git a/src/core/ext/client_config/channel_connectivity.c b/src/core/ext/client_config/channel_connectivity.c
index cc60f2485a..20c01a9a7c 100644
--- a/src/core/ext/client_config/channel_connectivity.c
+++ b/src/core/ext/client_config/channel_connectivity.c
@@ -75,7 +75,6 @@ typedef enum {
typedef struct {
gpr_mu mu;
callback_phase phase;
- int success;
grpc_closure on_complete;
grpc_timer alarm;
grpc_connectivity_state state;
@@ -122,7 +121,7 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
}
static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
- int due_to_completion) {
+ bool due_to_completion, grpc_error *error) {
int delete = 0;
if (due_to_completion) {
@@ -130,14 +129,26 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
}
gpr_mu_lock(&w->mu);
+
if (due_to_completion) {
- w->success = 1;
+ if (grpc_trace_operation_failures) {
+ GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error));
+ }
+ GRPC_ERROR_UNREF(error);
+ error = GRPC_ERROR_NONE;
+ } else {
+ if (error == GRPC_ERROR_NONE) {
+ error =
+ GRPC_ERROR_CREATE("Timed out waiting for connection state change");
+ } else if (error == GRPC_ERROR_CANCELLED) {
+ error = GRPC_ERROR_NONE;
+ }
}
switch (w->phase) {
case WAITING:
w->phase = CALLING_BACK;
- grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->success, finished_completion,
- w, &w->completion_storage);
+ grpc_cq_end_op(exec_ctx, w->cq, w->tag, GRPC_ERROR_REF(error),
+ finished_completion, w, &w->completion_storage);
break;
case CALLING_BACK:
w->phase = CALLING_BACK_AND_FINISHED;
@@ -153,14 +164,18 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
if (delete) {
delete_state_watcher(exec_ctx, w);
}
+
+ GRPC_ERROR_UNREF(error);
}
-static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) {
- partly_done(exec_ctx, pw, 1);
+static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw,
+ grpc_error *error) {
+ partly_done(exec_ctx, pw, true, GRPC_ERROR_REF(error));
}
-static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) {
- partly_done(exec_ctx, pw, 0);
+static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw,
+ grpc_error *error) {
+ partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error));
}
void grpc_channel_watch_connectivity_state(
@@ -185,7 +200,6 @@ void grpc_channel_watch_connectivity_state(
grpc_closure_init(&w->on_complete, watch_complete, w);
w->phase = WAITING;
w->state = last_observed_state;
- w->success = 0;
w->cq = cq;
w->tag = tag;
w->channel = channel;
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index e297dfa0ef..1d5a7d5224 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -117,6 +117,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
channel_data *chand,
grpc_connectivity_state state,
+ grpc_error *error,
const char *reason) {
if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
state == GRPC_CHANNEL_SHUTDOWN) &&
@@ -127,11 +128,13 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
/* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
/* check= */ 0);
}
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, reason);
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
+ reason);
}
-static void on_lb_policy_state_changed_locked(
- grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
+static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
+ lb_policy_connectivity_watcher *w,
+ grpc_error *error) {
grpc_connectivity_state publish_state = w->state;
/* check if the notification is for a stale policy */
if (w->lb_policy != w->chand->lb_policy) return;
@@ -143,18 +146,18 @@ static void on_lb_policy_state_changed_locked(
w->chand->lb_policy = NULL;
}
set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
- "lb_changed");
+ GRPC_ERROR_REF(error), "lb_changed");
if (w->state != GRPC_CHANNEL_SHUTDOWN) {
watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
}
}
static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
lb_policy_connectivity_watcher *w = arg;
gpr_mu_lock(&w->chand->mu_config);
- on_lb_policy_state_changed_locked(exec_ctx, w);
+ on_lb_policy_state_changed_locked(exec_ctx, w, error);
gpr_mu_unlock(&w->chand->mu_config);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
@@ -176,19 +179,22 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
}
static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
channel_data *chand = arg;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
int exit_idle = 0;
+ grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
if (chand->incoming_configuration != NULL) {
lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "channel");
GRPC_LB_POLICY_REF(lb_policy, "config_change");
- state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy);
+ GRPC_ERROR_UNREF(state_error);
+ state =
+ grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
grpc_client_config_unref(exec_ctx, chand->incoming_configuration);
@@ -208,7 +214,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
NULL);
} else if (chand->resolver == NULL /* disconnected */) {
- grpc_closure_list_fail_all(&chand->waiting_for_config_closures);
+ grpc_closure_list_fail_all(
+ &chand->waiting_for_config_closures,
+ GRPC_ERROR_CREATE_REFERENCING("Channel disconnected", &error, 1));
grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
NULL);
}
@@ -218,9 +226,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
chand->exit_idle_when_lb_policy_arrives = 0;
}
- if (iomgr_success && chand->resolver) {
- set_channel_connectivity_state_locked(exec_ctx, chand, state,
- "new_lb+resolver");
+ if (error == GRPC_ERROR_NONE && chand->resolver) {
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
@@ -235,8 +243,12 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
+ grpc_error *refs[] = {error, state_error};
set_channel_connectivity_state_locked(
- exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, "resolver_gone");
+ exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs,
+ GPR_ARRAY_SIZE(refs)),
+ "resolver_gone");
gpr_mu_unlock(&chand->mu_config);
}
@@ -256,6 +268,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver");
+ GRPC_ERROR_UNREF(state_error);
}
static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -263,7 +276,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_transport_op *op) {
channel_data *chand = elem->channel_data;
- grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
GPR_ASSERT(op->set_accept_stream == false);
if (op->bind_pollset != NULL) {
@@ -282,7 +295,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
if (op->send_ping != NULL) {
if (chand->lb_policy == NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, op->send_ping,
+ GRPC_ERROR_CREATE("Ping with no load balancing"),
+ NULL);
} else {
grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
op->bind_pollset = NULL;
@@ -290,24 +305,29 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
op->send_ping = NULL;
}
- if (op->disconnect && chand->resolver != NULL) {
- set_channel_connectivity_state_locked(exec_ctx, chand,
- GRPC_CHANNEL_SHUTDOWN, "disconnect");
- grpc_resolver_shutdown(exec_ctx, chand->resolver);
- GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
- chand->resolver = NULL;
- if (!chand->started_resolving) {
- grpc_closure_list_fail_all(&chand->waiting_for_config_closures);
- grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
- NULL);
- }
- if (chand->lb_policy != NULL) {
- grpc_pollset_set_del_pollset_set(exec_ctx,
- chand->lb_policy->interested_parties,
- chand->interested_parties);
- GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
- chand->lb_policy = NULL;
+ if (op->disconnect_with_error != GRPC_ERROR_NONE) {
+ if (chand->resolver != NULL) {
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
+ grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
+ chand->resolver = NULL;
+ if (!chand->started_resolving) {
+ grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
+ GRPC_ERROR_REF(op->disconnect_with_error));
+ grpc_exec_ctx_enqueue_list(exec_ctx,
+ &chand->waiting_for_config_closures, NULL);
+ }
+ if (chand->lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx,
+ chand->lb_policy->interested_parties,
+ chand->interested_parties);
+ GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
+ chand->lb_policy = NULL;
+ }
}
+ GRPC_ERROR_UNREF(op->disconnect_with_error);
}
gpr_mu_unlock(&chand->mu_config);
}
@@ -327,16 +347,17 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
-static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
continue_picking_args *cpa = arg;
if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
- } else if (!success) {
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
+ } else if (error != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready)) {
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
}
gpr_free(cpa);
}
@@ -361,11 +382,12 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
connected_subchannel);
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
- closure = grpc_closure_next(closure)) {
+ closure = closure->next_data.next) {
cpa = closure->cb_arg;
if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, cpa->on_ready,
+ GRPC_ERROR_CREATE("Pick cancelled"), NULL);
}
}
gpr_mu_unlock(&chand->mu_config);
@@ -397,10 +419,11 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
cpa->on_ready = on_ready;
cpa->elem = elem;
grpc_closure_init(&cpa->closure, continue_picking, cpa);
- grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure,
- 1);
+ grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
+ GRPC_ERROR_NONE);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, on_ready, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"),
+ NULL);
}
gpr_mu_unlock(&chand->mu_config);
return 0;
@@ -507,7 +530,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
gpr_mu_lock(&chand->mu_config);
- out = grpc_connectivity_state_check(&chand->state_tracker);
+ out = grpc_connectivity_state_check(&chand->state_tracker, NULL);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
if (chand->lb_policy != NULL) {
grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
@@ -534,7 +557,7 @@ typedef struct {
} external_connectivity_watcher;
static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
external_connectivity_watcher *w = arg;
grpc_closure *follow_up = w->on_complete;
grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
@@ -542,7 +565,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
gpr_free(w);
- follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, error);
}
void grpc_client_channel_watch_connectivity_state(
diff --git a/src/core/ext/client_config/connector.h b/src/core/ext/client_config/connector.h
index dd85dfcb7d..ea9d23706e 100644
--- a/src/core/ext/client_config/connector.h
+++ b/src/core/ext/client_config/connector.h
@@ -64,7 +64,7 @@ typedef struct {
grpc_transport *transport;
/** channel arguments (to be passed to the filters) */
- const grpc_channel_args *channel_args;
+ grpc_channel_args *channel_args;
} grpc_connect_out_args;
struct grpc_connector_vtable {
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index dc1612428e..8b980b2cca 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -139,6 +139,8 @@ void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
}
grpc_connectivity_state grpc_lb_policy_check_connectivity(
- grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
- return policy->vtable->check_connectivity(exec_ctx, policy);
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_error **connectivity_error) {
+ return policy->vtable->check_connectivity(exec_ctx, policy,
+ connectivity_error);
}
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index b07824ae1b..a2f5446fc6 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -76,9 +76,10 @@ struct grpc_lb_policy_vtable {
/** Try to enter a READY connectivity state */
void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
- /** Check the current connectivity */
- grpc_connectivity_state (*check_connectivity)(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *policy);
+ /** check the current connectivity of the lb_policy */
+ grpc_connectivity_state (*check_connectivity)(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_error **connectivity_error);
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy. Calling with a NULL \a
@@ -159,6 +160,7 @@ void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_closure *closure);
grpc_connectivity_state grpc_lb_policy_check_connectivity(
- grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_error **connectivity_error);
#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H */
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 85183b0564..468067ea57 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -54,7 +54,7 @@
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
-#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 2
+#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
@@ -147,7 +147,7 @@ struct grpc_subchannel_call {
(((grpc_subchannel_call *)(callstack)) - 1)
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
- bool iomgr_success);
+ grpc_error *error);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define REF_REASON reason
@@ -177,7 +177,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
*/
static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
grpc_connected_subchannel *c = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c);
@@ -200,7 +200,7 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
*/
static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
grpc_subchannel *c = arg;
gpr_free((void *)c->filters);
grpc_channel_args_destroy(c->args);
@@ -290,8 +290,8 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
gpr_atm old_refs;
old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
if (old_refs == 1) {
- grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c),
- true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c),
+ GRPC_ERROR_NONE, NULL);
}
}
@@ -382,7 +382,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
args.initial_connect_string = c->initial_connect_string;
grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
- GRPC_CHANNEL_CONNECTING, "state_change");
+ GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
+ "state_change");
grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
&c->connected);
}
@@ -393,16 +394,17 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
continue_connect(exec_ctx, c);
}
-grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
+grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
+ grpc_error **error) {
grpc_connectivity_state state;
gpr_mu_lock(&c->mu);
- state = grpc_connectivity_state_check(&c->state_tracker);
+ state = grpc_connectivity_state_check(&c->state_tracker, error);
gpr_mu_unlock(&c->mu);
return state;
}
static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
external_state_watcher *w = arg;
grpc_closure *follow_up = w->notify;
if (w->pollset_set != NULL) {
@@ -415,7 +417,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&w->subchannel->mu);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
gpr_free(w);
- follow_up->cb(exec_ctx, follow_up->cb_arg, success);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, error);
}
void grpc_subchannel_notify_on_state_change(
@@ -469,7 +471,7 @@ void grpc_connected_subchannel_process_transport_op(
}
static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
- bool iomgr_success) {
+ grpc_error *error) {
state_watcher *sw = p;
grpc_subchannel *c = sw->subchannel;
gpr_mu *mu = &c->mu;
@@ -477,20 +479,19 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
gpr_mu_lock(mu);
/* if we failed just leave this closure */
- if (iomgr_success) {
- if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- /* any errors on a subchannel ==> we're done, create a new one */
- sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
- }
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
- sw->connectivity_state, "reflect_child");
- if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
- grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL,
- &sw->connectivity_state, &sw->closure);
- GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
- sw = NULL;
- }
+ if (sw->connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ /* any errors on a subchannel ==> we're done, create a new one */
+ sw->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
+ }
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
+ sw->connectivity_state, GRPC_ERROR_REF(error),
+ "reflect_child");
+ if (sw->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), NULL,
+ &sw->connectivity_state, &sw->closure);
+ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
+ sw = NULL;
}
gpr_mu_unlock(mu);
@@ -592,17 +593,20 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
/* signal completion */
grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY,
- "connected");
+ GRPC_ERROR_NONE, "connected");
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_subchannel *c = arg;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
if (c->disconnected) {
- iomgr_success = 0;
+ error = GRPC_ERROR_CREATE_REFERENCING("Disconnected", &error, 1);
+ } else {
+ GRPC_ERROR_REF(error);
}
- if (iomgr_success) {
+ if (error == GRPC_ERROR_NONE) {
+ gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
c->next_attempt =
gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC));
continue_connect(exec_ctx, c);
@@ -611,11 +615,13 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
gpr_mu_unlock(&c->mu);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
+ GRPC_ERROR_UNREF(error);
}
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
grpc_subchannel *c = arg;
+ grpc_channel_args *delete_channel_args = c->connecting_result.channel_args;
GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
gpr_mu_lock(&c->mu);
@@ -627,13 +633,26 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connect_failed");
+ grpc_connectivity_state_set(
+ exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
+ "connect_failed");
+ gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
+ const char *errmsg = grpc_error_string(error);
+ gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
+ if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <=
+ 0) {
+ gpr_log(GPR_INFO, "Retry immediately");
+ } else {
+ gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
+ time_til_next.tv_sec, time_til_next.tv_nsec);
+ }
grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
+ grpc_error_free_string(errmsg);
}
gpr_mu_unlock(&c->mu);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
+ grpc_channel_args_destroy(delete_channel_args);
}
/*
@@ -641,7 +660,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
*/
static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
- bool success) {
+ grpc_error *error) {
grpc_subchannel_call *c = call;
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_connected_subchannel *connection = c->connection;
diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h
index 525f854a44..b6d39f5dc5 100644
--- a/src/core/ext/client_config/subchannel.h
+++ b/src/core/ext/client_config/subchannel.h
@@ -119,7 +119,7 @@ void grpc_connected_subchannel_process_transport_op(
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
- grpc_subchannel *channel);
+ grpc_subchannel *channel, grpc_error **error);
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the channel */
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index 3df1f254d6..e31800edd9 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -43,14 +43,14 @@
#define CANCELLED_CALL ((grpc_subchannel_call *)1)
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
- bool success);
+ grpc_error *error);
static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
- bool success);
+ grpc_error *error);
static void add_waiting_locked(grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op);
static void fail_locked(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call_holder *holder);
+ grpc_subchannel_call_holder *holder, grpc_error *error);
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder);
@@ -91,7 +91,8 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call = GET_CALL(holder);
GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0);
if (call == CANCELLED_CALL) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
+ GRPC_ERROR_CANCELLED);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
@@ -107,7 +108,8 @@ retry:
call = GET_CALL(holder);
if (call == CANCELLED_CALL) {
gpr_mu_unlock(&holder->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
+ GRPC_ERROR_CANCELLED);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
@@ -124,7 +126,10 @@ retry:
} else {
switch (holder->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
- fail_locked(exec_ctx, holder);
+ fail_locked(exec_ctx, holder,
+ grpc_error_set_int(GRPC_ERROR_CREATE("Cancelled"),
+ GRPC_ERROR_INT_GRPC_STATUS,
+ op->cancel_with_status));
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
@@ -132,7 +137,8 @@ retry:
break;
}
gpr_mu_unlock(&holder->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
+ GRPC_ERROR_CANCELLED);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
@@ -168,7 +174,8 @@ retry:
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
}
-static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
grpc_subchannel_call_holder *holder = arg;
gpr_mu_lock(&holder->mu);
GPR_ASSERT(holder->creation_phase ==
@@ -176,10 +183,14 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
if (holder->connected_subchannel == NULL) {
gpr_atm_no_barrier_store(&holder->subchannel_call, 1);
- fail_locked(exec_ctx, holder);
+ fail_locked(exec_ctx, holder,
+ GRPC_ERROR_CREATE_REFERENCING("Failed to create subchannel",
+ &error, 1));
} else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) {
/* already cancelled before subchannel became ready */
- fail_locked(exec_ctx, holder);
+ fail_locked(exec_ctx, holder,
+ GRPC_ERROR_CREATE_REFERENCING(
+ "Cancelled before creating subchannel", &error, 1));
} else {
gpr_atm_rel_store(
&holder->subchannel_call,
@@ -205,18 +216,18 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
a->call = GET_CALL(holder);
if (a->call == CANCELLED_CALL) {
gpr_free(a);
- fail_locked(exec_ctx, holder);
+ fail_locked(exec_ctx, holder, GRPC_ERROR_CANCELLED);
return;
}
holder->waiting_ops = NULL;
holder->waiting_ops_count = 0;
holder->waiting_ops_capacity = 0;
GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
- grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), true,
- NULL);
+ grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a),
+ GRPC_ERROR_NONE, NULL);
}
-static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, bool success) {
+static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
retry_ops_args *a = args;
size_t i;
for (i = 0; i < a->nops; i++) {
@@ -241,13 +252,15 @@ static void add_waiting_locked(grpc_subchannel_call_holder *holder,
}
static void fail_locked(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call_holder *holder) {
+ grpc_subchannel_call_holder *holder,
+ grpc_error *error) {
size_t i;
for (i = 0; i < holder->waiting_ops_count; i++) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx,
- &holder->waiting_ops[i]);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, &holder->waiting_ops[i], GRPC_ERROR_REF(error));
}
holder->waiting_ops_count = 0;
+ GRPC_ERROR_UNREF(error);
}
char *grpc_subchannel_call_holder_get_peer(
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 26cead51b0..ede6b3e071 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -60,18 +60,19 @@ typedef struct wrapped_rr_closure_arg {
* reference to its associated round robin instance. We wrap this closure in
* order to unref the round robin instance upon its invocation */
static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
wrapped_rr_closure_arg *wc = arg;
if (wc->rr_policy != NULL) {
if (grpc_lb_glb_trace) {
- gpr_log(GPR_INFO, "Unreffing RR %p", wc->rr_policy);
+ gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
+ (intptr_t)wc->rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, wc->rr_policy, "wrapped_rr_closure");
}
if (wc->wrapped_closure != NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, wc->wrapped_closure, success, NULL);
+ grpc_exec_ctx_sched(exec_ctx, wc->wrapped_closure, error, NULL);
}
gpr_free(wc);
}
@@ -162,28 +163,34 @@ struct glb_lb_policy {
rr_connectivity_data *rr_connectivity;
};
-static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *p);
+static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *p,
+ grpc_error *error);
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
rr_connectivity_data *rrcd = arg;
- if (!iomgr_success) {
- gpr_free(rrcd);
- return;
- }
glb_lb_policy *p = rrcd->p;
- const grpc_connectivity_state new_state = p->rr_connectivity->state;
- if (new_state == GRPC_CHANNEL_SHUTDOWN && p->serverlist != NULL) {
- /* a RR policy is shutting down but there's a serverlist available ->
- * perform a handover */
- rr_handover(exec_ctx, p);
+ if (rrcd->state == GRPC_CHANNEL_SHUTDOWN) {
+ if (p->serverlist != NULL) {
+ /* a RR policy is shutting down but there's a serverlist available ->
+ * perform a handover */
+ rr_handover(exec_ctx, p, error);
+ } else {
+ /* shutting down and no new serverlist available. bail out. */
+ gpr_free(rrcd);
+ }
} else {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker, new_state,
- "rr_connectivity_changed");
- /* resubscribe */
- grpc_lb_policy_notify_on_state_change(exec_ctx, p->rr_policy,
- &p->rr_connectivity->state,
- &p->rr_connectivity->on_change);
+ if (error == GRPC_ERROR_NONE) {
+ /* not shutting down. mimic the RR's policy state */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, rrcd->state,
+ error, "rr_connectivity_changed");
+ /* resubscribe */
+ grpc_lb_policy_notify_on_state_change(exec_ctx, p->rr_policy,
+ &rrcd->state, &rrcd->on_change);
+ } else { /* error */
+ gpr_free(rrcd);
+ }
}
+ GRPC_ERROR_UNREF(error);
}
static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
@@ -220,10 +227,10 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
static void lb_client_data_destroy(lb_client_data *lbcd);
-static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
lb_client_data *lbcd = arg;
GPR_ASSERT(lbcd->c);
- grpc_call_error error;
+ grpc_call_error call_error;
grpc_op ops[1];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
@@ -232,15 +239,15 @@ static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
op->flags = 0;
op->reserved = NULL;
op++;
- error = grpc_call_start_batch_and_execute(exec_ctx, lbcd->c, ops,
- (size_t)(op - ops), &lbcd->md_rcvd);
- GPR_ASSERT(GRPC_CALL_OK == error);
+ call_error = grpc_call_start_batch_and_execute(
+ exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->md_rcvd);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
}
-static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
lb_client_data *lbcd = arg;
GPR_ASSERT(lbcd->c);
- grpc_call_error error;
+ grpc_call_error call_error;
grpc_op ops[1];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
@@ -250,14 +257,14 @@ static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
op->flags = 0;
op->reserved = NULL;
op++;
- error = grpc_call_start_batch_and_execute(
+ call_error = grpc_call_start_batch_and_execute(
exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->req_sent);
- GPR_ASSERT(GRPC_CALL_OK == error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
}
-static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
lb_client_data *lbcd = arg;
- grpc_call_error error;
+ grpc_call_error call_error;
grpc_op ops[1];
memset(ops, 0, sizeof(ops));
@@ -268,12 +275,12 @@ static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
op->flags = 0;
op->reserved = NULL;
op++;
- error = grpc_call_start_batch_and_execute(
+ call_error = grpc_call_start_batch_and_execute(
exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->res_rcvd);
- GPR_ASSERT(GRPC_CALL_OK == error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
}
-static void res_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void res_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* look inside lbcd->response_payload, ideally to send it back as the
* serverlist. */
lb_client_data *lbcd = arg;
@@ -308,7 +315,7 @@ static void res_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
if (lbcd->p->rr_policy == NULL) {
/* initial "handover", in this case from a null RR policy, meaning it'll
* just create the first one */
- rr_handover(exec_ctx, lbcd->p);
+ rr_handover(exec_ctx, lbcd->p, error);
} else {
/* unref the RR policy, eventually leading to its substitution with a
* new one constructed from the received serverlist (see
@@ -323,10 +330,10 @@ static void res_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
op->flags = 0;
op->reserved = NULL;
op++;
- const grpc_call_error error = grpc_call_start_batch_and_execute(
+ const grpc_call_error call_error = grpc_call_start_batch_and_execute(
exec_ctx, lbcd->c, ops, (size_t)(op - ops),
&lbcd->res_rcvd); /* loop */
- GPR_ASSERT(GRPC_CALL_OK == error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
return;
} else {
gpr_log(GPR_ERROR, "Invalid LB response received: '%s'",
@@ -338,22 +345,23 @@ static void res_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
op->flags = 0;
op->reserved = NULL;
op++;
- grpc_call_error error = grpc_call_start_batch_and_execute(
+ grpc_call_error call_error = grpc_call_start_batch_and_execute(
exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->close_sent);
- GPR_ASSERT(GRPC_CALL_OK == error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
}
}
/* empty payload: call cancelled by server. Cleanups happening in
* srv_status_rcvd_cb */
}
-static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
"Close from LB client sent. Waiting from server status now");
}
}
static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
lb_client_data *lbcd = arg;
glb_lb_policy *p = lbcd->p;
if (grpc_lb_glb_trace) {
@@ -450,14 +458,15 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->wrapped_on_complete, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->wrapped_on_complete, GRPC_ERROR_NONE,
+ NULL);
gpr_free(pp);
pp = next;
}
while (pping != NULL) {
pending_ping *next = pping->next;
- grpc_exec_ctx_enqueue(exec_ctx, pping->wrapped_notify, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pping->wrapped_notify, GRPC_ERROR_NONE, NULL);
pping = next;
}
@@ -468,8 +477,9 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GRPC_LB_POLICY_UNREF(exec_ctx, p->rr_policy, "glb_shutdown");
}
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_SHUTDOWN, "glb_shutdown");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown");
}
static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
@@ -484,7 +494,8 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->wrapped_on_complete, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->wrapped_on_complete,
+ GRPC_ERROR_CANCELLED, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -512,7 +523,8 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
- grpc_exec_ctx_enqueue(exec_ctx, pp->wrapped_on_complete, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->wrapped_on_complete,
+ GRPC_ERROR_CANCELLED, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -527,7 +539,7 @@ static void query_for_backends(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) {
GPR_ASSERT(p->lb_server_channel != NULL);
p->lbcd = lb_client_data_create(p);
- grpc_call_error error;
+ grpc_call_error call_error;
grpc_op ops[1];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
@@ -536,9 +548,9 @@ static void query_for_backends(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) {
op->flags = 0;
op->reserved = NULL;
op++;
- error = grpc_call_start_batch_and_execute(
+ call_error = grpc_call_start_batch_and_execute(
exec_ctx, p->lbcd->c, ops, (size_t)(op - ops), &p->lbcd->md_sent);
- GPR_ASSERT(GRPC_CALL_OK == error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
op = ops;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
@@ -551,9 +563,9 @@ static void query_for_backends(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) {
op->flags = 0;
op->reserved = NULL;
op++;
- error = grpc_call_start_batch_and_execute(
+ call_error = grpc_call_start_batch_and_execute(
exec_ctx, p->lbcd->c, ops, (size_t)(op - ops), &p->lbcd->srv_status_rcvd);
- GPR_ASSERT(GRPC_CALL_OK == error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
}
static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
@@ -612,20 +624,23 @@ static void start_picking(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) {
query_for_backends(exec_ctx, p);
}
-static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) {
+static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *p,
+ grpc_error *error) {
+ GRPC_ERROR_REF(error);
p->rr_policy = create_rr(exec_ctx, p->serverlist, p);
+
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")",
(intptr_t)p->rr_policy);
}
GPR_ASSERT(p->rr_policy != NULL);
p->rr_connectivity->state =
- grpc_lb_policy_check_connectivity(exec_ctx, p->rr_policy);
+ grpc_lb_policy_check_connectivity(exec_ctx, p->rr_policy, &error);
grpc_lb_policy_notify_on_state_change(exec_ctx, p->rr_policy,
&p->rr_connectivity->state,
&p->rr_connectivity->on_change);
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- p->rr_connectivity->state, "rr_handover");
+ p->rr_connectivity->state, error, "rr_handover");
grpc_lb_policy_exit_idle(exec_ctx, p->rr_policy);
/* flush pending ops */
@@ -656,6 +671,7 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) {
grpc_lb_policy_ping_one(exec_ctx, p->rr_policy, pping->wrapped_notify);
gpr_free(pping);
}
+ GRPC_ERROR_UNREF(error);
}
static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
@@ -695,7 +711,7 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
/* the call to grpc_lb_policy_pick has been sychronous. Invoke a neutered
* wrapped closure */
warg->wrapped_closure = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, wrapped_on_complete, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, wrapped_on_complete, GRPC_ERROR_NONE, NULL);
}
} else {
grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
@@ -712,12 +728,13 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
return r;
}
-static grpc_connectivity_state glb_check_connectivity(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol) {
+static grpc_connectivity_state glb_check_connectivity(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_error **connectivity_error) {
glb_lb_policy *p = (glb_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_check(&p->state_tracker);
+ st = grpc_connectivity_state_check(&p->state_tracker, connectivity_error);
gpr_mu_unlock(&p->mu);
return st;
}
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index cc559eb2da..9decf70692 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -103,8 +103,9 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
p->shutdown = 1;
pp = p->pending_picks;
p->pending_picks = NULL;
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_SHUTDOWN, "shutdown");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE("Channel shutdown"), "shutdown");
/* cancel subscription */
if (selected != NULL) {
grpc_connected_subchannel_notify_on_state_change(
@@ -120,7 +121,7 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
*pp->target = NULL;
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
pp = next;
}
@@ -139,7 +140,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -164,7 +166,8 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -237,7 +240,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
pick_first_lb_policy *p = arg;
size_t i;
size_t num_subchannels = p->num_subchannels;
@@ -258,12 +261,14 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
}
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
pick_first_lb_policy *p = arg;
grpc_subchannel *selected_subchannel;
pending_pick *pp;
grpc_connected_subchannel *selected;
+ GRPC_ERROR_REF(error);
+
gpr_mu_lock(&p->mu);
selected = GET_SELECTED(p);
@@ -271,6 +276,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (p->shutdown) {
gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
+ GRPC_ERROR_UNREF(error);
return;
} else if (selected != NULL) {
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
@@ -278,7 +284,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN;
}
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- p->checking_connectivity, "selected_changed");
+ p->checking_connectivity, GRPC_ERROR_REF(error),
+ "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) {
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, selected, p->base.interested_parties,
@@ -291,7 +298,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
switch (p->checking_connectivity) {
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_READY, "connecting_ready");
+ GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
+ "connecting_ready");
selected_subchannel = p->subchannels[p->checking_subchannel];
selected =
grpc_subchannel_get_connected_subchannel(selected_subchannel);
@@ -300,15 +308,16 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
- grpc_exec_ctx_enqueue(
- exec_ctx, grpc_closure_create(destroy_subchannels, p), true, NULL);
+ grpc_exec_ctx_sched(exec_ctx,
+ grpc_closure_create(destroy_subchannels, p),
+ GRPC_ERROR_NONE, NULL);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = selected;
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
@@ -320,12 +329,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
(p->checking_subchannel + 1) % p->num_subchannels;
if (p->checking_subchannel == 0) {
/* only trigger transient failure when we've tried all alternatives */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "connecting_transient_failure");
}
+ GRPC_ERROR_UNREF(error);
p->checking_connectivity = grpc_subchannel_check_connectivity(
- p->subchannels[p->checking_subchannel]);
+ p->subchannels[p->checking_subchannel], &error);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
@@ -337,9 +347,9 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
break;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_CONNECTING,
- "connecting_changed");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_REF(error), "connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
p->base.interested_parties, &p->checking_connectivity,
@@ -352,38 +362,45 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
"pick_first");
if (p->num_subchannels == 0) {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_SHUTDOWN,
- "no_more_channels");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE_REFERENCING("Pick first exhausted channels",
+ &error, 1),
+ "no_more_channels");
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE,
+ NULL);
gpr_free(pp);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
"pick_first_connectivity");
} else {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "subchannel_failed");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "subchannel_failed");
p->checking_subchannel %= p->num_subchannels;
+ GRPC_ERROR_UNREF(error);
p->checking_connectivity = grpc_subchannel_check_connectivity(
- p->subchannels[p->checking_subchannel]);
+ p->subchannels[p->checking_subchannel], &error);
goto loop;
}
}
}
gpr_mu_unlock(&p->mu);
+
+ GRPC_ERROR_UNREF(error);
}
static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol) {
+ grpc_lb_policy *pol,
+ grpc_error **error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_check(&p->state_tracker);
+ st = grpc_connectivity_state_check(&p->state_tracker, error);
gpr_mu_unlock(&p->mu);
return st;
}
@@ -406,7 +423,8 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (selected) {
grpc_connected_subchannel_ping(exec_ctx, selected, closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"),
+ NULL);
}
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 40dd7c5940..7bcf608ab9 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -265,11 +265,13 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE("Channel Shutdown"), NULL);
gpr_free(pp);
}
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_SHUTDOWN, "shutdown");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE("Channel Shutdown"), "shutdown");
for (i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
@@ -291,7 +293,8 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED,
+ NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -317,7 +320,8 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED,
+ NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -396,7 +400,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy;
pending_pick *pp;
@@ -404,6 +408,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
int unref = 0;
+ GRPC_ERROR_REF(error);
gpr_mu_lock(&p->mu);
if (p->shutdown) {
@@ -412,7 +417,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
switch (sd->connectivity_state) {
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_READY, "connecting_ready");
+ GRPC_CHANNEL_READY, GRPC_ERROR_REF(error),
+ "connecting_ready");
/* add the newly connected subchannel to the list of connected ones.
* Note that it goes to the "end of the line". */
sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel);
@@ -436,7 +442,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
@@ -445,9 +451,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
break;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- sd->connectivity_state,
- "connecting_changed");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, sd->connectivity_state,
+ GRPC_ERROR_REF(error), "connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
@@ -463,9 +469,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
remove_disconnected_sc_locked(p, sd->ready_list_node);
sd->ready_list_node = NULL;
}
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "connecting_transient_failure");
break;
case GRPC_CHANNEL_SHUTDOWN:
if (sd->ready_list_node != NULL) {
@@ -482,19 +488,22 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
unref = 1;
if (p->num_subchannels == 0) {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_SHUTDOWN,
- "no_more_channels");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE_REFERENCING("Round Robin Channels Exhausted",
+ &error, 1),
+ "no_more_channels");
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE,
+ NULL);
gpr_free(pp);
}
} else {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "subchannel_failed");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "subchannel_failed");
}
} /* switch */
} /* !unref */
@@ -504,14 +513,17 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (unref) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "round_robin_connectivity");
}
+
+ GRPC_ERROR_UNREF(error);
}
static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol) {
+ grpc_lb_policy *pol,
+ grpc_error **error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_check(&p->state_tracker);
+ st = grpc_connectivity_state_check(&p->state_tracker, error);
gpr_mu_unlock(&p->mu);
return st;
}
@@ -539,7 +551,8 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel_ping(exec_ctx, target, closure);
} else {
gpr_mu_unlock(&p->mu);
- grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, closure,
+ GRPC_ERROR_CREATE("Round Robin not connected"), NULL);
}
}
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 5efc95e0fa..31ac968670 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -82,6 +82,9 @@ typedef struct {
grpc_timer retry_timer;
/** retry backoff state */
gpr_backoff backoff_state;
+
+ /** currently resolving addresses */
+ grpc_resolved_addresses *addresses;
} dns_resolver;
static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
@@ -108,7 +111,8 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
}
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, r->next_completion,
+ GRPC_ERROR_CREATE("Resolver Shutdown"), NULL);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -143,12 +147,12 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
}
static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
dns_resolver *r = arg;
gpr_mu_lock(&r->mu);
r->have_retry_timer = false;
- if (success) {
+ if (error == GRPC_ERROR_NONE) {
if (!r->resolving) {
dns_start_resolving_locked(exec_ctx, r);
}
@@ -159,13 +163,14 @@ static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
}
static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_resolved_addresses *addresses) {
+ grpc_error *error) {
dns_resolver *r = arg;
grpc_client_config *config = NULL;
grpc_lb_policy *lb_policy;
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
r->resolving = 0;
+ grpc_resolved_addresses *addresses = r->addresses;
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create();
@@ -183,12 +188,18 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
gpr_timespec timeout = gpr_time_sub(next_try, now);
- gpr_log(GPR_DEBUG,
- "dns resolution failed: retrying in %" PRId64 ".%09d seconds",
- timeout.tv_sec, timeout.tv_nsec);
+ const char *msg = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg);
+ grpc_error_free_string(msg);
GPR_ASSERT(!r->have_retry_timer);
r->have_retry_timer = true;
GRPC_RESOLVER_REF(&r->base, "retry-timer");
+ if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) <= 0) {
+ gpr_log(GPR_DEBUG, "retrying in %" PRId64 ".%09d seconds", timeout.tv_sec,
+ timeout.tv_nsec);
+ } else {
+ gpr_log(GPR_DEBUG, "retrying immediately");
+ }
grpc_timer_init(exec_ctx, &r->retry_timer, next_try, dns_on_retry_timer, r,
now);
}
@@ -208,7 +219,9 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
GRPC_RESOLVER_REF(&r->base, "dns-resolving");
GPR_ASSERT(!r->resolving);
r->resolving = 1;
- grpc_resolve_address(exec_ctx, r->name, r->default_port, dns_on_resolved, r);
+ r->addresses = NULL;
+ grpc_resolve_address(exec_ctx, r->name, r->default_port,
+ grpc_closure_create(dns_on_resolved, r), &r->addresses);
}
static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
@@ -219,7 +232,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
if (r->resolved_config) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index a4fa9acf22..1f7cce2f43 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -92,7 +92,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -133,7 +133,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
r->published = 1;
*r->target_config = cfg;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
}
}
diff --git a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
deleted file mode 100644
index deb4b9b1ef..0000000000
--- a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/string_util.h>
-
-#include <grpc/grpc_zookeeper.h>
-#include <zookeeper/zookeeper.h>
-
-#include "src/core/ext/client_config/lb_policy_registry.h"
-#include "src/core/ext/client_config/resolver_registry.h"
-#include "src/core/lib/iomgr/resolve_address.h"
-#include "src/core/lib/json/json.h"
-#include "src/core/lib/support/string.h"
-#include "src/core/lib/surface/api_trace.h"
-
-/** Zookeeper session expiration time in milliseconds */
-#define GRPC_ZOOKEEPER_SESSION_TIMEOUT 15000
-
-typedef struct {
- /** base class: must be first */
- grpc_resolver base;
- /** refcount */
- gpr_refcount refs;
- /** name to resolve */
- char *name;
- /** subchannel factory */
- grpc_client_channel_factory *client_channel_factory;
- /** load balancing policy name */
- char *lb_policy_name;
-
- /** mutex guarding the rest of the state */
- gpr_mu mu;
- /** are we currently resolving? */
- int resolving;
- /** which version of resolved_config have we published? */
- int published_version;
- /** which version of resolved_config is current? */
- int resolved_version;
- /** pending next completion, or NULL */
- grpc_closure *next_completion;
- /** target config address for next completion */
- grpc_client_config **target_config;
- /** current (fully resolved) config */
- grpc_client_config *resolved_config;
-
- /** zookeeper handle */
- zhandle_t *zookeeper_handle;
- /** zookeeper resolved addresses */
- grpc_resolved_addresses *resolved_addrs;
- /** total number of addresses to be resolved */
- int resolved_total;
- /** number of addresses resolved */
- int resolved_num;
-} zookeeper_resolver;
-
-static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
-
-static void zookeeper_start_resolving_locked(zookeeper_resolver *r);
-static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
- zookeeper_resolver *r);
-
-static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
-static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *r);
-static void zookeeper_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
- grpc_client_config **target_config,
- grpc_closure *on_complete);
-
-static const grpc_resolver_vtable zookeeper_resolver_vtable = {
- zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error,
- zookeeper_next};
-
-static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver) {
- zookeeper_resolver *r = (zookeeper_resolver *)resolver;
- grpc_closure *call = NULL;
- gpr_mu_lock(&r->mu);
- if (r->next_completion != NULL) {
- *r->target_config = NULL;
- call = r->next_completion;
- r->next_completion = NULL;
- }
- zookeeper_close(r->zookeeper_handle);
- gpr_mu_unlock(&r->mu);
- if (call != NULL) {
- call->cb(exec_ctx, call->cb_arg, 1);
- }
-}
-
-static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver) {
- zookeeper_resolver *r = (zookeeper_resolver *)resolver;
- gpr_mu_lock(&r->mu);
- if (r->resolving == 0) {
- zookeeper_start_resolving_locked(r);
- }
- gpr_mu_unlock(&r->mu);
-}
-
-static void zookeeper_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_client_config **target_config,
- grpc_closure *on_complete) {
- zookeeper_resolver *r = (zookeeper_resolver *)resolver;
- gpr_mu_lock(&r->mu);
- GPR_ASSERT(r->next_completion == NULL);
- r->next_completion = on_complete;
- r->target_config = target_config;
- if (r->resolved_version == 0 && r->resolving == 0) {
- zookeeper_start_resolving_locked(r);
- } else {
- zookeeper_maybe_finish_next_locked(exec_ctx, r);
- }
- gpr_mu_unlock(&r->mu);
-}
-
-/** Zookeeper global watcher for connection management
- TODO: better connection management besides logs */
-static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type,
- int state, const char *path,
- void *watcher_ctx) {
- if (type == ZOO_SESSION_EVENT) {
- if (state == ZOO_EXPIRED_SESSION_STATE) {
- gpr_log(GPR_ERROR, "Zookeeper session expired");
- } else if (state == ZOO_AUTH_FAILED_STATE) {
- gpr_log(GPR_ERROR, "Zookeeper authentication failed");
- }
- }
-}
-
-/** Zookeeper watcher triggered by changes to watched nodes
- Once triggered, it tries to resolve again to get updated addresses */
-static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state,
- const char *path, void *watcher_ctx) {
- if (watcher_ctx != NULL) {
- zookeeper_resolver *r = (zookeeper_resolver *)watcher_ctx;
- if (state == ZOO_CONNECTED_STATE) {
- gpr_mu_lock(&r->mu);
- if (r->resolving == 0) {
- zookeeper_start_resolving_locked(r);
- }
- gpr_mu_unlock(&r->mu);
- }
- }
-}
-
-/** Callback function after getting all resolved addresses
- Creates a subchannel for each address */
-static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_resolved_addresses *addresses) {
- zookeeper_resolver *r = arg;
- grpc_client_config *config = NULL;
- grpc_lb_policy *lb_policy;
-
- if (addresses != NULL) {
- grpc_lb_policy_args lb_policy_args;
- config = grpc_client_config_create();
- lb_policy_args.addresses = addresses;
- lb_policy_args.client_channel_factory = r->client_channel_factory;
- lb_policy =
- grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
-
- if (lb_policy != NULL) {
- grpc_client_config_set_lb_policy(config, lb_policy);
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
- }
- grpc_resolved_addresses_destroy(addresses);
- }
- gpr_mu_lock(&r->mu);
- GPR_ASSERT(r->resolving == 1);
- r->resolving = 0;
- if (r->resolved_config != NULL) {
- grpc_client_config_unref(exec_ctx, r->resolved_config);
- }
- r->resolved_config = config;
- r->resolved_version++;
- zookeeper_maybe_finish_next_locked(exec_ctx, r);
- gpr_mu_unlock(&r->mu);
-
- GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "zookeeper-resolving");
-}
-
-/** Callback function for each DNS resolved address */
-static void zookeeper_dns_resolved(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_resolved_addresses *addresses) {
- size_t i;
- zookeeper_resolver *r = arg;
- int resolve_done = 0;
-
- gpr_mu_lock(&r->mu);
- r->resolved_num++;
- r->resolved_addrs->addrs =
- gpr_realloc(r->resolved_addrs->addrs,
- sizeof(grpc_resolved_address) *
- (r->resolved_addrs->naddrs + addresses->naddrs));
- for (i = 0; i < addresses->naddrs; i++) {
- memcpy(r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr,
- addresses->addrs[i].addr, addresses->addrs[i].len);
- r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len =
- addresses->addrs[i].len;
- }
-
- r->resolved_addrs->naddrs += addresses->naddrs;
- grpc_resolved_addresses_destroy(addresses);
-
- /** Wait for all addresses to be resolved */
- resolve_done = (r->resolved_num == r->resolved_total);
- gpr_mu_unlock(&r->mu);
- if (resolve_done) {
- zookeeper_on_resolved(exec_ctx, r, r->resolved_addrs);
- }
-}
-
-/** Parses JSON format address of a zookeeper node */
-static char *zookeeper_parse_address(const char *value, size_t value_len) {
- grpc_json *json;
- grpc_json *cur;
- const char *host;
- const char *port;
- char *buffer;
- char *address = NULL;
-
- buffer = gpr_malloc(value_len);
- memcpy(buffer, value, value_len);
- json = grpc_json_parse_string_with_len(buffer, value_len);
- if (json != NULL) {
- host = NULL;
- port = NULL;
- for (cur = json->child; cur != NULL; cur = cur->next) {
- if (!strcmp(cur->key, "host")) {
- host = cur->value;
- if (port != NULL) {
- break;
- }
- } else if (!strcmp(cur->key, "port")) {
- port = cur->value;
- if (host != NULL) {
- break;
- }
- }
- }
- if (host != NULL && port != NULL) {
- gpr_asprintf(&address, "%s:%s", host, port);
- }
- grpc_json_destroy(json);
- }
- gpr_free(buffer);
-
- return address;
-}
-
-static void zookeeper_get_children_node_completion(int rc, const char *value,
- int value_len,
- const struct Stat *stat,
- const void *arg) {
- char *address = NULL;
- zookeeper_resolver *r = (zookeeper_resolver *)arg;
- int resolve_done = 0;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-
- if (rc != 0) {
- gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name);
- grpc_exec_ctx_finish(&exec_ctx);
- return;
- }
-
- address = zookeeper_parse_address(value, (size_t)value_len);
- if (address != NULL) {
- /** Further resolves address by DNS */
- grpc_resolve_address(&exec_ctx, address, NULL, zookeeper_dns_resolved, r);
- gpr_free(address);
- } else {
- gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name);
- gpr_mu_lock(&r->mu);
- r->resolved_total--;
- resolve_done = (r->resolved_num == r->resolved_total);
- gpr_mu_unlock(&r->mu);
- if (resolve_done) {
- zookeeper_on_resolved(&exec_ctx, r, r->resolved_addrs);
- }
- }
-
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-static void zookeeper_get_children_completion(
- int rc, const struct String_vector *children, const void *arg) {
- char *path;
- int status;
- int i;
- zookeeper_resolver *r = (zookeeper_resolver *)arg;
-
- if (rc != 0) {
- gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name);
- return;
- }
-
- if (children->count == 0) {
- gpr_log(GPR_ERROR, "Error in resolving zookeeper address %s", r->name);
- return;
- }
-
- r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
- r->resolved_addrs->addrs = NULL;
- r->resolved_addrs->naddrs = 0;
- r->resolved_total = children->count;
-
- /** TODO: Replace expensive heap allocation with stack
- if we can get maximum length of zookeeper path */
- for (i = 0; i < children->count; i++) {
- gpr_asprintf(&path, "%s/%s", r->name, children->data[i]);
- status = zoo_awget(r->zookeeper_handle, path, zookeeper_watcher, r,
- zookeeper_get_children_node_completion, r);
- gpr_free(path);
- if (status != 0) {
- gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path);
- }
- }
-}
-
-static void zookeeper_get_node_completion(int rc, const char *value,
- int value_len,
- const struct Stat *stat,
- const void *arg) {
- int status;
- char *address = NULL;
- zookeeper_resolver *r = (zookeeper_resolver *)arg;
- r->resolved_addrs = NULL;
- r->resolved_total = 0;
- r->resolved_num = 0;
-
- if (rc != 0) {
- gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
- return;
- }
-
- /** If zookeeper node of path r->name does not have address
- (i.e. service node), get its children */
- address = zookeeper_parse_address(value, (size_t)value_len);
- if (address != NULL) {
- r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
- r->resolved_addrs->addrs = NULL;
- r->resolved_addrs->naddrs = 0;
- r->resolved_total = 1;
- /** Further resolves address by DNS */
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_resolve_address(&exec_ctx, address, NULL, zookeeper_dns_resolved, r);
- gpr_free(address);
- grpc_exec_ctx_finish(&exec_ctx);
- return;
- }
-
- status = zoo_awget_children(r->zookeeper_handle, r->name, zookeeper_watcher,
- r, zookeeper_get_children_completion, r);
- if (status != 0) {
- gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name);
- }
-}
-
-static void zookeeper_resolve_address(zookeeper_resolver *r) {
- int status;
- status = zoo_awget(r->zookeeper_handle, r->name, zookeeper_watcher, r,
- zookeeper_get_node_completion, r);
- if (status != 0) {
- gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
- }
-}
-
-static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
- GRPC_RESOLVER_REF(&r->base, "zookeeper-resolving");
- GPR_ASSERT(r->resolving == 0);
- r->resolving = 1;
- zookeeper_resolve_address(r);
-}
-
-static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
- zookeeper_resolver *r) {
- if (r->next_completion != NULL &&
- r->resolved_version != r->published_version) {
- *r->target_config = r->resolved_config;
- if (r->resolved_config != NULL) {
- grpc_client_config_ref(r->resolved_config);
- }
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
- r->next_completion = NULL;
- r->published_version = r->resolved_version;
- }
-}
-
-static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
- zookeeper_resolver *r = (zookeeper_resolver *)gr;
- gpr_mu_destroy(&r->mu);
- if (r->resolved_config != NULL) {
- grpc_client_config_unref(exec_ctx, r->resolved_config);
- }
- grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
- gpr_free(r->name);
- gpr_free(r->lb_policy_name);
- gpr_free(r);
-}
-
-static grpc_resolver *zookeeper_create(grpc_resolver_args *args,
- const char *lb_policy_name) {
- zookeeper_resolver *r;
- size_t length;
- char *path = args->uri->path;
-
- if (0 == strcmp(args->uri->authority, "")) {
- gpr_log(GPR_ERROR, "No authority specified in zookeeper uri");
- return NULL;
- }
-
- /** Removes the trailing slash if exists */
- length = strlen(path);
- if (length > 1 && path[length - 1] == '/') {
- path[length - 1] = 0;
- }
-
- r = gpr_malloc(sizeof(zookeeper_resolver));
- memset(r, 0, sizeof(*r));
- gpr_ref_init(&r->refs, 1);
- gpr_mu_init(&r->mu);
- grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
- r->name = gpr_strdup(path);
-
- r->client_channel_factory = args->client_channel_factory;
- grpc_client_channel_factory_ref(r->client_channel_factory);
-
- r->lb_policy_name = gpr_strdup(lb_policy_name);
-
- /** Initializes zookeeper client */
- zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
- r->zookeeper_handle =
- zookeeper_init(args->uri->authority, zookeeper_global_watcher,
- GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0);
- if (r->zookeeper_handle == NULL) {
- gpr_log(GPR_ERROR, "Unable to connect to zookeeper server");
- return NULL;
- }
-
- return &r->base;
-}
-
-/*
- * FACTORY
- */
-
-static void zookeeper_factory_ref(grpc_resolver_factory *factory) {}
-
-static void zookeeper_factory_unref(grpc_resolver_factory *factory) {}
-
-static char *zookeeper_factory_get_default_hostname(
- grpc_resolver_factory *factory, grpc_uri *uri) {
- return NULL;
-}
-
-static grpc_resolver *zookeeper_factory_create_resolver(
- grpc_resolver_factory *factory, grpc_resolver_args *args) {
- return zookeeper_create(args, "pick_first");
-}
-
-static const grpc_resolver_factory_vtable zookeeper_factory_vtable = {
- zookeeper_factory_ref, zookeeper_factory_unref,
- zookeeper_factory_create_resolver, zookeeper_factory_get_default_hostname,
- "zookeeper"};
-
-static grpc_resolver_factory zookeeper_resolver_factory = {
- &zookeeper_factory_vtable};
-
-static grpc_resolver_factory *zookeeper_resolver_factory_create() {
- return &zookeeper_resolver_factory;
-}
-
-static void zookeeper_plugin_init() {
- grpc_register_resolver_type(zookeeper_resolver_factory_create());
-}
-
-void grpc_zookeeper_register() {
- GRPC_API_TRACE("grpc_zookeeper_register(void)", 0, ());
- grpc_register_plugin(zookeeper_plugin_init, NULL);
-}
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index c5d3d8d9cc..85f9efb3b6 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -79,11 +79,11 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
connector_unref(exec_ctx, arg);
}
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
connector *c = arg;
grpc_closure *notify;
grpc_endpoint *tcp = c->tcp;
@@ -103,13 +103,13 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL,
0);
GPR_ASSERT(c->result->transport);
- c->result->channel_args = c->args.channel_args;
+ c->result->channel_args = grpc_channel_args_copy(c->args.channel_args);
} else {
memset(c->result, 0, sizeof(*c->result));
}
notify = c->notify;
c->notify = NULL;
- notify->cb(exec_ctx, notify->cb_arg, 1);
+ grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
}
static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {}
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index a262306085..721ba82d8f 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -90,7 +90,6 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_auth_context *auth_context) {
connector *c = arg;
grpc_closure *notify;
- grpc_channel_args *args_copy = NULL;
gpr_mu_lock(&c->mu);
if (c->connecting_endpoint == NULL) {
memset(c->result, 0, sizeof(*c->result));
@@ -109,26 +108,23 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL,
0);
auth_context_arg = grpc_auth_context_to_arg(auth_context);
- args_copy = grpc_channel_args_copy_and_add(c->args.channel_args,
- &auth_context_arg, 1);
- c->result->channel_args = args_copy;
+ c->result->channel_args = grpc_channel_args_copy_and_add(
+ c->args.channel_args, &auth_context_arg, 1);
}
notify = c->notify;
c->notify = NULL;
- /* look at c->args which are connector args. */
- notify->cb(exec_ctx, notify->cb_arg, 1);
- if (args_copy != NULL) grpc_channel_args_destroy(args_copy);
+ grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL);
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- bool success) {
+ grpc_error *error) {
connector *c = arg;
- grpc_channel_security_connector_do_handshake(exec_ctx, c->security_connector,
- c->connecting_endpoint,
- on_secure_handshake_done, c);
+ grpc_channel_security_connector_do_handshake(
+ exec_ctx, c->security_connector, c->connecting_endpoint, c->args.deadline,
+ on_secure_handshake_done, c);
}
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
connector *c = arg;
grpc_closure *notify;
grpc_endpoint *tcp = c->newly_connecting_endpoint;
@@ -147,13 +143,14 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
&c->initial_string_sent);
} else {
grpc_channel_security_connector_do_handshake(
- exec_ctx, c->security_connector, tcp, on_secure_handshake_done, c);
+ exec_ctx, c->security_connector, tcp, c->args.deadline,
+ on_secure_handshake_done, c);
}
} else {
memset(c->result, 0, sizeof(*c->result));
notify = c->notify;
c->notify = NULL;
- notify->cb(exec_ctx, notify->cb_arg, 1);
+ grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
}
}
@@ -175,7 +172,6 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
grpc_closure *notify) {
connector *c = (connector *)con;
GPR_ASSERT(c->notify == NULL);
- GPR_ASSERT(notify->cb);
c->notify = notify;
c->args = *args;
c->result = result;
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index c95dd20d1d..9bae3a94f9 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -35,6 +35,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/http_server_filter.h"
@@ -74,34 +75,40 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
grpc_closure *destroy_done) {
grpc_tcp_server *tcp = tcpp;
grpc_tcp_server_unref(exec_ctx, tcp);
- grpc_exec_ctx_enqueue(exec_ctx, destroy_done, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL);
}
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
grpc_resolved_addresses *resolved = NULL;
grpc_tcp_server *tcp = NULL;
size_t i;
- unsigned count = 0;
+ size_t count = 0;
int port_num = -1;
int port_temp;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_error *err = GRPC_ERROR_NONE;
GRPC_API_TRACE("grpc_server_add_insecure_http2_port(server=%p, addr=%s)", 2,
(server, addr));
- resolved = grpc_blocking_resolve_address(addr, "http");
- if (!resolved) {
+ grpc_error **errors = NULL;
+ err = grpc_blocking_resolve_address(addr, "https", &resolved);
+ if (err != GRPC_ERROR_NONE) {
goto error;
}
- tcp = grpc_tcp_server_create(NULL);
- GPR_ASSERT(tcp);
+ err = grpc_tcp_server_create(NULL, &tcp);
+ if (err != GRPC_ERROR_NONE) {
+ goto error;
+ }
- for (i = 0; i < resolved->naddrs; i++) {
- port_temp = grpc_tcp_server_add_port(
+ const size_t naddrs = resolved->naddrs;
+ errors = gpr_malloc(sizeof(*errors) * naddrs);
+ for (i = 0; i < naddrs; i++) {
+ errors[i] = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr,
- resolved->addrs[i].len);
- if (port_temp > 0) {
+ resolved->addrs[i].len, &port_temp);
+ if (errors[i] == GRPC_ERROR_NONE) {
if (port_num == -1) {
port_num = port_temp;
} else {
@@ -111,14 +118,24 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
}
}
if (count == 0) {
- gpr_log(GPR_ERROR, "No address added out of total %" PRIuPTR " resolved",
- resolved->naddrs);
+ char *msg;
+ gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved",
+ naddrs);
+ err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs);
+ gpr_free(msg);
goto error;
- }
- if (count != resolved->naddrs) {
- gpr_log(GPR_ERROR,
- "Only %d addresses added out of total %" PRIuPTR " resolved", count,
- resolved->naddrs);
+ } else if (count != naddrs) {
+ char *msg;
+ gpr_asprintf(&msg, "Only %" PRIuPTR
+ " addresses added out of total %" PRIuPTR " resolved",
+ count, naddrs);
+ err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs);
+ gpr_free(msg);
+
+ const char *warning_message = grpc_error_string(err);
+ gpr_log(GPR_INFO, "WARNING: %s", warning_message);
+ grpc_error_free_string(warning_message);
+ /* we managed to bind some addresses: continue */
}
grpc_resolved_addresses_destroy(resolved);
@@ -128,6 +145,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
/* Error path: cleanup and return */
error:
+ GPR_ASSERT(err != GRPC_ERROR_NONE);
if (resolved) {
grpc_resolved_addresses_destroy(resolved);
}
@@ -136,7 +154,18 @@ error:
}
port_num = 0;
+ const char *msg = grpc_error_string(err);
+ gpr_log(GPR_ERROR, "%s", msg);
+ grpc_error_free_string(msg);
+ GRPC_ERROR_UNREF(err);
+
done:
grpc_exec_ctx_finish(&exec_ctx);
+ if (errors != NULL) {
+ for (i = 0; i < naddrs; i++) {
+ GRPC_ERROR_UNREF(errors[i]);
+ }
+ }
+ gpr_free(errors);
return port_num;
}
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index e3437e5ed3..ead8a4d566 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
@@ -128,9 +129,11 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
state->state = statep;
state_ref(state->state);
state->accepting_pollset = accepting_pollset;
- grpc_server_security_connector_do_handshake(exec_ctx, state->state->sc,
- acceptor, tcp,
- on_secure_handshake_done, state);
+ grpc_server_security_connector_do_handshake(
+ exec_ctx, state->state->sc, acceptor, tcp,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_seconds(120, GPR_TIMESPAN)),
+ on_secure_handshake_done, state);
}
/* Server callback: start listening on our ports */
@@ -141,11 +144,12 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
on_accept, state);
}
-static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) {
+static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep,
+ grpc_error *error) {
server_secure_state *state = statep;
if (state->destroy_callback != NULL) {
state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg,
- success);
+ GRPC_ERROR_REF(error));
}
grpc_server_security_connector_shutdown(exec_ctx, state->sc);
state_unref(state);
@@ -171,12 +175,14 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
grpc_tcp_server *tcp = NULL;
server_secure_state *state = NULL;
size_t i;
- unsigned count = 0;
+ size_t count = 0;
int port_num = -1;
int port_temp;
grpc_security_status status = GRPC_SECURITY_ERROR;
grpc_server_security_connector *sc = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_error *err = GRPC_ERROR_NONE;
+ grpc_error **errors = NULL;
GRPC_API_TRACE(
"grpc_server_add_secure_http2_port("
@@ -184,26 +190,34 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
3, (server, addr, creds));
/* create security context */
- if (creds == NULL) goto error;
+ if (creds == NULL) {
+ err = GRPC_ERROR_CREATE(
+ "No credentials specified for secure server port (creds==NULL)");
+ goto error;
+ }
status = grpc_server_credentials_create_security_connector(creds, &sc);
if (status != GRPC_SECURITY_OK) {
- gpr_log(GPR_ERROR,
- "Unable to create secure server with credentials of type %s.",
- creds->type);
+ char *msg;
+ gpr_asprintf(&msg,
+ "Unable to create secure server with credentials of type %s.",
+ creds->type);
+ err = grpc_error_set_int(GRPC_ERROR_CREATE(msg),
+ GRPC_ERROR_INT_SECURITY_STATUS, status);
+ gpr_free(msg);
goto error;
}
sc->channel_args = grpc_server_get_channel_args(server);
/* resolve address */
- resolved = grpc_blocking_resolve_address(addr, "https");
- if (!resolved) {
+ err = grpc_blocking_resolve_address(addr, "https", &resolved);
+ if (err != GRPC_ERROR_NONE) {
goto error;
}
state = gpr_malloc(sizeof(*state));
memset(state, 0, sizeof(*state));
grpc_closure_init(&state->destroy_closure, destroy_done, state);
- tcp = grpc_tcp_server_create(&state->destroy_closure);
- if (!tcp) {
+ err = grpc_tcp_server_create(&state->destroy_closure, &tcp);
+ if (err != GRPC_ERROR_NONE) {
goto error;
}
@@ -215,11 +229,12 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
gpr_mu_init(&state->mu);
gpr_ref_init(&state->refcount, 1);
+ errors = gpr_malloc(sizeof(*errors) * resolved->naddrs);
for (i = 0; i < resolved->naddrs; i++) {
- port_temp = grpc_tcp_server_add_port(
+ errors[i] = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr,
- resolved->addrs[i].len);
- if (port_temp > 0) {
+ resolved->addrs[i].len, &port_temp);
+ if (errors[i] == GRPC_ERROR_NONE) {
if (port_num == -1) {
port_num = port_temp;
} else {
@@ -229,16 +244,31 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
}
}
if (count == 0) {
- gpr_log(GPR_ERROR, "No address added out of total %" PRIuPTR " resolved",
- resolved->naddrs);
+ char *msg;
+ gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved",
+ resolved->naddrs);
+ err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs);
+ gpr_free(msg);
goto error;
+ } else if (count != resolved->naddrs) {
+ char *msg;
+ gpr_asprintf(&msg, "Only %" PRIuPTR
+ " addresses added out of total %" PRIuPTR " resolved",
+ count, resolved->naddrs);
+ err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs);
+ gpr_free(msg);
+
+ const char *warning_message = grpc_error_string(err);
+ gpr_log(GPR_INFO, "WARNING: %s", warning_message);
+ grpc_error_free_string(warning_message);
+ /* we managed to bind some addresses: continue */
+ } else {
+ for (i = 0; i < resolved->naddrs; i++) {
+ GRPC_ERROR_UNREF(errors[i]);
+ }
}
- if (count != resolved->naddrs) {
- gpr_log(GPR_ERROR,
- "Only %d addresses added out of total %" PRIuPTR " resolved", count,
- resolved->naddrs);
- /* if it's an error, don't we want to goto error; here ? */
- }
+ gpr_free(errors);
+ errors = NULL;
grpc_resolved_addresses_destroy(resolved);
/* Register with the server only upon success */
@@ -249,6 +279,13 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
/* Error path: cleanup and return */
error:
+ GPR_ASSERT(err != GRPC_ERROR_NONE);
+ if (errors != NULL) {
+ for (i = 0; i < resolved->naddrs; i++) {
+ GRPC_ERROR_UNREF(errors[i]);
+ }
+ gpr_free(errors);
+ }
if (resolved) {
grpc_resolved_addresses_destroy(resolved);
}
@@ -263,5 +300,9 @@ error:
}
}
grpc_exec_ctx_finish(&exec_ctx);
+ const char *msg = grpc_error_string(err);
+ GRPC_ERROR_UNREF(err);
+ gpr_log(GPR_ERROR, "%s", msg);
+ grpc_error_free_string(msg);
return 0;
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 6e8640f1b3..9aa39ba26c 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -85,19 +85,17 @@ int grpc_flowctl_trace = 0;
static const grpc_transport_vtable vtable;
/* forward declarations of various callbacks that we'll build closures around */
-static void writing_action(grpc_exec_ctx *exec_ctx, void *t,
- bool iomgr_success_ignored);
-static void reading_action(grpc_exec_ctx *exec_ctx, void *t,
- bool iomgr_success_ignored);
-static void parsing_action(grpc_exec_ctx *exec_ctx, void *t,
- bool iomgr_success_ignored);
+static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
+static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
+static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
uint32_t value);
/** Start disconnection chain */
-static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
+static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_error *error);
/** Perform a transport_op */
static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
@@ -135,7 +133,7 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
static void connectivity_state_set(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
- grpc_connectivity_state state, const char *reason);
+ grpc_connectivity_state state, grpc_error *error, const char *reason);
static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global);
@@ -149,7 +147,9 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s,
void *byte_stream);
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_stream_global *stream_global);
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ grpc_error *error);
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
@@ -194,7 +194,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
and maybe they hold resources that need to be freed */
while (t->global.pings.next != &t->global.pings) {
grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
- grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, ping->on_recv,
+ GRPC_ERROR_CREATE("Transport closed"), NULL);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -409,7 +410,7 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s_ignored,
void *arg_ignored) {
t->destroying = 1;
- drop_connection(exec_ctx, t);
+ drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));
}
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
@@ -445,12 +446,11 @@ static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored,
- void *arg_ignored) {
+ grpc_error *error) {
if (!t->closed) {
t->closed = 1;
connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN,
- "close_transport");
+ GRPC_ERROR_REF(error), "close_transport");
if (t->ep) {
allow_endpoint_shutdown_locked(exec_ctx, t);
}
@@ -463,6 +463,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
}
+ GRPC_ERROR_UNREF(error);
}
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
@@ -551,7 +552,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
s->global.id == 0);
GPR_ASSERT(!s->global.in_stream_map);
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
- close_transport_locked(exec_ctx, t, NULL, NULL);
+ close_transport_locked(
+ exec_ctx, t,
+ GRPC_ERROR_CREATE("Last stream closed after sending goaway"));
}
if (!t->executor.parsing_active && s->global.id) {
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
@@ -645,7 +648,7 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
t->executor.writing_active = 1;
REF_TRANSPORT(t, "writing");
prevent_endpoint_shutdown(t);
- grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL);
}
check_read_ops(exec_ctx, &t->global);
@@ -756,12 +759,12 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s_ignored,
void *a) {
- bool success = (bool)(uintptr_t)a;
+ grpc_error *error = a;
allow_endpoint_shutdown_locked(exec_ctx, t);
- if (!success) {
- drop_connection(exec_ctx, t);
+ if (error != GRPC_ERROR_NONE) {
+ drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
}
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
@@ -769,7 +772,8 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global;
while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global,
&stream_global)) {
- fail_pending_writes(exec_ctx, stream_global);
+ fail_pending_writes(exec_ctx, &t->global, stream_global,
+ GRPC_ERROR_REF(error));
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
}
@@ -782,18 +786,18 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
}
UNREF_TRANSPORT(exec_ctx, t, "writing");
+ GRPC_ERROR_UNREF(error);
}
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
- void *transport_writing, bool success) {
+ void *transport_writing, grpc_error *error) {
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
- grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL,
- terminate_writing_with_lock,
- (void *)(uintptr_t)success, 0);
+ grpc_chttp2_run_with_global_lock(
+ exec_ctx, t, NULL, terminate_writing_with_lock, GRPC_ERROR_REF(error), 0);
}
static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
- bool iomgr_success_ignored) {
+ grpc_error *error) {
grpc_chttp2_transport *t = gt;
GPR_TIMER_BEGIN("writing_action", 0);
grpc_chttp2_perform_writes(exec_ctx, &t->writing, t->ep);
@@ -806,13 +810,19 @@ void grpc_chttp2_add_incoming_goaway(
char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg));
- gpr_free(msg);
gpr_slice_unref(goaway_text);
transport_global->seen_goaway = 1;
/* lie: use transient failure from the transport to indicate goaway has been
* received */
- connectivity_state_set(exec_ctx, transport_global,
- GRPC_CHANNEL_TRANSIENT_FAILURE, "got_goaway");
+ connectivity_state_set(
+ exec_ctx, transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ grpc_error_set_str(
+ grpc_error_set_int(GRPC_ERROR_CREATE("GOAWAY received"),
+ GRPC_ERROR_INT_HTTP2_ERROR,
+ (intptr_t)goaway_error),
+ GRPC_ERROR_STR_RAW_BYTES, msg),
+ "got_goaway");
+ gpr_free(msg);
}
static void maybe_start_some_streams(
@@ -841,9 +851,9 @@ static void maybe_start_some_streams(
transport_global->next_stream_id += 2;
if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
- connectivity_state_set(exec_ctx, transport_global,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "no_more_stream_ids");
+ connectivity_state_set(
+ exec_ctx, transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_CREATE("Stream IDs exhausted"), "no_more_stream_ids");
}
stream_global->outgoing_window =
@@ -871,34 +881,40 @@ static void maybe_start_some_streams(
}
#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
-#define CLOSURE_BARRIER_FAILURE_BIT (1 << 1)
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
static grpc_closure *add_closure_barrier(grpc_closure *closure) {
- closure->final_data += CLOSURE_BARRIER_FIRST_REF_BIT;
+ closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
return closure;
}
-void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_stream_global *stream_global,
- grpc_closure **pclosure, int success) {
+void grpc_chttp2_complete_closure_step(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure,
+ grpc_error *error) {
grpc_closure *closure = *pclosure;
if (closure == NULL) {
+ GRPC_ERROR_UNREF(error);
return;
}
- closure->final_data -= CLOSURE_BARRIER_FIRST_REF_BIT;
- if (!success) {
- closure->final_data |= CLOSURE_BARRIER_FAILURE_BIT;
+ closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
+ if (error != GRPC_ERROR_NONE) {
+ if (closure->error == GRPC_ERROR_NONE) {
+ closure->error =
+ GRPC_ERROR_CREATE("Error in HTTP transport completing operation");
+ closure->error = grpc_error_set_str(
+ closure->error, GRPC_ERROR_STR_TARGET_ADDRESS,
+ TRANSPORT_FROM_GLOBAL(transport_global)->peer_string);
+ }
+ closure->error = grpc_error_add_child(closure->error, error);
}
- if (closure->final_data < CLOSURE_BARRIER_FIRST_REF_BIT) {
- if (closure->final_data & CLOSURE_BARRIER_STATS_BIT) {
+ if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
+ if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) {
grpc_transport_move_stats(&stream_global->stats,
stream_global->collecting_stats);
stream_global->collecting_stats = NULL;
}
- grpc_exec_ctx_enqueue(
- exec_ctx, closure,
- (closure->final_data & CLOSURE_BARRIER_FAILURE_BIT) == 0, NULL);
+ grpc_exec_ctx_sched(exec_ctx, closure, closure->error, NULL);
}
*pclosure = NULL;
}
@@ -916,7 +932,7 @@ static int contains_non_ok_status(
return 0;
}
-static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, bool success) {}
+static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
@@ -933,12 +949,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
}
/* use final_data as a barrier until enqueue time; the inital counter is
dropped at the end of this function */
- on_complete->final_data = CLOSURE_BARRIER_FIRST_REF_BIT;
+ on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
+ on_complete->error = GRPC_ERROR_NONE;
if (op->collect_stats != NULL) {
GPR_ASSERT(stream_global->collecting_stats == NULL);
stream_global->collecting_stats = op->collect_stats;
- on_complete->final_data |= CLOSURE_BARRIER_STATS_BIT;
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
}
if (op->cancel_with_status != GRPC_STATUS_OK) {
@@ -985,8 +1002,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
}
} else {
grpc_chttp2_complete_closure_step(
- exec_ctx, stream_global,
- &stream_global->send_initial_metadata_finished, 0);
+ exec_ctx, transport_global, stream_global,
+ &stream_global->send_initial_metadata_finished,
+ GRPC_ERROR_CREATE(
+ "Attempt to send initial metadata after stream was closed"));
}
}
}
@@ -997,7 +1016,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
stream_global->send_message_finished = add_closure_barrier(on_complete);
if (stream_global->write_closed) {
grpc_chttp2_complete_closure_step(
- exec_ctx, stream_global, &stream_global->send_message_finished, 0);
+ exec_ctx, transport_global, stream_global,
+ &stream_global->send_message_finished,
+ GRPC_ERROR_CREATE("Attempt to send message after stream was closed"));
} else {
stream_global->send_message = op->send_message;
if (stream_global->id != 0) {
@@ -1031,9 +1052,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
}
if (stream_global->write_closed) {
grpc_chttp2_complete_closure_step(
- exec_ctx, stream_global,
+ exec_ctx, transport_global, stream_global,
&stream_global->send_trailing_metadata_finished,
- grpc_metadata_batch_is_empty(op->send_trailing_metadata));
+ grpc_metadata_batch_is_empty(op->send_trailing_metadata)
+ ? GRPC_ERROR_NONE
+ : GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
+ "stream was closed"));
} else if (stream_global->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
@@ -1072,7 +1096,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
- grpc_chttp2_complete_closure_step(exec_ctx, stream_global, &on_complete, 1);
+ grpc_chttp2_complete_closure_step(exec_ctx, transport_global, stream_global,
+ &on_complete, GRPC_ERROR_NONE);
GPR_TIMER_END("perform_stream_op_locked", 0);
}
@@ -1109,7 +1134,7 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
for (ping = transport_global->pings.next; ping != &transport_global->pings;
ping = ping->next) {
if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
- grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -1131,7 +1156,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s_unused,
void *stream_op) {
grpc_transport_op *op = stream_op;
- bool close_transport = op->disconnect;
+ grpc_error *close_transport = op->disconnect_with_error;
/* If there's a set_accept_stream ensure that we're not parsing
to avoid changing things out from underneath */
@@ -1142,7 +1167,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
return;
}
- grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(
@@ -1156,7 +1181,9 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
t->global.last_incoming_stream_id,
(uint32_t)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
gpr_slice_ref(*op->goaway_message), &t->global.qbuf);
- close_transport = !grpc_chttp2_has_streams(t);
+ close_transport = grpc_chttp2_has_streams(t)
+ ? GRPC_ERROR_NONE
+ : GRPC_ERROR_CREATE("GOAWAY sent");
}
if (op->set_accept_stream) {
@@ -1177,8 +1204,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
send_ping_locked(t, op->send_ping);
}
- if (close_transport) {
- close_transport_locked(exec_ctx, t, NULL, NULL);
+ if (close_transport != GRPC_ERROR_NONE) {
+ close_transport_locked(exec_ctx, t, close_transport);
}
}
@@ -1214,8 +1241,8 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_metadata_buffer_publish(
&stream_global->received_initial_metadata,
stream_global->recv_initial_metadata);
- grpc_exec_ctx_enqueue(
- exec_ctx, stream_global->recv_initial_metadata_ready, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, stream_global->recv_initial_metadata_ready,
+ GRPC_ERROR_NONE, NULL);
stream_global->recv_initial_metadata_ready = NULL;
}
if (stream_global->recv_message_ready != NULL) {
@@ -1228,13 +1255,13 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
*stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames);
GPR_ASSERT(*stream_global->recv_message != NULL);
- grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, true,
- NULL);
+ grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready,
+ GRPC_ERROR_NONE, NULL);
stream_global->recv_message_ready = NULL;
} else if (stream_global->published_trailing_metadata) {
*stream_global->recv_message = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, true,
- NULL);
+ grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready,
+ GRPC_ERROR_NONE, NULL);
stream_global->recv_message_ready = NULL;
}
}
@@ -1255,8 +1282,8 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
&stream_global->received_trailing_metadata,
stream_global->recv_trailing_metadata);
grpc_chttp2_complete_closure_step(
- exec_ctx, stream_global,
- &stream_global->recv_trailing_metadata_finished, 1);
+ exec_ctx, transport_global, stream_global,
+ &stream_global->recv_trailing_metadata_finished, GRPC_ERROR_NONE);
}
}
}
@@ -1272,7 +1299,7 @@ static void decrement_active_streams_locked(
}
static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- uint32_t id) {
+ uint32_t id, grpc_error *error) {
size_t new_stream_count;
grpc_chttp2_stream *s =
grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id);
@@ -1287,12 +1314,15 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
if (s->parsing.data_parser.parsing_frame != NULL) {
grpc_chttp2_incoming_byte_stream_finished(
- exec_ctx, s->parsing.data_parser.parsing_frame, 0, 0);
+ exec_ctx, s->parsing.data_parser.parsing_frame,
+ GRPC_ERROR_CREATE_REFERENCING("Stream removed", &error, 1), 0);
s->parsing.data_parser.parsing_frame = NULL;
}
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
- close_transport_locked(exec_ctx, t, NULL, NULL);
+ close_transport_locked(
+ exec_ctx, t,
+ GRPC_ERROR_CREATE("Last stream closed after sending GOAWAY"));
}
if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing");
@@ -1305,6 +1335,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->global.concurrent_stream_count = (uint32_t)new_stream_count;
maybe_start_some_streams(exec_ctx, &t->global);
}
+ GRPC_ERROR_UNREF(error);
}
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
@@ -1332,8 +1363,10 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
stream_global->seen_error = true;
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
- grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
- 1);
+ grpc_chttp2_mark_stream_closed(
+ exec_ctx, transport_global, stream_global, 1, 1,
+ grpc_error_set_int(GRPC_ERROR_CREATE("Cancelled"),
+ GRPC_ERROR_INT_GRPC_STATUS, status));
}
void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
@@ -1374,23 +1407,27 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
}
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_stream_global *stream_global) {
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ grpc_error *error) {
grpc_chttp2_complete_closure_step(
- exec_ctx, stream_global, &stream_global->send_initial_metadata_finished,
- 0);
+ exec_ctx, transport_global, stream_global,
+ &stream_global->send_initial_metadata_finished, GRPC_ERROR_REF(error));
grpc_chttp2_complete_closure_step(
- exec_ctx, stream_global, &stream_global->send_trailing_metadata_finished,
- 0);
- grpc_chttp2_complete_closure_step(exec_ctx, stream_global,
- &stream_global->send_message_finished, 0);
+ exec_ctx, transport_global, stream_global,
+ &stream_global->send_trailing_metadata_finished, GRPC_ERROR_REF(error));
+ grpc_chttp2_complete_closure_step(exec_ctx, transport_global, stream_global,
+ &stream_global->send_message_finished,
+ error);
}
void grpc_chttp2_mark_stream_closed(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global, int close_reads,
- int close_writes) {
+ grpc_chttp2_stream_global *stream_global, int close_reads, int close_writes,
+ grpc_error *error) {
if (stream_global->read_closed && stream_global->write_closed) {
/* already closed */
+ GRPC_ERROR_UNREF(error);
return;
}
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
@@ -1407,7 +1444,8 @@ void grpc_chttp2_mark_stream_closed(
grpc_chttp2_list_add_closed_waiting_for_writing(transport_global,
stream_global);
} else {
- fail_pending_writes(exec_ctx, stream_global);
+ fail_pending_writes(exec_ctx, transport_global, stream_global,
+ GRPC_ERROR_REF(error));
}
}
if (stream_global->read_closed && stream_global->write_closed) {
@@ -1418,11 +1456,12 @@ void grpc_chttp2_mark_stream_closed(
} else {
if (stream_global->id != 0) {
remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
- stream_global->id);
+ stream_global->id, GRPC_ERROR_REF(error));
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
}
+ GRPC_ERROR_UNREF(error);
}
static void close_from_api(grpc_exec_ctx *exec_ctx,
@@ -1529,8 +1568,16 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
optional_message);
+ grpc_error *err = GRPC_ERROR_CREATE("Stream closed");
+ err = grpc_error_set_int(err, GRPC_ERROR_INT_GRPC_STATUS, status);
+ if (optional_message) {
+ char *str =
+ gpr_dump_slice(*optional_message, GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ err = grpc_error_set_str(err, GRPC_ERROR_STR_GRPC_MESSAGE, str);
+ gpr_free(str);
+ }
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
- 1);
+ 1, err);
}
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
@@ -1549,8 +1596,9 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx,
grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb);
}
-static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
- close_transport_locked(exec_ctx, t, NULL, NULL);
+static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_error *error) {
+ close_transport_locked(exec_ctx, t, error);
end_all_the_calls(exec_ctx, t);
}
@@ -1581,20 +1629,22 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
static void reading_action_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s_unused, void *arg);
-static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success);
+static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s_unused, void *arg);
static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s_unused, void *arg);
-static void reading_action(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
+static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
/* Control flow:
reading_action_locked ->
(parse_unlocked -> post_parse_locked)? ->
post_reading_action_locked */
grpc_chttp2_run_with_global_lock(exec_ctx, tp, NULL, reading_action_locked,
- (void *)(uintptr_t)success, 0);
+ GRPC_ERROR_REF(error), 0);
}
static void reading_action_locked(grpc_exec_ctx *exec_ctx,
@@ -1602,7 +1652,7 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s_unused, void *arg) {
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
- bool success = (bool)(uintptr_t)arg;
+ grpc_error *error = arg;
GPR_ASSERT(!t->executor.parsing_active);
if (!t->closed) {
@@ -1611,48 +1661,54 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
grpc_chttp2_prepare_to_read(transport_global, transport_parsing);
- grpc_exec_ctx_enqueue(exec_ctx, &t->parsing_action, success, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL);
} else {
post_reading_action_locked(exec_ctx, t, s_unused, arg);
}
}
-static bool try_http_parsing(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t) {
+static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
grpc_http_parser parser;
size_t i = 0;
- bool success = false;
+ grpc_error *error = GRPC_ERROR_NONE;
+ grpc_http_response response;
+ memset(&response, 0, sizeof(response));
- grpc_http_parser_init(&parser);
+ grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
- for (; i < t->read_buffer.count &&
- grpc_http_parser_parse(&parser, t->read_buffer.slices[i]);
- i++)
- ;
- if (grpc_http_parser_eof(&parser) && parser.type == GRPC_HTTP_RESPONSE) {
- success = true;
- GRPC_CHTTP2_IF_TRACING(gpr_log(
- GPR_DEBUG, "Trying to connect an http1.x server, received status:%d",
- parser.http.response.status));
+ grpc_error *parse_error = GRPC_ERROR_NONE;
+ for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
+ parse_error = grpc_http_parser_parse(&parser, t->read_buffer.slices[i]);
+ }
+ if (parse_error == GRPC_ERROR_NONE &&
+ (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
+ error = grpc_error_set_int(
+ GRPC_ERROR_CREATE("Trying to connect an http1.x server"),
+ GRPC_ERROR_INT_HTTP_STATUS, response.status);
}
+ GRPC_ERROR_UNREF(parse_error);
grpc_http_parser_destroy(&parser);
- return success;
+ grpc_http_response_destroy(&response);
+ return error;
}
-static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
grpc_chttp2_transport *t = arg;
GPR_TIMER_BEGIN("reading_action.parse", 0);
size_t i = 0;
- for (; i < t->read_buffer.count &&
- grpc_chttp2_perform_read(exec_ctx, &t->parsing,
- t->read_buffer.slices[i]);
- i++)
- ;
+ grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
+ GRPC_ERROR_NONE};
+ for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
+ errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->parsing,
+ t->read_buffer.slices[i]);
+ };
if (i != t->read_buffer.count) {
- success = false;
gpr_slice_unref(t->optional_drop_message);
- if (try_http_parsing(exec_ctx, t)) {
+ errors[2] = try_http_parsing(exec_ctx, t);
+ if (errors[2] != GRPC_ERROR_NONE) {
t->optional_drop_message = gpr_slice_from_copied_string(
"Connection dropped: received http1.x response");
} else {
@@ -1660,9 +1716,18 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
"Connection dropped: received unparseable response");
}
}
+ grpc_error *err =
+ errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE &&
+ errors[2] == GRPC_ERROR_NONE
+ ? GRPC_ERROR_NONE
+ : GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors,
+ GPR_ARRAY_SIZE(errors));
+ for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
+ GRPC_ERROR_UNREF(errors[i]);
+ }
GPR_TIMER_END("reading_action.parse", 0);
- grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked,
- (void *)(uintptr_t)success, 0);
+ grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked, err,
+ 0);
}
static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1699,7 +1764,8 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GPR_ASSERT(stream_global->in_stream_map);
GPR_ASSERT(stream_global->write_closed);
GPR_ASSERT(stream_global->read_closed);
- remove_stream(exec_ctx, t, stream_global->id);
+ remove_stream(exec_ctx, t, stream_global->id,
+ GRPC_ERROR_CREATE("Stream removed"));
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
@@ -1710,10 +1776,13 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s_unused,
void *arg) {
- bool success = (bool)(uintptr_t)arg;
+ grpc_error *error = arg;
bool keep_reading = false;
- if (!success || t->closed) {
- drop_connection(exec_ctx, t);
+ if (error == GRPC_ERROR_NONE && t->closed) {
+ error = GRPC_ERROR_CREATE("Transport closed");
+ }
+ if (error != GRPC_ERROR_NONE) {
+ drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
if (!t->executor.writing_active && t->ep) {
grpc_endpoint_destroy(exec_ctx, t->ep);
@@ -1735,6 +1804,8 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
} else {
UNREF_TRANSPORT(exec_ctx, t, "reading_action");
}
+
+ GRPC_ERROR_UNREF(error);
}
/*******************************************************************************
@@ -1743,13 +1814,13 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
static void connectivity_state_set(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
- grpc_connectivity_state state, const char *reason) {
+ grpc_connectivity_state state, grpc_error *error, const char *reason) {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
grpc_connectivity_state_set(
exec_ctx,
&TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
- state, reason);
+ state, error, reason);
}
/*******************************************************************************
@@ -1795,6 +1866,7 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs) {
if (gpr_unref(&bs->refs)) {
+ GRPC_ERROR_UNREF(bs->error);
gpr_slice_buffer_destroy(&bs->slices);
gpr_free(bs);
}
@@ -1863,9 +1935,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
}
if (bs->slices.count > 0) {
*arg->slice = gpr_slice_buffer_take_first(&bs->slices);
- grpc_exec_ctx_enqueue(exec_ctx, arg->on_complete, true, NULL);
- } else if (bs->failed) {
- grpc_exec_ctx_enqueue(exec_ctx, arg->on_complete, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL);
+ } else if (bs->error != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error),
+ NULL);
} else {
bs->on_next = arg->on_complete;
bs->next = arg->slice;
@@ -1922,7 +1995,7 @@ static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream;
if (bs->on_next != NULL) {
*bs->next = arg->slice;
- grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL);
bs->on_next = NULL;
} else {
gpr_slice_buffer_add(&bs->slices, arg->slice);
@@ -1940,13 +2013,30 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
sizeof(arg));
}
+typedef struct {
+ grpc_chttp2_incoming_byte_stream *bs;
+ grpc_error *error;
+} bs_fail_args;
+
+static bs_fail_args *make_bs_fail_args(grpc_chttp2_incoming_byte_stream *bs,
+ grpc_error *error) {
+ bs_fail_args *a = gpr_malloc(sizeof(*a));
+ a->bs = bs;
+ a->error = error;
+ return a;
+}
+
static void incoming_byte_stream_finished_failed_locked(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
void *argp) {
- grpc_chttp2_incoming_byte_stream *bs = argp;
- grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL);
+ bs_fail_args *a = argp;
+ grpc_chttp2_incoming_byte_stream *bs = a->bs;
+ grpc_error *error = a->error;
+ gpr_free(a);
+ grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
bs->on_next = NULL;
- bs->failed = 1;
+ GRPC_ERROR_UNREF(bs->error);
+ bs->error = error;
incoming_byte_stream_unref(exec_ctx, bs);
}
@@ -1959,25 +2049,26 @@ static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx,
}
void grpc_chttp2_incoming_byte_stream_finished(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, int success,
- int from_parsing_thread) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+ grpc_error *error, int from_parsing_thread) {
if (from_parsing_thread) {
- if (success) {
+ if (error == GRPC_ERROR_NONE) {
grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
incoming_byte_stream_finished_ok_locked,
bs, 0);
} else {
- incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport,
- bs->stream, bs);
- }
- } else {
- if (success) {
grpc_chttp2_run_with_global_lock(
exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_finished_failed_locked, bs, 0);
+ incoming_byte_stream_finished_failed_locked,
+ make_bs_fail_args(bs, error), 0);
+ }
+ } else {
+ if (error == GRPC_ERROR_NONE) {
+ incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport,
+ bs->stream, bs);
} else {
- incoming_byte_stream_finished_failed_locked(exec_ctx, bs->transport,
- bs->stream, bs);
+ incoming_byte_stream_finished_failed_locked(
+ exec_ctx, bs->transport, bs->stream, make_bs_fail_args(bs, error));
}
}
}
@@ -2000,7 +2091,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
gpr_slice_buffer_init(&incoming_byte_stream->slices);
incoming_byte_stream->on_next = NULL;
incoming_byte_stream->is_tail = 1;
- incoming_byte_stream->failed = 0;
+ incoming_byte_stream->error = GRPC_ERROR_NONE;
if (add_to_queue->head == NULL) {
add_to_queue->head = incoming_byte_stream;
} else {
@@ -2141,5 +2232,5 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
REF_TRANSPORT(t, "reading_action"); /* matches unref inside reading_action */
gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
- reading_action(exec_ctx, t, 1);
+ reading_action(exec_ctx, t, GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/transport/chttp2/transport/frame.h b/src/core/ext/transport/chttp2/transport/frame.h
index 5c72d91c2a..7776609367 100644
--- a/src/core/ext/transport/chttp2/transport/frame.h
+++ b/src/core/ext/transport/chttp2/transport/frame.h
@@ -37,13 +37,7 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/slice.h>
-/* Common definitions for frame handling in the chttp2 transport */
-
-typedef enum {
- GRPC_CHTTP2_PARSE_OK,
- GRPC_CHTTP2_STREAM_ERROR,
- GRPC_CHTTP2_CONNECTION_ERROR
-} grpc_chttp2_parse_error;
+#include "src/core/lib/iomgr/error.h"
/* defined in internal.h */
typedef struct grpc_chttp2_stream_parsing grpc_chttp2_stream_parsing;
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c
index 3a6d80e0a3..9046fbc453 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.c
+++ b/src/core/ext/transport/chttp2/transport/frame_data.c
@@ -37,24 +37,25 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport.h"
-grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
- grpc_chttp2_data_parser *parser) {
+grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser) {
parser->state = GRPC_CHTTP2_DATA_FH_0;
parser->parsing_frame = NULL;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *parser) {
grpc_byte_stream *bs;
if (parser->parsing_frame) {
- grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame,
- 0, 1);
+ grpc_chttp2_incoming_byte_stream_finished(
+ exec_ctx, parser->parsing_frame, GRPC_ERROR_CREATE("Parser destroyed"),
+ 1);
}
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) {
@@ -62,11 +63,16 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
}
}
-grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
- grpc_chttp2_data_parser *parser, uint8_t flags) {
+grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
+ uint8_t flags,
+ uint32_t stream_id) {
if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
- gpr_log(GPR_ERROR, "unsupported data flags: 0x%02x", flags);
- return GRPC_CHTTP2_STREAM_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
+ grpc_error *err = grpc_error_set_int(
+ GRPC_ERROR_CREATE(msg), GRPC_ERROR_INT_STREAM_ID, (intptr_t)stream_id);
+ gpr_free(msg);
+ return err;
}
if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
@@ -75,7 +81,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
parser->is_last_frame = 0;
}
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
void grpc_chttp2_incoming_frame_queue_merge(
@@ -139,7 +145,7 @@ void grpc_chttp2_encode_data(uint32_t id, gpr_slice_buffer *inbuf,
stats->data_bytes += write_bytes;
}
-grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
+grpc_error *grpc_chttp2_data_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
@@ -149,19 +155,20 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_chttp2_data_parser *p = parser;
uint32_t message_flags;
grpc_chttp2_incoming_byte_stream *incoming_byte_stream;
+ char *msg;
if (is_last && p->is_last_frame) {
stream_parsing->received_close = 1;
}
if (cur == end) {
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
switch (p->state) {
case GRPC_CHTTP2_DATA_ERROR:
p->state = GRPC_CHTTP2_DATA_ERROR;
- return GRPC_CHTTP2_STREAM_ERROR;
+ return GRPC_ERROR_REF(p->error);
fh_0:
case GRPC_CHTTP2_DATA_FH_0:
stream_parsing->stats.incoming.framing_bytes++;
@@ -174,13 +181,23 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
p->is_frame_compressed = 1; /* GPR_TRUE */
break;
default:
- gpr_log(GPR_ERROR, "Bad GRPC frame type 0x%02x", p->frame_type);
+ gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
+ p->error = GRPC_ERROR_CREATE(msg);
+ p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
+ (intptr_t)stream_parsing->id);
+ gpr_free(msg);
+ msg = gpr_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ p->error =
+ grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES, msg);
+ gpr_free(msg);
+ p->error =
+ grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
p->state = GRPC_CHTTP2_DATA_ERROR;
- return GRPC_CHTTP2_STREAM_ERROR;
+ return GRPC_ERROR_REF(p->error);
}
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_1;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_1:
@@ -188,7 +205,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
p->frame_size = ((uint32_t)*cur) << 24;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_2;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_2:
@@ -196,7 +213,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
p->frame_size |= ((uint32_t)*cur) << 16;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_3;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_3:
@@ -204,7 +221,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
p->frame_size |= ((uint32_t)*cur) << 8;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_4;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_4:
@@ -225,7 +242,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
if (cur == end) {
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
uint32_t remaining = (uint32_t)(end - cur);
if (remaining == p->frame_size) {
@@ -233,19 +250,19 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
- grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, 1,
- 1);
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
+ GRPC_ERROR_NONE, 1);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
} else if (remaining > p->frame_size) {
stream_parsing->stats.incoming.data_bytes += p->frame_size;
grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg),
(size_t)(cur + p->frame_size - beg)));
- grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, 1,
- 1);
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
+ GRPC_ERROR_NONE, 1);
p->parsing_frame = NULL;
cur += p->frame_size;
goto fh_0; /* loop */
@@ -256,9 +273,9 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
p->frame_size -= remaining;
stream_parsing->stats.incoming.data_bytes += remaining;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
}
- GPR_UNREACHABLE_CODE(return GRPC_CHTTP2_CONNECTION_ERROR);
+ GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here"));
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h
index af71f483a2..a21a7942b9 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.h
+++ b/src/core/ext/transport/chttp2/transport/frame_data.h
@@ -66,6 +66,7 @@ typedef struct {
uint8_t is_last_frame;
uint8_t frame_type;
uint32_t frame_size;
+ grpc_error *error;
int is_frame_compressed;
grpc_chttp2_incoming_frame_queue incoming_frames;
@@ -79,19 +80,19 @@ grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
grpc_chttp2_incoming_frame_queue *q);
/* initialize per-stream state for data frame parsing */
-grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
- grpc_chttp2_data_parser *parser);
+grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser);
void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *parser);
/* start processing a new data frame */
-grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
- grpc_chttp2_data_parser *parser, uint8_t flags);
+grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
+ uint8_t flags,
+ uint32_t stream_id);
/* handle a slice of a data frame - is_last indicates the last slice of a
frame */
-grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
+grpc_error *grpc_chttp2_data_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
diff --git a/src/core/ext/transport/chttp2/transport/frame_goaway.c b/src/core/ext/transport/chttp2/transport/frame_goaway.c
index 827e7a6977..299e27ad70 100644
--- a/src/core/ext/transport/chttp2/transport/frame_goaway.c
+++ b/src/core/ext/transport/chttp2/transport/frame_goaway.c
@@ -38,6 +38,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
void grpc_chttp2_goaway_parser_init(grpc_chttp2_goaway_parser *p) {
p->debug_data = NULL;
@@ -47,11 +48,15 @@ void grpc_chttp2_goaway_parser_destroy(grpc_chttp2_goaway_parser *p) {
gpr_free(p->debug_data);
}
-grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame(
- grpc_chttp2_goaway_parser *p, uint32_t length, uint8_t flags) {
+grpc_error *grpc_chttp2_goaway_parser_begin_frame(grpc_chttp2_goaway_parser *p,
+ uint32_t length,
+ uint8_t flags) {
if (length < 8) {
- gpr_log(GPR_ERROR, "goaway frame too short (%d bytes)", length);
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "goaway frame too short (%d bytes)", length);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
}
gpr_free(p->debug_data);
@@ -59,10 +64,10 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame(
p->debug_data = gpr_malloc(p->debug_length);
p->debug_pos = 0;
p->state = GRPC_CHTTP2_GOAWAY_LSI0;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
-grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
+grpc_error *grpc_chttp2_goaway_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
@@ -75,7 +80,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
case GRPC_CHTTP2_GOAWAY_LSI0:
if (cur == end) {
p->state = GRPC_CHTTP2_GOAWAY_LSI0;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
p->last_stream_id = ((uint32_t)*cur) << 24;
++cur;
@@ -83,7 +88,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
case GRPC_CHTTP2_GOAWAY_LSI1:
if (cur == end) {
p->state = GRPC_CHTTP2_GOAWAY_LSI1;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
p->last_stream_id |= ((uint32_t)*cur) << 16;
++cur;
@@ -91,7 +96,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
case GRPC_CHTTP2_GOAWAY_LSI2:
if (cur == end) {
p->state = GRPC_CHTTP2_GOAWAY_LSI2;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
p->last_stream_id |= ((uint32_t)*cur) << 8;
++cur;
@@ -99,7 +104,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
case GRPC_CHTTP2_GOAWAY_LSI3:
if (cur == end) {
p->state = GRPC_CHTTP2_GOAWAY_LSI3;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
p->last_stream_id |= ((uint32_t)*cur);
++cur;
@@ -107,7 +112,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
case GRPC_CHTTP2_GOAWAY_ERR0:
if (cur == end) {
p->state = GRPC_CHTTP2_GOAWAY_ERR0;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
p->error_code = ((uint32_t)*cur) << 24;
++cur;
@@ -115,7 +120,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
case GRPC_CHTTP2_GOAWAY_ERR1:
if (cur == end) {
p->state = GRPC_CHTTP2_GOAWAY_ERR1;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
p->error_code |= ((uint32_t)*cur) << 16;
++cur;
@@ -123,7 +128,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
case GRPC_CHTTP2_GOAWAY_ERR2:
if (cur == end) {
p->state = GRPC_CHTTP2_GOAWAY_ERR2;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
p->error_code |= ((uint32_t)*cur) << 8;
++cur;
@@ -131,7 +136,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
case GRPC_CHTTP2_GOAWAY_ERR3:
if (cur == end) {
p->state = GRPC_CHTTP2_GOAWAY_ERR3;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
p->error_code |= ((uint32_t)*cur);
++cur;
@@ -151,9 +156,9 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
gpr_slice_new(p->debug_data, p->debug_length, gpr_free);
p->debug_data = NULL;
}
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
- GPR_UNREACHABLE_CODE(return GRPC_CHTTP2_CONNECTION_ERROR);
+ GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here"));
}
void grpc_chttp2_goaway_append(uint32_t last_stream_id, uint32_t error_code,
diff --git a/src/core/ext/transport/chttp2/transport/frame_goaway.h b/src/core/ext/transport/chttp2/transport/frame_goaway.h
index 7c38b26a39..eb4303405a 100644
--- a/src/core/ext/transport/chttp2/transport/frame_goaway.h
+++ b/src/core/ext/transport/chttp2/transport/frame_goaway.h
@@ -63,9 +63,9 @@ typedef struct {
void grpc_chttp2_goaway_parser_init(grpc_chttp2_goaway_parser *p);
void grpc_chttp2_goaway_parser_destroy(grpc_chttp2_goaway_parser *p);
-grpc_chttp2_parse_error grpc_chttp2_goaway_parser_begin_frame(
+grpc_error *grpc_chttp2_goaway_parser_begin_frame(
grpc_chttp2_goaway_parser *parser, uint32_t length, uint8_t flags);
-grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
+grpc_error *grpc_chttp2_goaway_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c
index 7e1815f0fe..1f814ab1bd 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.c
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.c
@@ -38,6 +38,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
gpr_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) {
gpr_slice slice = gpr_slice_malloc(9 + 8);
@@ -57,18 +58,22 @@ gpr_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) {
return slice;
}
-grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame(
- grpc_chttp2_ping_parser *parser, uint32_t length, uint8_t flags) {
+grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser,
+ uint32_t length,
+ uint8_t flags) {
if (flags & 0xfe || length != 8) {
- gpr_log(GPR_ERROR, "invalid ping: length=%d, flags=%02x", length, flags);
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "invalid ping: length=%d, flags=%02x", length, flags);
+ grpc_error *error = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return error;
}
parser->byte = 0;
parser->is_ack = flags;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
-grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
+grpc_error *grpc_chttp2_ping_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
@@ -93,5 +98,5 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
}
}
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.h b/src/core/ext/transport/chttp2/transport/frame_ping.h
index 4f7fcc1305..5a8723421c 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.h
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.h
@@ -46,9 +46,9 @@ typedef struct {
gpr_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes);
-grpc_chttp2_parse_error grpc_chttp2_ping_parser_begin_frame(
- grpc_chttp2_ping_parser *parser, uint32_t length, uint8_t flags);
-grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse(
+grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser,
+ uint32_t length, uint8_t flags);
+grpc_error *grpc_chttp2_ping_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
index 7f01105e3e..a7aefb9915 100644
--- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
+++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
@@ -34,7 +34,9 @@
#include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
@@ -67,18 +69,21 @@ gpr_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code,
return slice;
}
-grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
+grpc_error *grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser *parser, uint32_t length, uint8_t flags) {
if (length != 4) {
- gpr_log(GPR_ERROR, "invalid rst_stream: length=%d, flags=%02x", length,
- flags);
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "invalid rst_stream: length=%d, flags=%02x", length,
+ flags);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
}
parser->byte = 0;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
-grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
+grpc_error *grpc_chttp2_rst_stream_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
@@ -97,12 +102,13 @@ grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
if (p->byte == 4) {
GPR_ASSERT(is_last);
stream_parsing->received_close = 1;
- stream_parsing->saw_rst_stream = 1;
- stream_parsing->rst_stream_reason = (((uint32_t)p->reason_bytes[0]) << 24) |
- (((uint32_t)p->reason_bytes[1]) << 16) |
- (((uint32_t)p->reason_bytes[2]) << 8) |
- (((uint32_t)p->reason_bytes[3]));
+ stream_parsing->forced_close_error = grpc_error_set_int(
+ GRPC_ERROR_CREATE("RST_STREAM"), GRPC_ERROR_INT_HTTP2_ERROR,
+ (intptr_t)((((uint32_t)p->reason_bytes[0]) << 24) |
+ (((uint32_t)p->reason_bytes[1]) << 16) |
+ (((uint32_t)p->reason_bytes[2]) << 8) |
+ (((uint32_t)p->reason_bytes[3]))));
}
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h
index 9c1e756a94..11cf94f3ea 100644
--- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h
+++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h
@@ -47,9 +47,9 @@ typedef struct {
gpr_slice grpc_chttp2_rst_stream_create(uint32_t stream_id, uint32_t code,
grpc_transport_one_way_stats *stats);
-grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
+grpc_error *grpc_chttp2_rst_stream_parser_begin_frame(
grpc_chttp2_rst_stream_parser *parser, uint32_t length, uint8_t flags);
-grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
+grpc_error *grpc_chttp2_rst_stream_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c
index a3c1e15f35..04b96c4cd9 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.c
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.c
@@ -36,7 +36,9 @@
#include <string.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
@@ -118,7 +120,7 @@ gpr_slice grpc_chttp2_settings_ack_create(void) {
return output;
}
-grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame(
+grpc_error *grpc_chttp2_settings_parser_begin_frame(
grpc_chttp2_settings_parser *parser, uint32_t length, uint8_t flags,
uint32_t *settings) {
parser->target_settings = settings;
@@ -129,31 +131,29 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame(
if (flags == GRPC_CHTTP2_FLAG_ACK) {
parser->is_ack = 1;
if (length != 0) {
- gpr_log(GPR_ERROR, "non-empty settings ack frame received");
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ return GRPC_ERROR_CREATE("non-empty settings ack frame received");
}
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
} else if (flags != 0) {
- gpr_log(GPR_ERROR, "invalid flags on settings frame");
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ return GRPC_ERROR_CREATE("invalid flags on settings frame");
} else if (length % 6 != 0) {
- gpr_log(GPR_ERROR, "settings frames must be a multiple of six bytes");
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ return GRPC_ERROR_CREATE("settings frames must be a multiple of six bytes");
} else {
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
}
-grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
+grpc_error *grpc_chttp2_settings_parser_parse(
grpc_exec_ctx *exec_ctx, void *p,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_settings_parser *parser = p;
const uint8_t *cur = GPR_SLICE_START_PTR(slice);
const uint8_t *end = GPR_SLICE_END_PTR(slice);
+ char *msg;
if (parser->is_ack) {
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
for (;;) {
@@ -168,7 +168,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
gpr_slice_buffer_add(&transport_parsing->qbuf,
grpc_chttp2_settings_ack_create());
}
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
parser->id = (uint16_t)(((uint16_t)*cur) << 8);
cur++;
@@ -176,7 +176,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
case GRPC_CHTTP2_SPS_ID1:
if (cur == end) {
parser->state = GRPC_CHTTP2_SPS_ID1;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
parser->id = (uint16_t)(parser->id | (*cur));
cur++;
@@ -184,7 +184,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
case GRPC_CHTTP2_SPS_VAL0:
if (cur == end) {
parser->state = GRPC_CHTTP2_SPS_VAL0;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
parser->value = ((uint32_t)*cur) << 24;
cur++;
@@ -192,7 +192,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
case GRPC_CHTTP2_SPS_VAL1:
if (cur == end) {
parser->state = GRPC_CHTTP2_SPS_VAL1;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
parser->value |= ((uint32_t)*cur) << 16;
cur++;
@@ -200,7 +200,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
case GRPC_CHTTP2_SPS_VAL2:
if (cur == end) {
parser->state = GRPC_CHTTP2_SPS_VAL2;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
parser->value |= ((uint32_t)*cur) << 8;
cur++;
@@ -208,7 +208,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
case GRPC_CHTTP2_SPS_VAL3:
if (cur == end) {
parser->state = GRPC_CHTTP2_SPS_VAL3;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
} else {
parser->state = GRPC_CHTTP2_SPS_ID0;
}
@@ -229,9 +229,11 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
transport_parsing->last_incoming_stream_id, sp->error_value,
gpr_slice_from_static_string("HTTP2 settings error"),
&transport_parsing->qbuf);
- gpr_log(GPR_ERROR, "invalid value %u passed for %s",
- parser->value, sp->name);
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ gpr_asprintf(&msg, "invalid value %u passed for %s",
+ parser->value, sp->name);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
}
}
if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
@@ -249,7 +251,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
transport_parsing->is_client ? "CLI" : "SVR", parser->id,
parser->value);
}
- } else {
+ } else if (grpc_http_trace) {
gpr_log(GPR_ERROR, "CHTTP2: Ignoring unknown setting %d (value %d)",
parser->id, parser->value);
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.h b/src/core/ext/transport/chttp2/transport/frame_settings.h
index d9e30f1ed0..f654c598c8 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.h
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.h
@@ -92,10 +92,10 @@ gpr_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new,
/* Create an ack settings frame */
gpr_slice grpc_chttp2_settings_ack_create(void);
-grpc_chttp2_parse_error grpc_chttp2_settings_parser_begin_frame(
+grpc_error *grpc_chttp2_settings_parser_begin_frame(
grpc_chttp2_settings_parser *parser, uint32_t length, uint8_t flags,
uint32_t *settings);
-grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
+grpc_error *grpc_chttp2_settings_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c
index 90243418bd..3cf848fd5c 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.c
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c
@@ -34,7 +34,9 @@
#include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
gpr_slice grpc_chttp2_window_update_create(
uint32_t id, uint32_t window_update, grpc_transport_one_way_stats *stats) {
@@ -62,19 +64,22 @@ gpr_slice grpc_chttp2_window_update_create(
return slice;
}
-grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame(
+grpc_error *grpc_chttp2_window_update_parser_begin_frame(
grpc_chttp2_window_update_parser *parser, uint32_t length, uint8_t flags) {
if (flags || length != 4) {
- gpr_log(GPR_ERROR, "invalid window update: length=%d, flags=%02x", length,
- flags);
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "invalid window update: length=%d, flags=%02x", length,
+ flags);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
}
parser->byte = 0;
parser->amount = 0;
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
-grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
+grpc_error *grpc_chttp2_window_update_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
@@ -96,8 +101,11 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
if (p->byte == 4) {
uint32_t received_update = p->amount;
if (received_update == 0 || (received_update & 0x80000000u)) {
- gpr_log(GPR_ERROR, "invalid window update bytes: %d", p->amount);
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "invalid window update bytes: %d", p->amount);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
}
GPR_ASSERT(is_last);
@@ -115,5 +123,5 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
}
}
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.h b/src/core/ext/transport/chttp2/transport/frame_window_update.h
index d6e87b9329..1bcbbf9247 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.h
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.h
@@ -48,9 +48,9 @@ typedef struct {
gpr_slice grpc_chttp2_window_update_create(uint32_t id, uint32_t window_delta,
grpc_transport_one_way_stats *stats);
-grpc_chttp2_parse_error grpc_chttp2_window_update_parser_begin_frame(
+grpc_error *grpc_chttp2_window_update_parser_begin_frame(
grpc_chttp2_window_update_parser *parser, uint32_t length, uint8_t flags);
-grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
+grpc_error *grpc_chttp2_window_update_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index ed45bc9cb3..522455f7dc 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -46,6 +46,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
@@ -77,63 +78,70 @@ typedef enum {
a set of indirect jumps, and so not waste stack space. */
/* forward declarations for parsing states */
-static int parse_begin(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-static int parse_error(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-static int parse_illegal_op(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-
-static int parse_string_prefix(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+static grpc_error *parse_begin(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
const uint8_t *end);
-static int parse_key_string(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-static int parse_value_string_with_indexed_key(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur,
- const uint8_t *end);
-static int parse_value_string_with_literal_key(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur,
- const uint8_t *end);
-
-static int parse_value0(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-static int parse_value1(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-static int parse_value2(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-static int parse_value3(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-static int parse_value4(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-static int parse_value5up(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-
-static int parse_indexed_field(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-static int parse_indexed_field_x(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end);
-static int parse_lithdr_incidx(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-static int parse_lithdr_incidx_x(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end);
-static int parse_lithdr_incidx_v(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end);
-static int parse_lithdr_notidx(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-static int parse_lithdr_notidx_x(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end);
-static int parse_lithdr_notidx_v(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end);
-static int parse_lithdr_nvridx(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-static int parse_lithdr_nvridx_x(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end);
-static int parse_lithdr_nvridx_v(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end);
-static int parse_max_tbl_size(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end);
-static int parse_max_tbl_size_x(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+static grpc_error *parse_error(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end, grpc_error *error);
+static grpc_error *still_parse_error(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end);
+static grpc_error *parse_illegal_op(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end);
+
+static grpc_error *parse_string_prefix(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end);
+static grpc_error *parse_key_string(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end);
+static grpc_error *parse_value_string_with_indexed_key(
+ grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end);
+static grpc_error *parse_value_string_with_literal_key(
+ grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end);
+
+static grpc_error *parse_value0(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end);
+static grpc_error *parse_value1(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end);
+static grpc_error *parse_value2(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end);
+static grpc_error *parse_value3(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end);
+static grpc_error *parse_value4(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
const uint8_t *end);
+static grpc_error *parse_value5up(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end);
+
+static grpc_error *parse_indexed_field(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end);
+static grpc_error *parse_indexed_field_x(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end);
+static grpc_error *parse_lithdr_incidx(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end);
+static grpc_error *parse_lithdr_incidx_x(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end);
+static grpc_error *parse_lithdr_incidx_v(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end);
+static grpc_error *parse_lithdr_notidx(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end);
+static grpc_error *parse_lithdr_notidx_x(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end);
+static grpc_error *parse_lithdr_notidx_v(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end);
+static grpc_error *parse_lithdr_nvridx(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end);
+static grpc_error *parse_lithdr_nvridx_x(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end);
+static grpc_error *parse_lithdr_nvridx_v(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end);
+static grpc_error *parse_max_tbl_size(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end);
+static grpc_error *parse_max_tbl_size_x(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end);
/* we translate the first byte of a hpack field into one of these decoding
cases, then use a lookup table to jump directly to the appropriate parser.
@@ -631,19 +639,18 @@ static const uint8_t inverse_base64[256] = {
};
/* emission helpers */
-static int on_hdr(grpc_chttp2_hpack_parser *p, grpc_mdelem *md,
- int add_to_table) {
+static grpc_error *on_hdr(grpc_chttp2_hpack_parser *p, grpc_mdelem *md,
+ int add_to_table) {
if (add_to_table) {
- if (!grpc_chttp2_hptbl_add(&p->table, md)) {
- return 0;
- }
+ grpc_error *err = grpc_chttp2_hptbl_add(&p->table, md);
+ if (err != GRPC_ERROR_NONE) return err;
}
if (p->on_header == NULL) {
GRPC_MDELEM_UNREF(md);
- return 0;
+ return GRPC_ERROR_CREATE("on_header callback not set");
}
p->on_header(p->on_header_user_data, md);
- return 1;
+ return GRPC_ERROR_NONE;
}
static grpc_mdstr *take_string(grpc_chttp2_hpack_parser *p,
@@ -654,70 +661,70 @@ static grpc_mdstr *take_string(grpc_chttp2_hpack_parser *p,
}
/* jump to the next state */
-static int parse_next(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_next(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end) {
p->state = *p->next_state++;
return p->state(p, cur, end);
}
/* begin parsing a header: all functionality is encoded into lookup tables
above */
-static int parse_begin(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_begin(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end) {
if (cur == end) {
p->state = parse_begin;
- return 1;
+ return GRPC_ERROR_NONE;
}
return first_byte_action[first_byte_lut[*cur]](p, cur, end);
}
/* stream dependency and prioritization data: we just skip it */
-static int parse_stream_weight(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_stream_weight(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
if (cur == end) {
p->state = parse_stream_weight;
- return 1;
+ return GRPC_ERROR_NONE;
}
return p->after_prioritization(p, cur + 1, end);
}
-static int parse_stream_dep3(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_stream_dep3(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
if (cur == end) {
p->state = parse_stream_dep3;
- return 1;
+ return GRPC_ERROR_NONE;
}
return parse_stream_weight(p, cur + 1, end);
}
-static int parse_stream_dep2(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_stream_dep2(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
if (cur == end) {
p->state = parse_stream_dep2;
- return 1;
+ return GRPC_ERROR_NONE;
}
return parse_stream_dep3(p, cur + 1, end);
}
-static int parse_stream_dep1(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_stream_dep1(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
if (cur == end) {
p->state = parse_stream_dep1;
- return 1;
+ return GRPC_ERROR_NONE;
}
return parse_stream_dep2(p, cur + 1, end);
}
-static int parse_stream_dep0(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_stream_dep0(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
if (cur == end) {
p->state = parse_stream_dep0;
- return 1;
+ return GRPC_ERROR_NONE;
}
return parse_stream_dep1(p, cur + 1, end);
@@ -725,30 +732,34 @@ static int parse_stream_dep0(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
/* emit an indexed field; for now just logs it to console; jumps to
begin the next field on completion */
-static int finish_indexed_field(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *finish_indexed_field(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
if (md == NULL) {
- if (grpc_http_trace) {
- gpr_log(GPR_ERROR, "Invalid HPACK index received: %d", p->index);
- }
- return 0;
+ return grpc_error_set_int(
+ grpc_error_set_int(GRPC_ERROR_CREATE("Invalid HPACK index received"),
+ GRPC_ERROR_INT_INDEX, (intptr_t)p->index),
+ GRPC_ERROR_INT_SIZE, (intptr_t)p->table.num_ents);
}
GRPC_MDELEM_REF(md);
- return on_hdr(p, md, 0) && parse_begin(p, cur, end);
+ grpc_error *err = on_hdr(p, md, 0);
+ if (err != GRPC_ERROR_NONE) return err;
+ return parse_begin(p, cur, end);
}
/* parse an indexed field with index < 127 */
-static int parse_indexed_field(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_indexed_field(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
p->dynamic_table_update_allowed = 0;
p->index = (*cur) & 0x7f;
return finish_indexed_field(p, cur + 1, end);
}
/* parse an indexed field with index >= 127 */
-static int parse_indexed_field_x(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end) {
+static grpc_error *parse_indexed_field_x(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
finish_indexed_field};
p->dynamic_table_update_allowed = 0;
@@ -760,28 +771,34 @@ static int parse_indexed_field_x(grpc_chttp2_hpack_parser *p,
/* finish a literal header with incremental indexing: just log, and jump to '
begin */
-static int finish_lithdr_incidx(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *finish_lithdr_incidx(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
GPR_ASSERT(md != NULL); /* handled in string parsing */
- return on_hdr(p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
- take_string(p, &p->value)),
- 1) &&
- parse_begin(p, cur, end);
+ grpc_error *err =
+ on_hdr(p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
+ take_string(p, &p->value)),
+ 1);
+ if (err != GRPC_ERROR_NONE) return parse_error(p, cur, end, err);
+ return parse_begin(p, cur, end);
}
/* finish a literal header with incremental indexing with no index */
-static int finish_lithdr_incidx_v(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end) {
- return on_hdr(p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key),
- take_string(p, &p->value)),
- 1) &&
- parse_begin(p, cur, end);
+static grpc_error *finish_lithdr_incidx_v(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
+ grpc_error *err =
+ on_hdr(p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key),
+ take_string(p, &p->value)),
+ 1);
+ if (err != GRPC_ERROR_NONE) return parse_error(p, cur, end, err);
+ return parse_begin(p, cur, end);
}
/* parse a literal header with incremental indexing; index < 63 */
-static int parse_lithdr_incidx(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_lithdr_incidx(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_value_string_with_indexed_key, finish_lithdr_incidx};
p->dynamic_table_update_allowed = 0;
@@ -791,8 +808,9 @@ static int parse_lithdr_incidx(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
}
/* parse a literal header with incremental indexing; index >= 63 */
-static int parse_lithdr_incidx_x(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end) {
+static grpc_error *parse_lithdr_incidx_x(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_string_prefix, parse_value_string_with_indexed_key,
finish_lithdr_incidx};
@@ -804,8 +822,9 @@ static int parse_lithdr_incidx_x(grpc_chttp2_hpack_parser *p,
}
/* parse a literal header with incremental indexing; index = 0 */
-static int parse_lithdr_incidx_v(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end) {
+static grpc_error *parse_lithdr_incidx_v(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_key_string, parse_string_prefix,
parse_value_string_with_literal_key, finish_lithdr_incidx_v};
@@ -815,28 +834,34 @@ static int parse_lithdr_incidx_v(grpc_chttp2_hpack_parser *p,
}
/* finish a literal header without incremental indexing */
-static int finish_lithdr_notidx(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *finish_lithdr_notidx(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
GPR_ASSERT(md != NULL); /* handled in string parsing */
- return on_hdr(p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
- take_string(p, &p->value)),
- 0) &&
- parse_begin(p, cur, end);
+ grpc_error *err =
+ on_hdr(p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
+ take_string(p, &p->value)),
+ 0);
+ if (err != GRPC_ERROR_NONE) return parse_error(p, cur, end, err);
+ return parse_begin(p, cur, end);
}
/* finish a literal header without incremental indexing with index = 0 */
-static int finish_lithdr_notidx_v(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end) {
- return on_hdr(p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key),
- take_string(p, &p->value)),
- 0) &&
- parse_begin(p, cur, end);
+static grpc_error *finish_lithdr_notidx_v(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
+ grpc_error *err =
+ on_hdr(p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key),
+ take_string(p, &p->value)),
+ 0);
+ if (err != GRPC_ERROR_NONE) return parse_error(p, cur, end, err);
+ return parse_begin(p, cur, end);
}
/* parse a literal header without incremental indexing; index < 15 */
-static int parse_lithdr_notidx(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_lithdr_notidx(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_value_string_with_indexed_key, finish_lithdr_notidx};
p->dynamic_table_update_allowed = 0;
@@ -846,8 +871,9 @@ static int parse_lithdr_notidx(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
}
/* parse a literal header without incremental indexing; index >= 15 */
-static int parse_lithdr_notidx_x(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end) {
+static grpc_error *parse_lithdr_notidx_x(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_string_prefix, parse_value_string_with_indexed_key,
finish_lithdr_notidx};
@@ -859,8 +885,9 @@ static int parse_lithdr_notidx_x(grpc_chttp2_hpack_parser *p,
}
/* parse a literal header without incremental indexing; index == 0 */
-static int parse_lithdr_notidx_v(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end) {
+static grpc_error *parse_lithdr_notidx_v(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_key_string, parse_string_prefix,
parse_value_string_with_literal_key, finish_lithdr_notidx_v};
@@ -870,28 +897,34 @@ static int parse_lithdr_notidx_v(grpc_chttp2_hpack_parser *p,
}
/* finish a literal header that is never indexed */
-static int finish_lithdr_nvridx(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *finish_lithdr_nvridx(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
GPR_ASSERT(md != NULL); /* handled in string parsing */
- return on_hdr(p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
- take_string(p, &p->value)),
- 0) &&
- parse_begin(p, cur, end);
+ grpc_error *err =
+ on_hdr(p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
+ take_string(p, &p->value)),
+ 0);
+ if (err != GRPC_ERROR_NONE) return parse_error(p, cur, end, err);
+ return parse_begin(p, cur, end);
}
/* finish a literal header that is never indexed with an extra value */
-static int finish_lithdr_nvridx_v(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end) {
- return on_hdr(p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key),
- take_string(p, &p->value)),
- 0) &&
- parse_begin(p, cur, end);
+static grpc_error *finish_lithdr_nvridx_v(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
+ grpc_error *err =
+ on_hdr(p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key),
+ take_string(p, &p->value)),
+ 0);
+ if (err != GRPC_ERROR_NONE) return parse_error(p, cur, end, err);
+ return parse_begin(p, cur, end);
}
/* parse a literal header that is never indexed; index < 15 */
-static int parse_lithdr_nvridx(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_lithdr_nvridx(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_value_string_with_indexed_key, finish_lithdr_nvridx};
p->dynamic_table_update_allowed = 0;
@@ -901,8 +934,9 @@ static int parse_lithdr_nvridx(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
}
/* parse a literal header that is never indexed; index >= 15 */
-static int parse_lithdr_nvridx_x(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end) {
+static grpc_error *parse_lithdr_nvridx_x(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_string_prefix, parse_value_string_with_indexed_key,
finish_lithdr_nvridx};
@@ -914,8 +948,9 @@ static int parse_lithdr_nvridx_x(grpc_chttp2_hpack_parser *p,
}
/* parse a literal header that is never indexed; index == 0 */
-static int parse_lithdr_nvridx_v(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur, const uint8_t *end) {
+static grpc_error *parse_lithdr_nvridx_v(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_key_string, parse_string_prefix,
parse_value_string_with_literal_key, finish_lithdr_nvridx_v};
@@ -925,20 +960,25 @@ static int parse_lithdr_nvridx_v(grpc_chttp2_hpack_parser *p,
}
/* finish parsing a max table size change */
-static int finish_max_tbl_size(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *finish_max_tbl_size(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
if (grpc_http_trace) {
gpr_log(GPR_INFO, "MAX TABLE SIZE: %d", p->index);
}
- return grpc_chttp2_hptbl_set_current_table_size(&p->table, p->index) &&
- parse_begin(p, cur, end);
+ grpc_error *err =
+ grpc_chttp2_hptbl_set_current_table_size(&p->table, p->index);
+ if (err != GRPC_ERROR_NONE) return parse_error(p, cur, end, err);
+ return parse_begin(p, cur, end);
}
/* parse a max table size change, max size < 15 */
-static int parse_max_tbl_size(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_max_tbl_size(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
if (p->dynamic_table_update_allowed == 0) {
- return 0;
+ return parse_error(
+ p, cur, end,
+ GRPC_ERROR_CREATE(
+ "More than two max table size changes in a single frame"));
}
p->dynamic_table_update_allowed--;
p->index = (*cur) & 0x1f;
@@ -946,12 +986,16 @@ static int parse_max_tbl_size(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
}
/* parse a max table size change, max size >= 15 */
-static int parse_max_tbl_size_x(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_max_tbl_size_x(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur,
+ const uint8_t *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
finish_max_tbl_size};
if (p->dynamic_table_update_allowed == 0) {
- return 0;
+ return parse_error(
+ p, cur, end,
+ GRPC_ERROR_CREATE(
+ "More than two max table size changes in a single frame"));
}
p->dynamic_table_update_allowed--;
p->next_state = and_then;
@@ -961,28 +1005,38 @@ static int parse_max_tbl_size_x(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
}
/* a parse error: jam the parse state into parse_error, and return error */
-static int parse_error(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
- p->state = parse_error;
- return 0;
+static grpc_error *parse_error(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end, grpc_error *err) {
+ GPR_ASSERT(err != GRPC_ERROR_NONE);
+ if (p->last_error == GRPC_ERROR_NONE) {
+ p->last_error = GRPC_ERROR_REF(err);
+ }
+ p->state = still_parse_error;
+ return err;
+}
+
+static grpc_error *still_parse_error(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
+ return GRPC_ERROR_REF(p->last_error);
}
-static int parse_illegal_op(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_illegal_op(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
GPR_ASSERT(cur != end);
- if (grpc_http_trace) {
- gpr_log(GPR_DEBUG, "Illegal hpack op code %d", *cur);
- }
- return parse_error(p, cur, end);
+ char *msg;
+ gpr_asprintf(&msg, "Illegal hpack op code %d", *cur);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return parse_error(p, cur, end, err);
}
/* parse the 1st byte of a varint into p->parsing.value
no overflow is possible */
-static int parse_value0(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_value0(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end) {
if (cur == end) {
p->state = parse_value0;
- return 1;
+ return GRPC_ERROR_NONE;
}
*p->parsing.value += (*cur) & 0x7f;
@@ -996,11 +1050,11 @@ static int parse_value0(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
/* parse the 2nd byte of a varint into p->parsing.value
no overflow is possible */
-static int parse_value1(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_value1(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end) {
if (cur == end) {
p->state = parse_value1;
- return 1;
+ return GRPC_ERROR_NONE;
}
*p->parsing.value += (((uint32_t)*cur) & 0x7f) << 7;
@@ -1014,11 +1068,11 @@ static int parse_value1(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
/* parse the 3rd byte of a varint into p->parsing.value
no overflow is possible */
-static int parse_value2(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_value2(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end) {
if (cur == end) {
p->state = parse_value2;
- return 1;
+ return GRPC_ERROR_NONE;
}
*p->parsing.value += (((uint32_t)*cur) & 0x7f) << 14;
@@ -1032,11 +1086,11 @@ static int parse_value2(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
/* parse the 4th byte of a varint into p->parsing.value
no overflow is possible */
-static int parse_value3(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_value3(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end) {
if (cur == end) {
p->state = parse_value3;
- return 1;
+ return GRPC_ERROR_NONE;
}
*p->parsing.value += (((uint32_t)*cur) & 0x7f) << 21;
@@ -1050,15 +1104,16 @@ static int parse_value3(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
/* parse the 5th byte of a varint into p->parsing.value
depending on the byte, we may overflow, and care must be taken */
-static int parse_value4(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_value4(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end) {
uint8_t c;
uint32_t cur_value;
uint32_t add_value;
+ char *msg;
if (cur == end) {
p->state = parse_value4;
- return 1;
+ return GRPC_ERROR_NONE;
}
c = (*cur) & 0x7f;
@@ -1081,48 +1136,49 @@ static int parse_value4(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
}
error:
- if (grpc_http_trace) {
- gpr_log(GPR_ERROR,
- "integer overflow in hpack integer decoding: have 0x%08x, "
- "got byte 0x%02x on byte 5",
- *p->parsing.value, *cur);
- }
- return parse_error(p, cur, end);
+ gpr_asprintf(&msg,
+ "integer overflow in hpack integer decoding: have 0x%08x, "
+ "got byte 0x%02x on byte 5",
+ *p->parsing.value, *cur);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return parse_error(p, cur, end, err);
}
/* parse any trailing bytes in a varint: it's possible to append an arbitrary
number of 0x80's and not affect the value - a zero will terminate - and
anything else will overflow */
-static int parse_value5up(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_value5up(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
while (cur != end && *cur == 0x80) {
++cur;
}
if (cur == end) {
p->state = parse_value5up;
- return 1;
+ return GRPC_ERROR_NONE;
}
if (*cur == 0) {
return parse_next(p, cur + 1, end);
}
- if (grpc_http_trace) {
- gpr_log(GPR_ERROR,
- "integer overflow in hpack integer decoding: have 0x%08x, "
- "got byte 0x%02x sometime after byte 5",
- *p->parsing.value, *cur);
- }
- return parse_error(p, cur, end);
+ char *msg;
+ gpr_asprintf(&msg,
+ "integer overflow in hpack integer decoding: have 0x%08x, "
+ "got byte 0x%02x sometime after byte 5",
+ *p->parsing.value, *cur);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return parse_error(p, cur, end, err);
}
/* parse a string prefix */
-static int parse_string_prefix(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_string_prefix(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
if (cur == end) {
p->state = parse_string_prefix;
- return 1;
+ return GRPC_ERROR_NONE;
}
p->strlen = (*cur) & 0x7f;
@@ -1149,25 +1205,26 @@ static void append_bytes(grpc_chttp2_hpack_parser_string *str,
str->length += (uint32_t)length;
}
-static int append_string(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *append_string(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
grpc_chttp2_hpack_parser_string *str = p->parsing.str;
uint32_t bits;
uint8_t decoded[3];
switch ((binary_state)p->binary) {
case NOT_BINARY:
append_bytes(str, cur, (size_t)(end - cur));
- return 1;
+ return GRPC_ERROR_NONE;
b64_byte0:
case B64_BYTE0:
if (cur == end) {
p->binary = B64_BYTE0;
- return 1;
+ return GRPC_ERROR_NONE;
}
bits = inverse_base64[*cur];
++cur;
if (bits == 255)
- return 0;
+ return parse_error(p, cur, end,
+ GRPC_ERROR_CREATE("Illegal base64 character"));
else if (bits == 64)
goto b64_byte0;
p->base64_buffer = bits << 18;
@@ -1176,12 +1233,13 @@ static int append_string(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
case B64_BYTE1:
if (cur == end) {
p->binary = B64_BYTE1;
- return 1;
+ return GRPC_ERROR_NONE;
}
bits = inverse_base64[*cur];
++cur;
if (bits == 255)
- return 0;
+ return parse_error(p, cur, end,
+ GRPC_ERROR_CREATE("Illegal base64 character"));
else if (bits == 64)
goto b64_byte1;
p->base64_buffer |= bits << 12;
@@ -1190,12 +1248,13 @@ static int append_string(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
case B64_BYTE2:
if (cur == end) {
p->binary = B64_BYTE2;
- return 1;
+ return GRPC_ERROR_NONE;
}
bits = inverse_base64[*cur];
++cur;
if (bits == 255)
- return 0;
+ return parse_error(p, cur, end,
+ GRPC_ERROR_CREATE("Illegal base64 character"));
else if (bits == 64)
goto b64_byte2;
p->base64_buffer |= bits << 6;
@@ -1204,12 +1263,13 @@ static int append_string(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
case B64_BYTE3:
if (cur == end) {
p->binary = B64_BYTE3;
- return 1;
+ return GRPC_ERROR_NONE;
}
bits = inverse_base64[*cur];
++cur;
if (bits == 255)
- return 0;
+ return parse_error(p, cur, end,
+ GRPC_ERROR_CREATE("Illegal base64 character"));
else if (bits == 64)
goto b64_byte3;
p->base64_buffer |= bits;
@@ -1220,11 +1280,13 @@ static int append_string(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
append_bytes(str, decoded, 3);
goto b64_byte0;
}
- GPR_UNREACHABLE_CODE(return 1);
+ GPR_UNREACHABLE_CODE(return parse_error(
+ p, cur, end, GRPC_ERROR_CREATE("Should never reach here")));
}
/* append a null terminator to a string */
-static int finish_str(grpc_chttp2_hpack_parser *p) {
+static grpc_error *finish_str(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end) {
uint8_t terminator = 0;
uint8_t decoded[2];
uint32_t bits;
@@ -1235,14 +1297,18 @@ static int finish_str(grpc_chttp2_hpack_parser *p) {
case B64_BYTE0:
break;
case B64_BYTE1:
- gpr_log(GPR_ERROR, "illegal base64 encoding");
- return 0; /* illegal encoding */
+ return parse_error(
+ p, cur, end,
+ GRPC_ERROR_CREATE("illegal base64 encoding")); /* illegal encoding */
case B64_BYTE2:
bits = p->base64_buffer;
if (bits & 0xffff) {
- gpr_log(GPR_ERROR, "trailing bits in base64 encoding: 0x%04x",
- bits & 0xffff);
- return 0;
+ char *msg;
+ gpr_asprintf(&msg, "trailing bits in base64 encoding: 0x%04x",
+ bits & 0xffff);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return parse_error(p, cur, end, err);
}
decoded[0] = (uint8_t)(bits >> 16);
append_bytes(str, decoded, 1);
@@ -1250,9 +1316,12 @@ static int finish_str(grpc_chttp2_hpack_parser *p) {
case B64_BYTE3:
bits = p->base64_buffer;
if (bits & 0xff) {
- gpr_log(GPR_ERROR, "trailing bits in base64 encoding: 0x%02x",
- bits & 0xff);
- return 0;
+ char *msg;
+ gpr_asprintf(&msg, "trailing bits in base64 encoding: 0x%02x",
+ bits & 0xff);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return parse_error(p, cur, end, err);
}
decoded[0] = (uint8_t)(bits >> 16);
decoded[1] = (uint8_t)(bits >> 8);
@@ -1261,38 +1330,42 @@ static int finish_str(grpc_chttp2_hpack_parser *p) {
}
append_bytes(str, &terminator, 1);
p->parsing.str->length--; /* don't actually count the null terminator */
- return 1;
+ return GRPC_ERROR_NONE;
}
/* decode a nibble from a huffman encoded stream */
-static int huff_nibble(grpc_chttp2_hpack_parser *p, uint8_t nibble) {
+static grpc_error *huff_nibble(grpc_chttp2_hpack_parser *p, uint8_t nibble) {
int16_t emit = emit_sub_tbl[16 * emit_tbl[p->huff_state] + nibble];
int16_t next = next_sub_tbl[16 * next_tbl[p->huff_state] + nibble];
if (emit != -1) {
if (emit >= 0 && emit < 256) {
uint8_t c = (uint8_t)emit;
- if (!append_string(p, &c, (&c) + 1)) return 0;
+ grpc_error *err = append_string(p, &c, (&c) + 1);
+ if (err != GRPC_ERROR_NONE) return err;
} else {
assert(emit == 256);
}
}
p->huff_state = next;
- return 1;
+ return GRPC_ERROR_NONE;
}
/* decode full bytes from a huffman encoded stream */
-static int add_huff_bytes(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *add_huff_bytes(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
for (; cur != end; ++cur) {
- if (!huff_nibble(p, *cur >> 4) || !huff_nibble(p, *cur & 0xf)) return 0;
+ grpc_error *err = huff_nibble(p, *cur >> 4);
+ if (err != GRPC_ERROR_NONE) return parse_error(p, cur, end, err);
+ err = huff_nibble(p, *cur & 0xf);
+ if (err != GRPC_ERROR_NONE) return parse_error(p, cur, end, err);
}
- return 1;
+ return GRPC_ERROR_NONE;
}
/* decode some string bytes based on the current decoding mode
(huffman or not) */
-static int add_str_bytes(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *add_str_bytes(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
if (p->huff) {
return add_huff_bytes(p, cur, end);
} else {
@@ -1301,26 +1374,31 @@ static int add_str_bytes(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
}
/* parse a string - tries to do large chunks at a time */
-static int parse_string(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_string(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
+ const uint8_t *end) {
size_t remaining = p->strlen - p->strgot;
size_t given = (size_t)(end - cur);
if (remaining <= given) {
- return add_str_bytes(p, cur, cur + remaining) && finish_str(p) &&
- parse_next(p, cur + remaining, end);
+ grpc_error *err = add_str_bytes(p, cur, cur + remaining);
+ if (err != GRPC_ERROR_NONE) return parse_error(p, cur, end, err);
+ err = finish_str(p, cur + remaining, end);
+ if (err != GRPC_ERROR_NONE) return parse_error(p, cur, end, err);
+ return parse_next(p, cur + remaining, end);
} else {
- if (!add_str_bytes(p, cur, cur + given)) return 0;
+ grpc_error *err = add_str_bytes(p, cur, cur + given);
+ if (err != GRPC_ERROR_NONE) return parse_error(p, cur, end, err);
GPR_ASSERT(given <= UINT32_MAX - p->strgot);
p->strgot += (uint32_t)given;
p->state = parse_string;
- return 1;
+ return GRPC_ERROR_NONE;
}
}
/* begin parsing a string - performs setup, calls parse_string */
-static int begin_parse_string(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end, uint8_t binary,
- grpc_chttp2_hpack_parser_string *str) {
+static grpc_error *begin_parse_string(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end,
+ uint8_t binary,
+ grpc_chttp2_hpack_parser_string *str) {
p->strgot = 0;
str->length = 0;
p->parsing.str = str;
@@ -1330,58 +1408,50 @@ static int begin_parse_string(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
}
/* parse the key string */
-static int parse_key_string(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_key_string(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end) {
return begin_parse_string(p, cur, end, NOT_BINARY, &p->key);
}
/* check if a key represents a binary header or not */
-typedef enum { BINARY_HEADER, PLAINTEXT_HEADER, ERROR_HEADER } is_binary_header;
-static is_binary_header is_binary_literal_header(grpc_chttp2_hpack_parser *p) {
- return grpc_is_binary_header(p->key.str, p->key.length) ? BINARY_HEADER
- : PLAINTEXT_HEADER;
+static bool is_binary_literal_header(grpc_chttp2_hpack_parser *p) {
+ return grpc_is_binary_header(p->key.str, p->key.length);
}
-static is_binary_header is_binary_indexed_header(grpc_chttp2_hpack_parser *p) {
+static grpc_error *is_binary_indexed_header(grpc_chttp2_hpack_parser *p,
+ bool *is) {
grpc_mdelem *elem = grpc_chttp2_hptbl_lookup(&p->table, p->index);
if (!elem) {
- if (grpc_http_trace) {
- gpr_log(GPR_ERROR, "Invalid HPACK index received: %d", p->index);
- }
- return ERROR_HEADER;
+ return grpc_error_set_int(
+ grpc_error_set_int(GRPC_ERROR_CREATE("Invalid HPACK index received"),
+ GRPC_ERROR_INT_INDEX, (intptr_t)p->index),
+ GRPC_ERROR_INT_SIZE, (intptr_t)p->table.num_ents);
}
- return grpc_is_binary_header(
- (const char *)GPR_SLICE_START_PTR(elem->key->slice),
- GPR_SLICE_LENGTH(elem->key->slice))
- ? BINARY_HEADER
- : PLAINTEXT_HEADER;
+ *is =
+ grpc_is_binary_header((const char *)GPR_SLICE_START_PTR(elem->key->slice),
+ GPR_SLICE_LENGTH(elem->key->slice));
+ return GRPC_ERROR_NONE;
}
/* parse the value string */
-static int parse_value_string(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
- const uint8_t *end, is_binary_header type) {
- switch (type) {
- case BINARY_HEADER:
- return begin_parse_string(p, cur, end, B64_BYTE0, &p->value);
- case PLAINTEXT_HEADER:
- return begin_parse_string(p, cur, end, NOT_BINARY, &p->value);
- case ERROR_HEADER:
- return 0;
- }
- /* Add code to prevent return without value error */
- GPR_UNREACHABLE_CODE(return 0);
+static grpc_error *parse_value_string(grpc_chttp2_hpack_parser *p,
+ const uint8_t *cur, const uint8_t *end,
+ bool is_binary) {
+ return begin_parse_string(p, cur, end, is_binary ? B64_BYTE0 : NOT_BINARY,
+ &p->value);
}
-static int parse_value_string_with_indexed_key(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur,
- const uint8_t *end) {
- return parse_value_string(p, cur, end, is_binary_indexed_header(p));
+static grpc_error *parse_value_string_with_indexed_key(
+ grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end) {
+ bool is_binary = false;
+ grpc_error *err = is_binary_indexed_header(p, &is_binary);
+ if (err != GRPC_ERROR_NONE) return parse_error(p, cur, end, err);
+ return parse_value_string(p, cur, end, is_binary);
}
-static int parse_value_string_with_literal_key(grpc_chttp2_hpack_parser *p,
- const uint8_t *cur,
- const uint8_t *end) {
+static grpc_error *parse_value_string_with_literal_key(
+ grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end) {
return parse_value_string(p, cur, end, is_binary_literal_header(p));
}
@@ -1398,6 +1468,7 @@ void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p) {
p->value.capacity = 0;
p->value.length = 0;
p->dynamic_table_update_allowed = 2;
+ p->last_error = GRPC_ERROR_NONE;
grpc_chttp2_hptbl_init(&p->table);
}
@@ -1408,12 +1479,14 @@ void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p) {
void grpc_chttp2_hpack_parser_destroy(grpc_chttp2_hpack_parser *p) {
grpc_chttp2_hptbl_destroy(&p->table);
+ GRPC_ERROR_UNREF(p->last_error);
gpr_free(p->key.str);
gpr_free(p->value.str);
}
-int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
- const uint8_t *beg, const uint8_t *end) {
+grpc_error *grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
+ const uint8_t *beg,
+ const uint8_t *end) {
/* TODO(ctiller): limit the distance of end from beg, and perform multiple
steps in the event of a large chunk of data to limit
stack space usage when no tail call optimization is
@@ -1421,7 +1494,7 @@ int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
return p->state(p, beg, end);
}
-grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
+grpc_error *grpc_chttp2_header_parser_parse(
grpc_exec_ctx *exec_ctx, void *hpack_parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
@@ -1430,17 +1503,17 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
if (stream_parsing != NULL) {
stream_parsing->stats.incoming.header_bytes += GPR_SLICE_LENGTH(slice);
}
- if (!grpc_chttp2_hpack_parser_parse(parser, GPR_SLICE_START_PTR(slice),
- GPR_SLICE_END_PTR(slice))) {
+ grpc_error *error = grpc_chttp2_hpack_parser_parse(
+ parser, GPR_SLICE_START_PTR(slice), GPR_SLICE_END_PTR(slice));
+ if (error != GRPC_ERROR_NONE) {
GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0);
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ return error;
}
if (is_last) {
if (parser->is_boundary && parser->state != parse_begin) {
- gpr_log(GPR_ERROR,
- "end of header frame not aligned with a hpack record boundary");
GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0);
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ return GRPC_ERROR_CREATE(
+ "end of header frame not aligned with a hpack record boundary");
}
/* need to check for null stream: this can occur if we receive an invalid
stream id on a header */
@@ -1448,8 +1521,7 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
if (parser->is_boundary) {
if (stream_parsing->header_frames_received ==
GPR_ARRAY_SIZE(stream_parsing->got_metadata_on_parse)) {
- gpr_log(GPR_ERROR, "too many trailer frames");
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ return GRPC_ERROR_CREATE("Too many trailer frames");
}
stream_parsing
->got_metadata_on_parse[stream_parsing->header_frames_received] = 1;
@@ -1468,5 +1540,5 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
parser->dynamic_table_update_allowed = 2;
}
GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0);
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.h b/src/core/ext/transport/chttp2/transport/hpack_parser.h
index 855d6c5d52..78eb38db5e 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.h
@@ -44,9 +44,8 @@
typedef struct grpc_chttp2_hpack_parser grpc_chttp2_hpack_parser;
-typedef int (*grpc_chttp2_hpack_parser_state)(grpc_chttp2_hpack_parser *p,
- const uint8_t *beg,
- const uint8_t *end);
+typedef grpc_error *(*grpc_chttp2_hpack_parser_state)(
+ grpc_chttp2_hpack_parser *p, const uint8_t *beg, const uint8_t *end);
typedef struct {
char *str;
@@ -59,6 +58,8 @@ struct grpc_chttp2_hpack_parser {
void (*on_header)(void *user_data, grpc_mdelem *md);
void *on_header_user_data;
+ grpc_error *last_error;
+
/* current parse state - or a function that implements it */
grpc_chttp2_hpack_parser_state state;
/* future states dependent on the opening op code */
@@ -103,12 +104,13 @@ void grpc_chttp2_hpack_parser_destroy(grpc_chttp2_hpack_parser *p);
void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p);
/* returns 1 on success, 0 on error */
-int grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
- const uint8_t *beg, const uint8_t *end);
+grpc_error *grpc_chttp2_hpack_parser_parse(grpc_chttp2_hpack_parser *p,
+ const uint8_t *beg,
+ const uint8_t *end);
/* wraps grpc_chttp2_hpack_parser_parse to provide a frame level parser for
the transport */
-grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
+grpc_error *grpc_chttp2_header_parser_parse(
grpc_exec_ctx *exec_ctx, void *hpack_parser,
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.c b/src/core/ext/transport/chttp2/transport/hpack_table.c
index 295f31c44f..2b73ec969e 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_table.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_table.c
@@ -38,6 +38,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include "src/core/lib/support/murmur_hash.h"
@@ -262,18 +263,19 @@ void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl,
tbl->max_bytes = max_bytes;
}
-int grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
- uint32_t bytes) {
+grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
+ uint32_t bytes) {
if (tbl->current_table_bytes == bytes) {
- return 1;
+ return GRPC_ERROR_NONE;
}
if (bytes > tbl->max_bytes) {
- if (grpc_http_trace) {
- gpr_log(GPR_ERROR,
- "Attempt to make hpack table %d bytes when max is %d bytes",
- bytes, tbl->max_bytes);
- }
- return 0;
+ char *msg;
+ gpr_asprintf(&msg,
+ "Attempt to make hpack table %d bytes when max is %d bytes",
+ bytes, tbl->max_bytes);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
}
if (grpc_http_trace) {
gpr_log(GPR_DEBUG, "Update hpack parser table size to %d", bytes);
@@ -291,23 +293,25 @@ int grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
rebuild_ents(tbl, new_cap);
}
}
- return 1;
+ return GRPC_ERROR_NONE;
}
-int grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) {
+grpc_error *grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) {
/* determine how many bytes of buffer this entry represents */
size_t elem_bytes = GPR_SLICE_LENGTH(md->key->slice) +
GPR_SLICE_LENGTH(md->value->slice) +
GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD;
if (tbl->current_table_bytes > tbl->max_bytes) {
- if (grpc_http_trace) {
- gpr_log(GPR_ERROR,
- "HPACK max table size reduced to %d but not reflected by hpack "
- "stream (still at %d)",
- tbl->max_bytes, tbl->current_table_bytes);
- }
- return 0;
+ char *msg;
+ gpr_asprintf(
+ &msg,
+ "HPACK max table size reduced to %d but not reflected by hpack "
+ "stream (still at %d)",
+ tbl->max_bytes, tbl->current_table_bytes);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
}
/* we can't add elements bigger than the max table size */
@@ -324,7 +328,7 @@ int grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) {
while (tbl->num_ents) {
evict1(tbl);
}
- return 1;
+ return GRPC_ERROR_NONE;
}
/* evict entries to ensure no overflow */
@@ -339,7 +343,7 @@ int grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) {
/* update accounting values */
tbl->num_ents++;
tbl->mem_used += (uint32_t)elem_bytes;
- return 1;
+ return GRPC_ERROR_NONE;
}
grpc_chttp2_hptbl_find_result grpc_chttp2_hptbl_find(
diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.h b/src/core/ext/transport/chttp2/transport/hpack_table.h
index 074fea36d8..45bd9255bf 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_table.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_table.h
@@ -36,6 +36,7 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/slice.h>
+#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/transport/metadata.h"
/* HPACK header table */
@@ -87,15 +88,15 @@ void grpc_chttp2_hptbl_init(grpc_chttp2_hptbl *tbl);
void grpc_chttp2_hptbl_destroy(grpc_chttp2_hptbl *tbl);
void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl,
uint32_t max_bytes);
-int grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
- uint32_t bytes);
+grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
+ uint32_t bytes);
/* lookup a table entry based on its hpack index */
grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl,
uint32_t index);
/* add a table entry to the index */
-int grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl,
- grpc_mdelem *md) GRPC_MUST_USE_RESULT;
+grpc_error *grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl,
+ grpc_mdelem *md) GRPC_MUST_USE_RESULT;
/* Find a key/value pair in the table... returns the index in the table of the
most similar entry, or 0 if the value was not found */
typedef struct {
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 7f3339a620..54e72ebd24 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -156,7 +156,7 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_byte_stream base;
gpr_refcount refs;
struct grpc_chttp2_incoming_byte_stream *next_message;
- int failed;
+ grpc_error *error;
grpc_chttp2_transport *transport;
grpc_chttp2_stream *stream;
@@ -275,10 +275,10 @@ struct grpc_chttp2_transport_parsing {
/* active parser */
void *parser_data;
grpc_chttp2_stream_parsing *incoming_stream;
- grpc_chttp2_parse_error (*parser)(
- grpc_exec_ctx *exec_ctx, void *parser_user_data,
- grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last);
+ grpc_error *(*parser)(grpc_exec_ctx *exec_ctx, void *parser_user_data,
+ grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_chttp2_stream_parsing *stream_parsing,
+ gpr_slice slice, int is_last);
/* received settings */
uint32_t settings[GRPC_CHTTP2_NUM_SETTINGS];
@@ -471,12 +471,12 @@ typedef struct {
} grpc_chttp2_stream_writing;
struct grpc_chttp2_stream_parsing {
+ /** saw some stream level error */
+ grpc_error *forced_close_error;
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint32_t id;
/** has this stream received a close */
uint8_t received_close;
- /** saw a rst_stream */
- uint8_t saw_rst_stream;
/** how many header frames have we received? */
uint8_t header_frames_received;
/** which metadata did we get (on this parse) */
@@ -488,8 +488,6 @@ struct grpc_chttp2_stream_parsing {
int64_t incoming_window;
/** parsing state for data frames */
grpc_chttp2_data_parser data_parser;
- /** reason give to rst_stream */
- uint32_t rst_stream_reason;
/** amount of window given */
int64_t outgoing_window;
/** number of bytes received - reset at end of parse thread execution */
@@ -532,7 +530,7 @@ void grpc_chttp2_perform_writes(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
grpc_endpoint *endpoint);
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
- void *transport_writing, bool success);
+ void *transport_writing, grpc_error *error);
void grpc_chttp2_cleanup_writing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing);
@@ -541,9 +539,9 @@ void grpc_chttp2_prepare_to_read(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing);
/** Process one slice of incoming data; return 1 if the connection is still
viable after reading, or 0 if the connection should be torn down */
-int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport_parsing *transport_parsing,
- gpr_slice slice);
+grpc_error *grpc_chttp2_perform_read(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ gpr_slice slice);
void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing);
@@ -673,9 +671,10 @@ void grpc_chttp2_for_all_streams(
void grpc_chttp2_parsing_become_skip_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
-void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_stream_global *stream_global,
- grpc_closure **pclosure, int success);
+void grpc_chttp2_complete_closure_step(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure,
+ grpc_error *error);
void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *transport,
@@ -778,8 +777,8 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
grpc_status_code status, gpr_slice *details);
void grpc_chttp2_mark_stream_closed(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global, int close_reads,
- int close_writes);
+ grpc_chttp2_stream_global *stream_global, int close_reads, int close_writes,
+ grpc_error *error);
void grpc_chttp2_start_writing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global);
@@ -811,8 +810,8 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs,
gpr_slice slice);
void grpc_chttp2_incoming_byte_stream_finished(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, int success,
- int from_parsing_thread);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+ grpc_error *error, int from_parsing_thread);
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *parsing,
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 3c74258352..785134091f 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -49,30 +49,30 @@
((grpc_chttp2_transport *)((char *)(tp)-offsetof(grpc_chttp2_transport, \
parsing)))
-static int init_frame_parser(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport_parsing *transport_parsing);
-static int init_header_frame_parser(
+static grpc_error *init_frame_parser(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
+static grpc_error *init_header_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
int is_continuation);
-static int init_data_frame_parser(
+static grpc_error *init_data_frame_parser(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
+static grpc_error *init_rst_stream_parser(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
+static grpc_error *init_settings_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
-static int init_rst_stream_parser(
+static grpc_error *init_window_update_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
-static int init_settings_frame_parser(
+static grpc_error *init_ping_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
-static int init_window_update_frame_parser(
+static grpc_error *init_goaway_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
-static int init_ping_parser(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport_parsing *transport_parsing);
-static int init_goaway_parser(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport_parsing *transport_parsing);
-static int init_skip_frame_parser(
+static grpc_error *init_skip_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
int is_header);
-static int parse_frame_slice(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport_parsing *transport_parsing,
- gpr_slice slice, int is_last);
+static grpc_error *parse_frame_slice(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ gpr_slice slice, int is_last);
void grpc_chttp2_prepare_to_read(
grpc_chttp2_transport_global *transport_global,
@@ -230,38 +230,42 @@ void grpc_chttp2_publish_reads(
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
- if (stream_parsing->saw_rst_stream) {
- if (stream_parsing->rst_stream_reason != GRPC_CHTTP2_NO_ERROR) {
- grpc_status_code status_code = grpc_chttp2_http2_error_to_grpc_status(
- (grpc_chttp2_error_code)stream_parsing->rst_stream_reason);
- char *status_details;
- gpr_slice slice_details;
- gpr_asprintf(&status_details, "Received RST_STREAM err=%d",
- stream_parsing->rst_stream_reason);
- slice_details = gpr_slice_from_copied_string(status_details);
- gpr_free(status_details);
+ if (stream_parsing->forced_close_error != GRPC_ERROR_NONE) {
+ intptr_t reason;
+ bool has_reason = grpc_error_get_int(stream_parsing->forced_close_error,
+ GRPC_ERROR_INT_HTTP2_ERROR, &reason);
+ if (has_reason && reason != GRPC_CHTTP2_NO_ERROR) {
+ grpc_status_code status_code =
+ has_reason ? grpc_chttp2_http2_error_to_grpc_status(
+ (grpc_chttp2_error_code)reason)
+ : GRPC_STATUS_INTERNAL;
+ const char *status_details =
+ grpc_error_string(stream_parsing->forced_close_error);
+ gpr_slice slice_details = gpr_slice_from_copied_string(status_details);
+ grpc_error_free_string(status_details);
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global,
status_code, &slice_details);
}
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
- 1, 1);
+ 1, 1, stream_parsing->forced_close_error);
}
if (stream_parsing->received_close) {
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
- 1, 0);
+ 1, 0, GRPC_ERROR_NONE);
}
}
}
-int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport_parsing *transport_parsing,
- gpr_slice slice) {
+grpc_error *grpc_chttp2_perform_read(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ gpr_slice slice) {
uint8_t *beg = GPR_SLICE_START_PTR(slice);
uint8_t *end = GPR_SLICE_END_PTR(slice);
uint8_t *cur = beg;
+ grpc_error *err;
- if (cur == end) return 1;
+ if (cur == end) return GRPC_ERROR_NONE;
switch (transport_parsing->deframe_state) {
case GRPC_DTS_CLIENT_PREFIX_0:
@@ -291,21 +295,25 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
while (cur != end && transport_parsing->deframe_state != GRPC_DTS_FH_0) {
if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing
->deframe_state]) {
- gpr_log(GPR_INFO,
- "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
- "at byte %d",
- GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing
- ->deframe_state],
- (int)(uint8_t)GRPC_CHTTP2_CLIENT_CONNECT_STRING
- [transport_parsing->deframe_state],
- *cur, (int)*cur, transport_parsing->deframe_state);
- return 0;
+ char *msg;
+ gpr_asprintf(
+ &msg,
+ "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
+ "at byte %d",
+ GRPC_CHTTP2_CLIENT_CONNECT_STRING[transport_parsing
+ ->deframe_state],
+ (int)(uint8_t)GRPC_CHTTP2_CLIENT_CONNECT_STRING
+ [transport_parsing->deframe_state],
+ *cur, (int)*cur, transport_parsing->deframe_state);
+ err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
}
++cur;
++transport_parsing->deframe_state;
}
if (cur == end) {
- return 1;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
dts_fh_0:
@@ -314,7 +322,7 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
transport_parsing->incoming_frame_size = ((uint32_t)*cur) << 16;
if (++cur == end) {
transport_parsing->deframe_state = GRPC_DTS_FH_1;
- return 1;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_1:
@@ -322,7 +330,7 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
transport_parsing->incoming_frame_size |= ((uint32_t)*cur) << 8;
if (++cur == end) {
transport_parsing->deframe_state = GRPC_DTS_FH_2;
- return 1;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_2:
@@ -330,7 +338,7 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
transport_parsing->incoming_frame_size |= *cur;
if (++cur == end) {
transport_parsing->deframe_state = GRPC_DTS_FH_3;
- return 1;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_3:
@@ -338,7 +346,7 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
transport_parsing->incoming_frame_type = *cur;
if (++cur == end) {
transport_parsing->deframe_state = GRPC_DTS_FH_4;
- return 1;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_4:
@@ -346,7 +354,7 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
transport_parsing->incoming_frame_flags = *cur;
if (++cur == end) {
transport_parsing->deframe_state = GRPC_DTS_FH_5;
- return 1;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_5:
@@ -354,7 +362,7 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
transport_parsing->incoming_stream_id = (((uint32_t)*cur) & 0x7f) << 24;
if (++cur == end) {
transport_parsing->deframe_state = GRPC_DTS_FH_6;
- return 1;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_6:
@@ -362,7 +370,7 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
transport_parsing->incoming_stream_id |= ((uint32_t)*cur) << 16;
if (++cur == end) {
transport_parsing->deframe_state = GRPC_DTS_FH_7;
- return 1;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_7:
@@ -370,15 +378,16 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
transport_parsing->incoming_stream_id |= ((uint32_t)*cur) << 8;
if (++cur == end) {
transport_parsing->deframe_state = GRPC_DTS_FH_8;
- return 1;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FH_8:
GPR_ASSERT(cur < end);
transport_parsing->incoming_stream_id |= ((uint32_t)*cur);
transport_parsing->deframe_state = GRPC_DTS_FRAME;
- if (!init_frame_parser(exec_ctx, transport_parsing)) {
- return 0;
+ err = init_frame_parser(exec_ctx, transport_parsing);
+ if (err != GRPC_ERROR_NONE) {
+ return err;
}
if (transport_parsing->incoming_stream_id != 0 &&
transport_parsing->incoming_stream_id >
@@ -387,62 +396,69 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
transport_parsing->incoming_stream_id;
}
if (transport_parsing->incoming_frame_size == 0) {
- if (!parse_frame_slice(exec_ctx, transport_parsing, gpr_empty_slice(),
- 1)) {
- return 0;
+ err = parse_frame_slice(exec_ctx, transport_parsing, gpr_empty_slice(),
+ 1);
+ if (err != GRPC_ERROR_NONE) {
+ return err;
}
transport_parsing->incoming_stream = NULL;
if (++cur == end) {
transport_parsing->deframe_state = GRPC_DTS_FH_0;
- return 1;
+ return GRPC_ERROR_NONE;
}
goto dts_fh_0; /* loop */
} else if (transport_parsing->incoming_frame_size >
transport_parsing->max_frame_size) {
- gpr_log(GPR_DEBUG, "Frame size %d is larger than max frame size %d",
- transport_parsing->incoming_frame_size,
- transport_parsing->max_frame_size);
- return 0;
+ char *msg;
+ gpr_asprintf(&msg, "Frame size %d is larger than max frame size %d",
+ transport_parsing->incoming_frame_size,
+ transport_parsing->max_frame_size);
+ err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
}
if (++cur == end) {
- return 1;
+ return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_DTS_FRAME:
GPR_ASSERT(cur < end);
if ((uint32_t)(end - cur) == transport_parsing->incoming_frame_size) {
- if (!parse_frame_slice(exec_ctx, transport_parsing,
- gpr_slice_sub_no_ref(slice, (size_t)(cur - beg),
- (size_t)(end - beg)),
- 1)) {
- return 0;
+ err = parse_frame_slice(exec_ctx, transport_parsing,
+ gpr_slice_sub_no_ref(slice, (size_t)(cur - beg),
+ (size_t)(end - beg)),
+ 1);
+ if (err != GRPC_ERROR_NONE) {
+ return err;
}
transport_parsing->deframe_state = GRPC_DTS_FH_0;
transport_parsing->incoming_stream = NULL;
- return 1;
+ return GRPC_ERROR_NONE;
} else if ((uint32_t)(end - cur) >
transport_parsing->incoming_frame_size) {
size_t cur_offset = (size_t)(cur - beg);
- if (!parse_frame_slice(
- exec_ctx, transport_parsing,
- gpr_slice_sub_no_ref(
- slice, cur_offset,
- cur_offset + transport_parsing->incoming_frame_size),
- 1)) {
- return 0;
+ err = parse_frame_slice(
+ exec_ctx, transport_parsing,
+ gpr_slice_sub_no_ref(
+ slice, cur_offset,
+ cur_offset + transport_parsing->incoming_frame_size),
+ 1);
+ if (err != GRPC_ERROR_NONE) {
+ return err;
}
cur += transport_parsing->incoming_frame_size;
transport_parsing->incoming_stream = NULL;
goto dts_fh_0; /* loop */
} else {
- if (!parse_frame_slice(exec_ctx, transport_parsing,
- gpr_slice_sub_no_ref(slice, (size_t)(cur - beg),
- (size_t)(end - beg)),
- 0)) {
- return 0;
+ err = parse_frame_slice(exec_ctx, transport_parsing,
+ gpr_slice_sub_no_ref(slice, (size_t)(cur - beg),
+ (size_t)(end - beg)),
+ 0);
+ if (err != GRPC_ERROR_NONE) {
+ return err;
}
transport_parsing->incoming_frame_size -= (uint32_t)(end - cur);
- return 1;
+ return GRPC_ERROR_NONE;
}
GPR_UNREACHABLE_CODE(return 0);
}
@@ -450,23 +466,30 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
GPR_UNREACHABLE_CODE(return 0);
}
-static int init_frame_parser(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport_parsing *transport_parsing) {
+static grpc_error *init_frame_parser(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
if (transport_parsing->expect_continuation_stream_id != 0) {
if (transport_parsing->incoming_frame_type !=
GRPC_CHTTP2_FRAME_CONTINUATION) {
- gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x",
- transport_parsing->incoming_frame_type);
- return 0;
+ char *msg;
+ gpr_asprintf(&msg, "Expected CONTINUATION frame, got frame type %02x",
+ transport_parsing->incoming_frame_type);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
}
if (transport_parsing->expect_continuation_stream_id !=
transport_parsing->incoming_stream_id) {
- gpr_log(GPR_ERROR,
- "Expected CONTINUATION frame for grpc_chttp2_stream %08x, got "
- "grpc_chttp2_stream %08x",
- transport_parsing->expect_continuation_stream_id,
- transport_parsing->incoming_stream_id);
- return 0;
+ char *msg;
+ gpr_asprintf(
+ &msg,
+ "Expected CONTINUATION frame for grpc_chttp2_stream %08x, got "
+ "grpc_chttp2_stream %08x",
+ transport_parsing->expect_continuation_stream_id,
+ transport_parsing->incoming_stream_id);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
}
return init_header_frame_parser(exec_ctx, transport_parsing, 1);
}
@@ -476,8 +499,7 @@ static int init_frame_parser(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_FRAME_HEADER:
return init_header_frame_parser(exec_ctx, transport_parsing, 0);
case GRPC_CHTTP2_FRAME_CONTINUATION:
- gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
- return 0;
+ return GRPC_ERROR_CREATE("Unexpected CONTINUATION frame");
case GRPC_CHTTP2_FRAME_RST_STREAM:
return init_rst_stream_parser(exec_ctx, transport_parsing);
case GRPC_CHTTP2_FRAME_SETTINGS:
@@ -489,22 +511,24 @@ static int init_frame_parser(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_FRAME_GOAWAY:
return init_goaway_parser(exec_ctx, transport_parsing);
default:
- gpr_log(GPR_ERROR, "Unknown frame type %02x",
- transport_parsing->incoming_frame_type);
+ if (grpc_http_trace) {
+ gpr_log(GPR_ERROR, "Unknown frame type %02x",
+ transport_parsing->incoming_frame_type);
+ }
return init_skip_frame_parser(exec_ctx, transport_parsing, 0);
}
}
-static grpc_chttp2_parse_error skip_parser(
- grpc_exec_ctx *exec_ctx, void *parser,
- grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
- return GRPC_CHTTP2_PARSE_OK;
+static grpc_error *skip_parser(grpc_exec_ctx *exec_ctx, void *parser,
+ grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_chttp2_stream_parsing *stream_parsing,
+ gpr_slice slice, int is_last) {
+ return GRPC_ERROR_NONE;
}
static void skip_header(void *tp, grpc_mdelem *md) { GRPC_MDELEM_UNREF(md); }
-static int init_skip_frame_parser(
+static grpc_error *init_skip_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
int is_header) {
if (is_header) {
@@ -519,7 +543,7 @@ static int init_skip_frame_parser(
} else {
transport_parsing->parser = skip_parser;
}
- return 1;
+ return GRPC_ERROR_NONE;
}
void grpc_chttp2_parsing_become_skip_parser(
@@ -529,22 +553,28 @@ void grpc_chttp2_parsing_become_skip_parser(
transport_parsing->parser == grpc_chttp2_header_parser_parse);
}
-static grpc_chttp2_parse_error update_incoming_window(
+static grpc_error *update_incoming_window(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
uint32_t incoming_frame_size = transport_parsing->incoming_frame_size;
if (incoming_frame_size > transport_parsing->incoming_window) {
- gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %" PRId64,
- transport_parsing->incoming_frame_size,
- transport_parsing->incoming_window);
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64,
+ transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_window);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
}
if (incoming_frame_size > stream_parsing->incoming_window) {
- gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %" PRId64,
- transport_parsing->incoming_frame_size,
- stream_parsing->incoming_window);
- return GRPC_CHTTP2_CONNECTION_ERROR;
+ char *msg;
+ gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64,
+ transport_parsing->incoming_frame_size,
+ stream_parsing->incoming_window);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
}
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", transport_parsing, incoming_window,
@@ -555,15 +585,15 @@ static grpc_chttp2_parse_error update_incoming_window(
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
- return GRPC_CHTTP2_PARSE_OK;
+ return GRPC_ERROR_NONE;
}
-static int init_data_frame_parser(
+static grpc_error *init_data_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
grpc_chttp2_stream_parsing *stream_parsing =
grpc_chttp2_parsing_lookup_stream(transport_parsing,
transport_parsing->incoming_stream_id);
- grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
+ grpc_error *err = GRPC_ERROR_NONE;
if (stream_parsing == NULL) {
return init_skip_frame_parser(exec_ctx, transport_parsing, 0);
}
@@ -571,33 +601,32 @@ static int init_data_frame_parser(
if (stream_parsing->received_close) {
return init_skip_frame_parser(exec_ctx, transport_parsing, 0);
}
- if (err == GRPC_CHTTP2_PARSE_OK) {
+ if (err == GRPC_ERROR_NONE) {
err = update_incoming_window(exec_ctx, transport_parsing, stream_parsing);
}
- if (err == GRPC_CHTTP2_PARSE_OK) {
+ if (err == GRPC_ERROR_NONE) {
err = grpc_chttp2_data_parser_begin_frame(
- &stream_parsing->data_parser, transport_parsing->incoming_frame_flags);
- }
- switch (err) {
- case GRPC_CHTTP2_PARSE_OK:
- transport_parsing->incoming_stream = stream_parsing;
- transport_parsing->parser = grpc_chttp2_data_parser_parse;
- transport_parsing->parser_data = &stream_parsing->data_parser;
- return 1;
- case GRPC_CHTTP2_STREAM_ERROR:
- stream_parsing->received_close = 1;
- stream_parsing->saw_rst_stream = 1;
- stream_parsing->rst_stream_reason = GRPC_CHTTP2_PROTOCOL_ERROR;
- gpr_slice_buffer_add(
- &transport_parsing->qbuf,
- grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id,
- GRPC_CHTTP2_PROTOCOL_ERROR,
- &stream_parsing->stats.outgoing));
- return init_skip_frame_parser(exec_ctx, transport_parsing, 0);
- case GRPC_CHTTP2_CONNECTION_ERROR:
- return 0;
+ &stream_parsing->data_parser, transport_parsing->incoming_frame_flags,
+ stream_parsing->id);
+ }
+ if (err == GRPC_ERROR_NONE) {
+ transport_parsing->incoming_stream = stream_parsing;
+ transport_parsing->parser = grpc_chttp2_data_parser_parse;
+ transport_parsing->parser_data = &stream_parsing->data_parser;
+ return GRPC_ERROR_NONE;
+ } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, NULL)) {
+ /* handle stream errors by closing the stream */
+ stream_parsing->received_close = 1;
+ stream_parsing->forced_close_error = err;
+ gpr_slice_buffer_add(
+ &transport_parsing->qbuf,
+ grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id,
+ GRPC_CHTTP2_PROTOCOL_ERROR,
+ &stream_parsing->stats.outgoing));
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 0);
+ } else {
+ return err;
}
- GPR_UNREACHABLE_CODE(return 0);
}
static void free_timeout(void *p) { gpr_free(p); }
@@ -713,7 +742,7 @@ static void on_trailing_header(void *tp, grpc_mdelem *md) {
GPR_TIMER_END("on_trailing_header", 0);
}
-static int init_header_frame_parser(
+static grpc_error *init_header_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
int is_continuation) {
uint8_t is_eoh = (transport_parsing->incoming_frame_flags &
@@ -808,15 +837,16 @@ static int init_header_frame_parser(
GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
grpc_chttp2_hpack_parser_set_has_priority(&transport_parsing->hpack_parser);
}
- return 1;
+ return GRPC_ERROR_NONE;
}
-static int init_window_update_frame_parser(
+static grpc_error *init_window_update_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
- int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
- &transport_parsing->simple.window_update,
- transport_parsing->incoming_frame_size,
- transport_parsing->incoming_frame_flags);
+ grpc_error *err = grpc_chttp2_window_update_parser_begin_frame(
+ &transport_parsing->simple.window_update,
+ transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_frame_flags);
+ if (err != GRPC_ERROR_NONE) return err;
if (transport_parsing->incoming_stream_id != 0) {
grpc_chttp2_stream_parsing *stream_parsing =
transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream(
@@ -828,26 +858,27 @@ static int init_window_update_frame_parser(
}
transport_parsing->parser = grpc_chttp2_window_update_parser_parse;
transport_parsing->parser_data = &transport_parsing->simple.window_update;
- return ok;
+ return GRPC_ERROR_NONE;
}
-static int init_ping_parser(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport_parsing *transport_parsing) {
- int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_ping_parser_begin_frame(
- &transport_parsing->simple.ping,
- transport_parsing->incoming_frame_size,
- transport_parsing->incoming_frame_flags);
+static grpc_error *init_ping_parser(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_error *err = grpc_chttp2_ping_parser_begin_frame(
+ &transport_parsing->simple.ping, transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_frame_flags);
+ if (err != GRPC_ERROR_NONE) return err;
transport_parsing->parser = grpc_chttp2_ping_parser_parse;
transport_parsing->parser_data = &transport_parsing->simple.ping;
- return ok;
+ return GRPC_ERROR_NONE;
}
-static int init_rst_stream_parser(
+static grpc_error *init_rst_stream_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
- int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame(
- &transport_parsing->simple.rst_stream,
- transport_parsing->incoming_frame_size,
- transport_parsing->incoming_frame_flags);
+ grpc_error *err = grpc_chttp2_rst_stream_parser_begin_frame(
+ &transport_parsing->simple.rst_stream,
+ transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_frame_flags);
+ if (err != GRPC_ERROR_NONE) return err;
grpc_chttp2_stream_parsing *stream_parsing =
transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream(
transport_parsing, transport_parsing->incoming_stream_id);
@@ -857,37 +888,32 @@ static int init_rst_stream_parser(
stream_parsing->stats.incoming.framing_bytes += 9;
transport_parsing->parser = grpc_chttp2_rst_stream_parser_parse;
transport_parsing->parser_data = &transport_parsing->simple.rst_stream;
- return ok;
+ return GRPC_ERROR_NONE;
}
-static int init_goaway_parser(
+static grpc_error *init_goaway_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
- int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_goaway_parser_begin_frame(
- &transport_parsing->goaway_parser,
- transport_parsing->incoming_frame_size,
- transport_parsing->incoming_frame_flags);
+ grpc_error *err = grpc_chttp2_goaway_parser_begin_frame(
+ &transport_parsing->goaway_parser, transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_frame_flags);
+ if (err != GRPC_ERROR_NONE) return err;
transport_parsing->parser = grpc_chttp2_goaway_parser_parse;
transport_parsing->parser_data = &transport_parsing->goaway_parser;
- return ok;
+ return GRPC_ERROR_NONE;
}
-static int init_settings_frame_parser(
+static grpc_error *init_settings_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
- int ok;
-
if (transport_parsing->incoming_stream_id != 0) {
- gpr_log(GPR_ERROR, "settings frame received for grpc_chttp2_stream %d",
- transport_parsing->incoming_stream_id);
- return 0;
+ return GRPC_ERROR_CREATE("Settings frame received for grpc_chttp2_stream");
}
- ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_settings_parser_begin_frame(
- &transport_parsing->simple.settings,
- transport_parsing->incoming_frame_size,
- transport_parsing->incoming_frame_flags,
- transport_parsing->settings);
- if (!ok) {
- return 0;
+ grpc_error *err = grpc_chttp2_settings_parser_begin_frame(
+ &transport_parsing->simple.settings,
+ transport_parsing->incoming_frame_size,
+ transport_parsing->incoming_frame_flags, transport_parsing->settings);
+ if (err != GRPC_ERROR_NONE) {
+ return err;
}
if (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
transport_parsing->settings_ack_received = 1;
@@ -901,7 +927,7 @@ static int init_settings_frame_parser(
}
transport_parsing->parser = grpc_chttp2_settings_parser_parse;
transport_parsing->parser_data = &transport_parsing->simple.settings;
- return ok;
+ return GRPC_ERROR_NONE;
}
/*
@@ -910,34 +936,37 @@ static int is_window_update_legal(int64_t window_update, int64_t window) {
}
*/
-static int parse_frame_slice(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport_parsing *transport_parsing,
- gpr_slice slice, int is_last) {
+static grpc_error *parse_frame_slice(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ gpr_slice slice, int is_last) {
grpc_chttp2_stream_parsing *stream_parsing =
transport_parsing->incoming_stream;
- switch (transport_parsing->parser(exec_ctx, transport_parsing->parser_data,
- transport_parsing, stream_parsing, slice,
- is_last)) {
- case GRPC_CHTTP2_PARSE_OK:
- if (stream_parsing) {
- grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
- stream_parsing);
- }
- return 1;
- case GRPC_CHTTP2_STREAM_ERROR:
- grpc_chttp2_parsing_become_skip_parser(exec_ctx, transport_parsing);
- if (stream_parsing) {
- stream_parsing->saw_rst_stream = 1;
- stream_parsing->rst_stream_reason = GRPC_CHTTP2_PROTOCOL_ERROR;
- gpr_slice_buffer_add(
- &transport_parsing->qbuf,
- grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id,
- GRPC_CHTTP2_PROTOCOL_ERROR,
- &stream_parsing->stats.outgoing));
- }
- return 1;
- case GRPC_CHTTP2_CONNECTION_ERROR:
- return 0;
+ grpc_error *err = transport_parsing->parser(
+ exec_ctx, transport_parsing->parser_data, transport_parsing,
+ stream_parsing, slice, is_last);
+ if (err == GRPC_ERROR_NONE) {
+ if (stream_parsing) {
+ grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
+ stream_parsing);
+ }
+ return GRPC_ERROR_NONE;
+ } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, NULL)) {
+ if (grpc_http_trace) {
+ const char *msg = grpc_error_string(err);
+ gpr_log(GPR_ERROR, "%s", msg);
+ grpc_error_free_string(msg);
+ }
+ grpc_chttp2_parsing_become_skip_parser(exec_ctx, transport_parsing);
+ if (stream_parsing) {
+ stream_parsing->forced_close_error = err;
+ gpr_slice_buffer_add(
+ &transport_parsing->qbuf,
+ grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id,
+ GRPC_CHTTP2_PROTOCOL_ERROR,
+ &stream_parsing->stats.outgoing));
+ } else {
+ GRPC_ERROR_UNREF(err);
+ }
}
- GPR_UNREACHABLE_CODE(return 0);
+ return err;
}
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index a8fb463939..add7678182 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -187,7 +187,8 @@ void grpc_chttp2_perform_writes(
grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
&transport_writing->done_cb);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE,
+ NULL);
}
}
@@ -334,25 +335,27 @@ void grpc_chttp2_cleanup_writing(
transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->sent_initial_metadata) {
grpc_chttp2_complete_closure_step(
- exec_ctx, stream_global,
- &stream_global->send_initial_metadata_finished, 1);
+ exec_ctx, transport_global, stream_global,
+ &stream_global->send_initial_metadata_finished, GRPC_ERROR_NONE);
}
grpc_transport_move_one_way_stats(&stream_writing->stats,
&stream_global->stats.outgoing);
if (stream_writing->sent_message) {
GPR_ASSERT(stream_writing->send_message == NULL);
grpc_chttp2_complete_closure_step(
- exec_ctx, stream_global, &stream_global->send_message_finished, 1);
+ exec_ctx, transport_global, stream_global,
+ &stream_global->send_message_finished, GRPC_ERROR_NONE);
stream_writing->sent_message = 0;
}
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_complete_closure_step(
- exec_ctx, stream_global,
- &stream_global->send_trailing_metadata_finished, 1);
+ exec_ctx, transport_global, stream_global,
+ &stream_global->send_trailing_metadata_finished, GRPC_ERROR_NONE);
}
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
- !transport_global->is_client, 1);
+ !transport_global->is_client, 1,
+ GRPC_ERROR_NONE);
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 1bb53b756c..25d8aca250 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -159,11 +159,11 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
static void enqueue_callbacks(grpc_closure *callback_list[]) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
if (callback_list[0]) {
- grpc_exec_ctx_enqueue(&exec_ctx, callback_list[0], true, NULL);
+ grpc_exec_ctx_sched(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL);
callback_list[0] = NULL;
}
if (callback_list[1]) {
- grpc_exec_ctx_enqueue(&exec_ctx, callback_list[1], true, NULL);
+ grpc_exec_ctx_sched(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL);
callback_list[1] = NULL;
}
grpc_exec_ctx_finish(&exec_ctx);