aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/census/grpc_filter.c4
-rw-r--r--src/core/ext/client_channel/client_channel.c173
-rw-r--r--src/core/ext/client_channel/parse_address.c5
-rw-r--r--src/core/ext/client_channel/proxy_mapper_registry.c12
-rw-r--r--src/core/ext/client_channel/subchannel.c49
-rw-r--r--src/core/ext/client_channel/subchannel.h18
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c41
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c17
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.c65
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.h18
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c60
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c176
15 files changed, 356 insertions, 288 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index b80d831557..fc29dbd454 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -138,7 +138,7 @@ static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx,
static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {
+ grpc_closure *ignored) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
/* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
@@ -160,7 +160,7 @@ static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx,
static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {
+ grpc_closure *ignored) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
/* TODO(hongyu): record rpc server stats and census_tracing_end_op here */
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index bf64f84772..960d00e815 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -71,7 +71,8 @@
*/
typedef enum {
- WAIT_FOR_READY_UNSET,
+ /* zero so it can be default initialized */
+ WAIT_FOR_READY_UNSET = 0,
WAIT_FOR_READY_FALSE,
WAIT_FOR_READY_TRUE
} wait_for_ready_value;
@@ -631,7 +632,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
#define CANCELLED_CALL ((grpc_subchannel_call *)1)
typedef enum {
- GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING,
+ /* zero so that it can be default-initialized */
+ GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING = 0,
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL
} subchannel_creation_phase;
@@ -653,13 +655,13 @@ typedef struct client_channel_call_data {
gpr_timespec call_start_time;
gpr_timespec deadline;
method_parameters *method_params;
- grpc_closure read_service_config;
grpc_error *cancel_error;
/** either 0 for no call, 1 for cancelled, or a pointer to a
grpc_subchannel_call */
gpr_atm subchannel_call;
+ gpr_arena *arena;
subchannel_creation_phase creation_phase;
grpc_connected_subchannel *connected_subchannel;
@@ -726,6 +728,47 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
gpr_free(ops);
}
+// Sets calld->method_params.
+// If the method params specify a timeout, populates
+// *per_method_deadline and returns true.
+static bool set_call_method_params_from_service_config_locked(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ gpr_timespec *per_method_deadline) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ if (chand->method_params_table != NULL) {
+ calld->method_params = grpc_method_config_table_get(
+ exec_ctx, chand->method_params_table, calld->path);
+ if (calld->method_params != NULL) {
+ method_parameters_ref(calld->method_params);
+ if (gpr_time_cmp(calld->method_params->timeout,
+ gpr_time_0(GPR_TIMESPAN)) != 0) {
+ *per_method_deadline =
+ gpr_time_add(calld->call_start_time, calld->method_params->timeout);
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ /* apply service-config level configuration to the call (now that we're
+ * certain it exists) */
+ call_data *calld = elem->call_data;
+ gpr_timespec per_method_deadline;
+ if (set_call_method_params_from_service_config_locked(exec_ctx, elem,
+ &per_method_deadline)) {
+ // If the deadline from the service config is shorter than the one
+ // from the client API, reset the deadline timer.
+ if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
+ calld->deadline = per_method_deadline;
+ grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
+ }
+ }
+}
+
static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_call_element *elem = arg;
@@ -754,9 +797,14 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
} else {
/* Create call on subchannel. */
grpc_subchannel_call *subchannel_call = NULL;
+ const grpc_connected_subchannel_call_args call_args = {
+ .pollent = calld->pollent,
+ .path = calld->path,
+ .start_time = calld->call_start_time,
+ .deadline = calld->deadline,
+ .arena = calld->arena};
grpc_error *new_error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
- calld->call_start_time, calld->deadline, &subchannel_call);
+ exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
subchannel_call = CANCELLED_CALL;
@@ -851,6 +899,7 @@ static bool pick_subchannel_locked(
}
GPR_ASSERT(error == GRPC_ERROR_NONE);
if (chand->lb_policy != NULL) {
+ apply_final_configuration_locked(exec_ctx, elem);
grpc_lb_policy *lb_policy = chand->lb_policy;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
// If the application explicitly set wait_for_ready, use that.
@@ -982,9 +1031,14 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
calld->connected_subchannel != NULL) {
grpc_subchannel_call *subchannel_call = NULL;
+ const grpc_connected_subchannel_call_args call_args = {
+ .pollent = calld->pollent,
+ .path = calld->path,
+ .start_time = calld->call_start_time,
+ .deadline = calld->deadline,
+ .arena = calld->arena};
grpc_error *error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
- calld->call_start_time, calld->deadline, &subchannel_call);
+ exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
@@ -1060,114 +1114,19 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("cc_start_transport_stream_op", 0);
}
-// Sets calld->method_params.
-// If the method params specify a timeout, populates
-// *per_method_deadline and returns true.
-static bool set_call_method_params_from_service_config_locked(
- grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- gpr_timespec *per_method_deadline) {
- channel_data *chand = elem->channel_data;
- call_data *calld = elem->call_data;
- if (chand->method_params_table != NULL) {
- calld->method_params = grpc_method_config_table_get(
- exec_ctx, chand->method_params_table, calld->path);
- if (calld->method_params != NULL) {
- method_parameters_ref(calld->method_params);
- if (gpr_time_cmp(calld->method_params->timeout,
- gpr_time_0(GPR_TIMESPAN)) != 0) {
- *per_method_deadline =
- gpr_time_add(calld->call_start_time, calld->method_params->timeout);
- return true;
- }
- }
- }
- return false;
-}
-
-// Gets data from the service config. Invoked when the resolver returns
-// its initial result.
-static void read_service_config_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_call_element *elem = arg;
- call_data *calld = elem->call_data;
- // If this is an error, there's no point in looking at the service config.
- if (error == GRPC_ERROR_NONE) {
- gpr_timespec per_method_deadline;
- if (set_call_method_params_from_service_config_locked(
- exec_ctx, elem, &per_method_deadline)) {
- // If the deadline from the service config is shorter than the one
- // from the client API, reset the deadline timer.
- if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
- calld->deadline = per_method_deadline;
- grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
- }
- }
- }
- GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config");
-}
-
-static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx,
- void *arg,
- grpc_error *error_ignored) {
- grpc_call_element *elem = arg;
- channel_data *chand = elem->channel_data;
- call_data *calld = elem->call_data;
- // If the resolver has already returned results, then we can access
- // the service config parameters immediately. Otherwise, we need to
- // defer that work until the resolver returns an initial result.
- if (chand->lb_policy != NULL) {
- // We already have a resolver result, so check for service config.
- gpr_timespec per_method_deadline;
- if (set_call_method_params_from_service_config_locked(
- exec_ctx, elem, &per_method_deadline)) {
- calld->deadline = gpr_time_min(calld->deadline, per_method_deadline);
- }
- } else {
- // We don't yet have a resolver result, so register a callback to
- // get the service config data once the resolver returns.
- // Take a reference to the call stack to be owned by the callback.
- GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config");
- grpc_closure_init(&calld->read_service_config, read_service_config_locked,
- elem, grpc_combiner_scheduler(chand->combiner, false));
- grpc_closure_list_append(&chand->waiting_for_config_closures,
- &calld->read_service_config, GRPC_ERROR_NONE);
- }
- // Start the deadline timer with the current deadline value. If we
- // do not yet have service config data, then the timer may be reset
- // later.
- grpc_deadline_state_start(exec_ctx, elem, calld->deadline);
- GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
- "initial_read_service_config");
-}
-
/* Constructor for call_data */
static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
- channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
// Initialize data members.
grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
calld->path = grpc_slice_ref_internal(args->path);
calld->call_start_time = args->start_time;
calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
- calld->method_params = NULL;
- calld->cancel_error = GRPC_ERROR_NONE;
- gpr_atm_rel_store(&calld->subchannel_call, 0);
- calld->connected_subchannel = NULL;
- calld->waiting_ops = NULL;
- calld->waiting_ops_count = 0;
- calld->waiting_ops_capacity = 0;
- calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
calld->owning_call = args->call_stack;
- calld->pollent = NULL;
- GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config");
- grpc_closure_sched(
- exec_ctx,
- grpc_closure_init(&calld->read_service_config,
- initial_read_service_config_locked, elem,
- grpc_combiner_scheduler(chand->combiner, false)),
- GRPC_ERROR_NONE);
+ calld->arena = args->arena;
+ grpc_deadline_state_start(exec_ctx, elem, calld->deadline);
return GRPC_ERROR_NONE;
}
@@ -1175,7 +1134,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *and_free_memory) {
+ grpc_closure *then_schedule_closure) {
call_data *calld = elem->call_data;
grpc_deadline_state_destroy(exec_ctx, elem);
grpc_slice_unref_internal(exec_ctx, calld->path);
@@ -1185,6 +1144,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
+ grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure);
+ then_schedule_closure = NULL;
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
}
GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
@@ -1194,7 +1155,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
"picked");
}
gpr_free(calld->waiting_ops);
- gpr_free(and_free_memory);
+ grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}
static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/client_channel/parse_address.c b/src/core/ext/client_channel/parse_address.c
index 8ae15fc72b..cd1b2cd80c 100644
--- a/src/core/ext/client_channel/parse_address.c
+++ b/src/core/ext/client_channel/parse_address.c
@@ -128,6 +128,7 @@ int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
GPR_ASSERT(host_end >= host);
char host_without_scope[INET6_ADDRSTRLEN];
size_t host_without_scope_len = (size_t)(host_end - host);
+ uint32_t sin6_scope_id = 0;
strncpy(host_without_scope, host, host_without_scope_len);
host_without_scope[host_without_scope_len] = '\0';
if (inet_pton(AF_INET6, host_without_scope, &in6->sin6_addr) == 0) {
@@ -136,10 +137,12 @@ int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
}
if (gpr_parse_bytes_to_uint32(host_end + 1,
strlen(host) - host_without_scope_len - 1,
- &in6->sin6_scope_id) == 0) {
+ &sin6_scope_id) == 0) {
gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1);
goto done;
}
+ // Handle "sin6_scope_id" being type "u_long". See grpc issue ##10027.
+ in6->sin6_scope_id = sin6_scope_id;
} else {
if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) {
gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host);
diff --git a/src/core/ext/client_channel/proxy_mapper_registry.c b/src/core/ext/client_channel/proxy_mapper_registry.c
index 2c44b9d490..0935ddbdbd 100644
--- a/src/core/ext/client_channel/proxy_mapper_registry.c
+++ b/src/core/ext/client_channel/proxy_mapper_registry.c
@@ -94,6 +94,14 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) {
grpc_proxy_mapper_destroy(list->list[i]);
}
gpr_free(list->list);
+ // Clean up in case we re-initialze later.
+ // TODO(ctiller): This should ideally live in
+ // grpc_proxy_mapper_registry_init(). However, if we did this there,
+ // then we would do it AFTER we start registering proxy mappers from
+ // third-party plugins, so they'd never show up (and would leak memory).
+ // We probably need some sort of dependency system for plugins to fix
+ // this.
+ memset(list, 0, sizeof(*list));
}
//
@@ -102,9 +110,7 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) {
static grpc_proxy_mapper_list g_proxy_mapper_list;
-void grpc_proxy_mapper_registry_init() {
- memset(&g_proxy_mapper_list, 0, sizeof(g_proxy_mapper_list));
-}
+void grpc_proxy_mapper_registry_init() {}
void grpc_proxy_mapper_registry_shutdown() {
grpc_proxy_mapper_list_destroy(&g_proxy_mapper_list);
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index 5df0a9060d..ed5029ea9a 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -148,6 +148,7 @@ struct grpc_subchannel {
struct grpc_subchannel_call {
grpc_connected_subchannel *connection;
+ grpc_closure *schedule_closure_after_destroy;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
@@ -340,17 +341,15 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(new_address != NULL);
gpr_free(addr);
addr = new_address;
- if (new_args != NULL) c->args = new_args;
- }
- if (c->args == NULL) {
- static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
- grpc_arg new_arg = grpc_create_subchannel_address_arg(addr);
- c->args = grpc_channel_args_copy_and_add_and_remove(
- args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg,
- 1);
- gpr_free(new_arg.value.string);
}
+ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
+ grpc_arg new_arg = grpc_create_subchannel_address_arg(addr);
gpr_free(addr);
+ c->args = grpc_channel_args_copy_and_add_and_remove(
+ new_args != NULL ? new_args : args->args, keys_to_remove,
+ GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
+ gpr_free(new_arg.value.string);
+ if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
&c->root_external_state_watcher;
grpc_closure_init(&c->connected, subchannel_connected, c,
@@ -719,13 +718,22 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
grpc_subchannel_call *c = call;
+ GPR_ASSERT(c->schedule_closure_after_destroy != NULL);
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_connected_subchannel *connection = c->connection;
- grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, c);
+ grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL,
+ c->schedule_closure_after_destroy);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call");
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
+void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call *call,
+ grpc_closure *closure) {
+ GPR_ASSERT(call->schedule_closure_after_destroy == NULL);
+ GPR_ASSERT(closure != NULL);
+ call->schedule_closure_after_destroy = closure;
+}
+
void grpc_subchannel_call_ref(
grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
@@ -761,15 +769,22 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time,
- gpr_timespec deadline, grpc_subchannel_call **call) {
+ const grpc_connected_subchannel_call_args *args,
+ grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
- *call = gpr_zalloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
+ *call = gpr_arena_alloc(
+ args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = con; // Ref is added below.
- grpc_error *error =
- grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
- NULL, NULL, path, start_time, deadline, callstk);
+ const grpc_call_element_args call_args = {.call_stack = callstk,
+ .server_transport_data = NULL,
+ .context = NULL,
+ .path = args->path,
+ .start_time = args->start_time,
+ .deadline = args->deadline,
+ .arena = args->arena};
+ grpc_error *error = grpc_call_stack_init(
+ exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args);
if (error != GRPC_ERROR_NONE) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);
@@ -778,7 +793,7 @@ grpc_error *grpc_connected_subchannel_create_call(
return error;
}
GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
- grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent);
+ grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, args->pollent);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h
index 6a70a76467..3e64a2507c 100644
--- a/src/core/ext/client_channel/subchannel.h
+++ b/src/core/ext/client_channel/subchannel.h
@@ -37,6 +37,7 @@
#include "src/core/ext/client_channel/connector.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/polling_entity.h"
+#include "src/core/lib/support/arena.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
@@ -112,10 +113,18 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
/** construct a subchannel call */
+typedef struct {
+ grpc_polling_entity *pollent;
+ grpc_slice path;
+ gpr_timespec start_time;
+ gpr_timespec deadline;
+ gpr_arena *arena;
+} grpc_connected_subchannel_call_args;
+
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
- grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time,
- gpr_timespec deadline, grpc_subchannel_call **subchannel_call);
+ const grpc_connected_subchannel_call_args *args,
+ grpc_subchannel_call **subchannel_call);
/** process a transport level op */
void grpc_connected_subchannel_process_transport_op(
@@ -154,6 +163,11 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *subchannel_call);
+/** Must be called once per call. Sets the 'then_schedule_closure' argument for
+ call stack destruction. */
+void grpc_subchannel_call_set_cleanup_closure(
+ grpc_subchannel_call *subchannel_call, grpc_closure *closure);
+
grpc_call_stack *grpc_subchannel_call_get_call_stack(
grpc_subchannel_call *subchannel_call);
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index c2750634a5..4ed832671d 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -123,7 +123,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {
+ grpc_closure *ignored) {
call_data *calld = elem->call_data;
/* TODO(dgq): do something with the data
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index da4c7dc7b2..082078c72f 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -511,6 +511,10 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
if (!t->closed) {
+ if (!grpc_error_has_clear_grpc_status(error)) {
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE);
+ }
if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
if (t->close_transport_on_writes_finished == NULL) {
t->close_transport_on_writes_finished =
@@ -520,10 +524,6 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_error_add_child(t->close_transport_on_writes_finished, error);
return;
}
- if (!grpc_error_has_clear_grpc_status(error)) {
- error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_UNAVAILABLE);
- }
t->closed = 1;
connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
@@ -575,7 +575,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) {
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
- const void *server_data) {
+ const void *server_data, gpr_arena *arena) {
GPR_TIMER_BEGIN("init_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
@@ -588,8 +588,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_ref_init(&s->active_streams, 1);
GRPC_CHTTP2_STREAM_REF(s, "chttp2");
- grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]);
- grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]);
+ grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
+ grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena);
grpc_chttp2_data_parser_init(&s->data_parser);
grpc_slice_buffer_init(&s->flow_controlled_buffer);
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
@@ -665,16 +665,17 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_TIMER_END("destroy_stream", 0);
- gpr_free(s->destroy_stream_arg);
+ grpc_closure_sched(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, void *and_free_memory) {
+ grpc_stream *gs,
+ grpc_closure *then_schedule_closure) {
GPR_TIMER_BEGIN("destroy_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- s->destroy_stream_arg = and_free_memory;
+ s->destroy_stream_arg = then_schedule_closure;
grpc_closure_sched(
exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s,
grpc_combiner_scheduler(t->combiner, false)),
@@ -1629,15 +1630,19 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
s->recv_trailing_metadata_finished != NULL) {
char status_string[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(status, status_string);
- grpc_chttp2_incoming_metadata_buffer_replace_or_add(
- exec_ctx, &s->metadata_buffer[1],
- grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_STATUS,
- grpc_slice_from_copied_string(status_string)));
+ GRPC_LOG_IF_ERROR("add_status",
+ grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ exec_ctx, &s->metadata_buffer[1],
+ grpc_mdelem_from_slices(
+ exec_ctx, GRPC_MDSTR_GRPC_STATUS,
+ grpc_slice_from_copied_string(status_string))));
if (msg != NULL) {
- grpc_chttp2_incoming_metadata_buffer_replace_or_add(
- exec_ctx, &s->metadata_buffer[1],
- grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
- grpc_slice_from_copied_string(msg)));
+ GRPC_LOG_IF_ERROR(
+ "add_status_message",
+ grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ exec_ctx, &s->metadata_buffer[1],
+ grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
+ grpc_slice_from_copied_string(msg))));
}
s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c
index f487533c41..9b4b1a7b84 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.c
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.c
@@ -91,7 +91,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_ping_parser *p = parser;
while (p->byte != 8 && cur != end) {
- p->opaque_8bytes |= (((uint64_t)*cur) << (8 * p->byte));
+ p->opaque_8bytes |= (((uint64_t)*cur) << (56 - 8 * p->byte));
cur++;
p->byte++;
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index 40f5120308..1865b997b7 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -1620,13 +1620,18 @@ void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_error *grpc_chttp2_hpack_parser_parse(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
grpc_slice slice) {
- /* TODO(ctiller): limit the distance of end from beg, and perform multiple
- steps in the event of a large chunk of data to limit
- stack space usage when no tail call optimization is
- available */
+/* max number of bytes to parse at a time... limits call stack depth on
+ * compilers without TCO */
+#define MAX_PARSE_LENGTH 1024
p->current_slice_refcount = slice.refcount;
- grpc_error *error = p->state(exec_ctx, p, GRPC_SLICE_START_PTR(slice),
- GRPC_SLICE_END_PTR(slice));
+ uint8_t *start = GRPC_SLICE_START_PTR(slice);
+ uint8_t *end = GRPC_SLICE_END_PTR(slice);
+ grpc_error *error = GRPC_ERROR_NONE;
+ while (start != end && error == GRPC_ERROR_NONE) {
+ uint8_t *target = start + GPR_MIN(MAX_PARSE_LENGTH, end - start);
+ error = p->state(exec_ctx, p, start, target);
+ start = target;
+ }
p->current_slice_refcount = NULL;
return error;
}
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
index c91b019aa0..da0a34d32a 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
@@ -41,69 +41,48 @@
#include <grpc/support/log.h>
void grpc_chttp2_incoming_metadata_buffer_init(
- grpc_chttp2_incoming_metadata_buffer *buffer) {
- buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) {
+ buffer->arena = arena;
+ grpc_metadata_batch_init(&buffer->batch);
+ buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) {
- size_t i;
- if (!buffer->published) {
- for (i = 0; i < buffer->count; i++) {
- GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
- }
- }
- gpr_free(buffer->elems);
+ grpc_metadata_batch_destroy(exec_ctx, &buffer->batch);
}
-void grpc_chttp2_incoming_metadata_buffer_add(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) {
- GPR_ASSERT(!buffer->published);
- if (buffer->capacity == buffer->count) {
- buffer->capacity = GPR_MAX(8, 2 * buffer->capacity);
- buffer->elems =
- gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity);
- }
- buffer->elems[buffer->count++].md = elem;
+grpc_error *grpc_chttp2_incoming_metadata_buffer_add(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
+ grpc_mdelem elem) {
buffer->size += GRPC_MDELEM_LENGTH(elem);
+ return grpc_metadata_batch_add_tail(
+ exec_ctx, &buffer->batch,
+ gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem);
}
-void grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_mdelem elem) {
- for (size_t i = 0; i < buffer->count; i++) {
- if (grpc_slice_eq(GRPC_MDKEY(buffer->elems[i].md), GRPC_MDKEY(elem))) {
- GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
- buffer->elems[i].md = elem;
- return;
+ for (grpc_linked_mdelem *l = buffer->batch.list.head; l != NULL;
+ l = l->next) {
+ if (grpc_slice_eq(GRPC_MDKEY(l->md), GRPC_MDKEY(elem))) {
+ GRPC_MDELEM_UNREF(exec_ctx, l->md);
+ l->md = elem;
+ return GRPC_ERROR_NONE;
}
}
- grpc_chttp2_incoming_metadata_buffer_add(buffer, elem);
+ return grpc_chttp2_incoming_metadata_buffer_add(exec_ctx, buffer, elem);
}
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) {
- GPR_ASSERT(!buffer->published);
- buffer->deadline = deadline;
+ buffer->batch.deadline = deadline;
}
void grpc_chttp2_incoming_metadata_buffer_publish(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_metadata_batch *batch) {
- GPR_ASSERT(!buffer->published);
- buffer->published = 1;
- if (buffer->count > 0) {
- size_t i;
- for (i = 0; i < buffer->count; i++) {
- /* TODO(ctiller): do something better here */
- if (!GRPC_LOG_IF_ERROR("grpc_chttp2_incoming_metadata_buffer_publish",
- grpc_metadata_batch_link_tail(
- exec_ctx, batch, &buffer->elems[i]))) {
- GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
- }
- }
- } else {
- batch->list.head = batch->list.tail = NULL;
- }
- batch->deadline = buffer->deadline;
+ *batch = buffer->batch;
+ grpc_metadata_batch_init(&buffer->batch);
}
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
index 1eac6fc150..288c917e65 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
@@ -37,28 +37,26 @@
#include "src/core/lib/transport/transport.h"
typedef struct {
- grpc_linked_mdelem *elems;
- size_t count;
- size_t capacity;
- gpr_timespec deadline;
- int published;
+ gpr_arena *arena;
+ grpc_metadata_batch batch;
size_t size; // total size of metadata
} grpc_chttp2_incoming_metadata_buffer;
/** assumes everything initially zeroed */
void grpc_chttp2_incoming_metadata_buffer_init(
- grpc_chttp2_incoming_metadata_buffer *buffer);
+ grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena);
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_publish(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_metadata_batch *batch);
-void grpc_chttp2_incoming_metadata_buffer_add(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem);
-void grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+grpc_error *grpc_chttp2_incoming_metadata_buffer_add(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
- grpc_mdelem elem);
+ grpc_mdelem elem) GRPC_MUST_USE_RESULT;
+grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
+ grpc_mdelem elem) GRPC_MUST_USE_RESULT;
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index d26812ad6b..3c56c21599 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -425,7 +425,7 @@ struct grpc_chttp2_stream {
grpc_stream_refcount *refcount;
grpc_closure destroy_stream;
- void *destroy_stream_arg;
+ grpc_closure *destroy_stream_arg;
grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
uint8_t included[STREAM_LIST_COUNT];
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 7ed00522c3..7efc8c63c9 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -381,16 +381,38 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
s->incoming_window_delta +
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) {
- char *msg;
- gpr_asprintf(&msg,
- "frame of size %d overflows incoming window of %" PRId64,
- t->incoming_frame_size,
- s->incoming_window_delta +
- t->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
- grpc_error *err = GRPC_ERROR_CREATE(msg);
- gpr_free(msg);
- return err;
+ if (incoming_frame_size <=
+ s->incoming_window_delta +
+ t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) {
+ gpr_log(
+ GPR_ERROR,
+ "Incoming frame of size %d exceeds incoming window size of %" PRId64
+ ".\n"
+ "The (un-acked, future) window size would be %" PRId64
+ " which is not exceeded.\n"
+ "This would usually cause a disconnection, but allowing it due to "
+ "broken HTTP2 implementations in the wild.\n"
+ "See (for example) https://github.com/netty/netty/issues/6520.",
+ t->incoming_frame_size,
+ s->incoming_window_delta +
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ s->incoming_window_delta +
+ t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
+ } else {
+ char *msg;
+ gpr_asprintf(&msg,
+ "frame of size %d overflows incoming window of %" PRId64,
+ t->incoming_frame_size,
+ s->incoming_window_delta +
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
+ }
}
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s,
@@ -526,7 +548,14 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
s->seen_error = true;
GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
- grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md);
+ grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add(
+ exec_ctx, &s->metadata_buffer[0], md);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
+ grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
+ s->seen_error = true;
+ GRPC_MDELEM_UNREF(exec_ctx, md);
+ }
}
}
@@ -576,7 +605,14 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp,
s->seen_error = true;
GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
- grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md);
+ grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add(
+ exec_ctx, &s->metadata_buffer[1], md);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
+ grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
+ s->seen_error = true;
+ GRPC_MDELEM_UNREF(exec_ctx, md);
+ }
}
GPR_TIMER_END("on_trailing_header", 0);
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 01a03533da..450d9ab23a 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -54,6 +54,7 @@
#include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
#define GRPC_HEADER_SIZE_IN_BYTES 5
+#define GRPC_FLUSH_READ_SIZE 4096
#define CRONET_LOG(...) \
do { \
@@ -151,11 +152,17 @@ struct write_state {
struct op_state {
bool state_op_done[OP_NUM_OPS];
bool state_callback_received[OP_NUM_OPS];
+ /* A non-zero gRPC status code has been seen */
bool fail_state;
+ /* Transport is discarding all buffered messages */
bool flush_read;
bool flush_cronet_when_ready;
bool pending_write_for_trailer;
- bool unprocessed_send_message;
+ bool pending_send_message;
+ /* User requested RECV_TRAILING_METADATA */
+ bool pending_recv_trailing_metadata;
+ /* Cronet has not issued a callback of a bidirectional read */
+ bool pending_read_from_cronet;
grpc_error *cancel_error;
/* data structure for storing data coming from server */
struct read_state rs;
@@ -177,6 +184,7 @@ struct op_storage {
};
struct stream_obj {
+ gpr_arena *arena;
struct op_and_state *oas;
grpc_transport_stream_op *curr_op;
grpc_cronet_transport *curr_ct;
@@ -248,11 +256,35 @@ static const char *op_id_string(enum e_op_id i) {
return "UNKNOWN";
}
-static void free_read_buffer(stream_obj *s) {
+static void null_and_maybe_free_read_buffer(stream_obj *s) {
if (s->state.rs.read_buffer &&
s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
gpr_free(s->state.rs.read_buffer);
- s->state.rs.read_buffer = NULL;
+ }
+ s->state.rs.read_buffer = NULL;
+}
+
+static void maybe_flush_read(stream_obj *s) {
+ /* To enter flush read state (discarding all the buffered messages in
+ * transport layer), two conditions must be satisfied: 1) non-zero grpc status
+ * has been received, and 2) an op requesting the status code
+ * (RECV_TRAILING_METADATA) is issued by the user. (See
+ * doc/status_ordering.md) */
+ /* Whenever the evaluation of any of the two condition is changed, we check
+ * whether we should enter the flush read state. */
+ if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
+ if (!s->state.flush_read && !s->state.rs.read_stream_closed) {
+ CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
+ s->state.flush_read = true;
+ null_and_maybe_free_read_buffer(s);
+ s->state.rs.read_buffer = gpr_malloc(GRPC_FLUSH_READ_SIZE);
+ if (!s->state.pending_read_from_cronet) {
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
+ bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
+ GRPC_FLUSH_READ_SIZE);
+ s->state.pending_read_from_cronet = true;
+ }
+ }
}
}
@@ -279,7 +311,11 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
storage->head = new_op;
storage->num_pending_ops++;
if (op->send_message) {
- s->state.unprocessed_send_message = true;
+ s->state.pending_send_message = true;
+ }
+ if (op->recv_trailing_metadata) {
+ s->state.pending_recv_trailing_metadata = true;
+ maybe_flush_read(s);
}
CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
storage->num_pending_ops);
@@ -367,7 +403,7 @@ static void on_failed(bidirectional_stream *stream, int net_error) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
- free_read_buffer(s);
+ null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -390,7 +426,7 @@ static void on_canceled(bidirectional_stream *stream) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
- free_read_buffer(s);
+ null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -405,7 +441,7 @@ static void on_succeeded(bidirectional_stream *stream) {
bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_SUCCEEDED] = true;
s->cbs = NULL;
- free_read_buffer(s);
+ null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -451,15 +487,18 @@ static void on_response_headers_received(
gpr_mu_lock(&s->mu);
memset(&s->state.rs.initial_metadata, 0,
sizeof(s->state.rs.initial_metadata));
- grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata);
+ grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata,
+ s->arena);
for (size_t i = 0; i < headers->count; i++) {
- grpc_chttp2_incoming_metadata_buffer_add(
- &s->state.rs.initial_metadata,
- grpc_mdelem_from_slices(
- &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
- headers->headers[i].key)),
- grpc_slice_intern(
- grpc_slice_from_static_string(headers->headers[i].value))));
+ GRPC_LOG_IF_ERROR(
+ "on_response_headers_received",
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &exec_ctx, &s->state.rs.initial_metadata,
+ grpc_mdelem_from_slices(
+ &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
+ headers->headers[i].key)),
+ grpc_slice_intern(grpc_slice_from_static_string(
+ headers->headers[i].value)))));
}
s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
@@ -473,6 +512,7 @@ static void on_response_headers_received(
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
s->state.rs.remaining_bytes);
+ s->state.pending_read_from_cronet = true;
}
gpr_mu_unlock(&s->mu);
grpc_exec_ctx_finish(&exec_ctx);
@@ -504,10 +544,13 @@ static void on_read_completed(bidirectional_stream *stream, char *data,
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count);
gpr_mu_lock(&s->mu);
+ s->state.pending_read_from_cronet = false;
s->state.state_callback_received[OP_RECV_MESSAGE] = true;
if (count > 0 && s->state.flush_read) {
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
- bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096);
+ bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
+ GRPC_FLUSH_READ_SIZE);
+ s->state.pending_read_from_cronet = true;
gpr_mu_unlock(&s->mu);
} else if (count > 0) {
s->state.rs.received_bytes += count;
@@ -518,16 +561,14 @@ static void on_read_completed(bidirectional_stream *stream, char *data,
bidirectional_stream_read(
s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
s->state.rs.remaining_bytes);
+ s->state.pending_read_from_cronet = true;
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
} else {
- if (s->state.flush_read) {
- gpr_free(s->state.rs.read_buffer);
- s->state.rs.read_buffer = NULL;
- }
+ null_and_maybe_free_read_buffer(s);
s->state.rs.read_stream_closed = true;
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
@@ -549,21 +590,25 @@ static void on_response_trailers_received(
memset(&s->state.rs.trailing_metadata, 0,
sizeof(s->state.rs.trailing_metadata));
s->state.rs.trailing_metadata_valid = false;
- grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata);
+ grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata,
+ s->arena);
for (size_t i = 0; i < trailers->count; i++) {
CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key,
trailers->headers[i].value);
- grpc_chttp2_incoming_metadata_buffer_add(
- &s->state.rs.trailing_metadata,
- grpc_mdelem_from_slices(
- &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
- trailers->headers[i].key)),
- grpc_slice_intern(
- grpc_slice_from_static_string(trailers->headers[i].value))));
+ GRPC_LOG_IF_ERROR(
+ "on_response_trailers_received",
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &exec_ctx, &s->state.rs.trailing_metadata,
+ grpc_mdelem_from_slices(
+ &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
+ trailers->headers[i].key)),
+ grpc_slice_intern(grpc_slice_from_static_string(
+ trailers->headers[i].value)))));
s->state.rs.trailing_metadata_valid = true;
if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
0 != strcmp(trailers->headers[i].value, "0")) {
s->state.fail_state = true;
+ maybe_flush_read(s);
}
}
s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
@@ -778,7 +823,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
result = false;
/* we haven't sent message yet */
- else if (stream_state->unprocessed_send_message &&
+ else if (stream_state->pending_send_message &&
!stream_state->state_op_done[OP_SEND_MESSAGE])
result = false;
/* we haven't got on_write_completed for the send yet */
@@ -900,7 +945,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
} else if (stream_op->send_message &&
op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
- stream_state->unprocessed_send_message = false;
+ stream_state->pending_send_message = false;
if (stream_state->state_callback_received[OP_FAILED]) {
result = NO_ACTION_POSSIBLE;
CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
@@ -1009,6 +1054,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
+ } else if (stream_state->flush_read) {
+ CRONET_LOG(GPR_DEBUG, "flush read");
+ grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE);
+ stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.length_field_received == false) {
if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
stream_state->rs.remaining_bytes == 0) {
@@ -1029,6 +1081,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
true; /* Indicates that at least one read request has been made */
bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes);
+ stream_state->pending_read_from_cronet = true;
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
stream_state->rs.remaining_bytes = 0;
@@ -1047,11 +1100,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.received_bytes = 0;
+ stream_state->rs.length_field_received = false;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
stream_state->state_op_done[OP_READ_REQ_MADE] =
true; /* Indicates that at least one read request has been made */
bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes);
+ stream_state->pending_read_from_cronet = true;
result = ACTION_TAKEN_NO_CALLBACK;
}
} else if (stream_state->rs.remaining_bytes == 0) {
@@ -1064,6 +1119,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
true; /* Indicates that at least one read request has been made */
bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes);
+ stream_state->pending_read_from_cronet = true;
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
result = NO_ACTION_POSSIBLE;
@@ -1075,7 +1131,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
uint8_t *dst_p = GRPC_SLICE_START_PTR(read_data_slice);
memcpy(dst_p, stream_state->rs.read_buffer,
(size_t)stream_state->rs.length_field);
- free_read_buffer(s);
+ null_and_maybe_free_read_buffer(s);
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
read_data_slice);
@@ -1096,6 +1152,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes);
+ stream_state->pending_read_from_cronet = true;
result = ACTION_TAKEN_NO_CALLBACK;
}
} else if (stream_op->recv_trailing_metadata &&
@@ -1153,15 +1210,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
make a note */
if (stream_op->recv_message)
stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
- } else if (stream_state->fail_state && !stream_state->flush_read) {
- CRONET_LOG(GPR_DEBUG, "running: %p flush read", oas);
- if (stream_state->rs.read_buffer &&
- stream_state->rs.read_buffer != stream_state->rs.grpc_header_bytes) {
- gpr_free(stream_state->rs.read_buffer);
- stream_state->rs.read_buffer = NULL;
- }
- stream_state->rs.read_buffer = gpr_malloc(4096);
- stream_state->flush_read = true;
} else {
result = NO_ACTION_POSSIBLE;
}
@@ -1174,7 +1222,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
- const void *server_data) {
+ const void *server_data, gpr_arena *arena) {
stream_obj *s = (stream_obj *)gs;
memset(&s->storage, 0, sizeof(s->storage));
s->storage.head = NULL;
@@ -1190,10 +1238,13 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->state.fail_state = s->state.flush_read = false;
s->state.cancel_error = NULL;
s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false;
- s->state.unprocessed_send_message = false;
+ s->state.pending_send_message = false;
+ s->state.pending_recv_trailing_metadata = false;
+ s->state.pending_read_from_cronet = false;
s->curr_gs = gs;
s->curr_ct = (grpc_cronet_transport *)gt;
+ s->arena = arena;
gpr_mu_init(&s->mu);
return 0;
@@ -1209,38 +1260,33 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_transport_stream_op *op) {
CRONET_LOG(GPR_DEBUG, "perform_stream_op");
- stream_obj *s = (stream_obj *)gs;
- add_to_storage(s, op);
if (op->send_initial_metadata &&
header_has_authority(op->send_initial_metadata->list.head)) {
/* Cronet does not support :authority header field. We cancel the call when
- this field is present in metadata */
- bidirectional_stream_header_array header_array;
- bidirectional_stream_header *header;
- bidirectional_stream cbs;
- CRONET_LOG(GPR_DEBUG,
- ":authority header is provided but not supported;"
- " cancel operations");
- /* Notify application that operation is cancelled by forging trailers */
- header_array.count = 1;
- header_array.capacity = 1;
- header_array.headers = gpr_malloc(sizeof(bidirectional_stream_header));
- header = (bidirectional_stream_header *)header_array.headers;
- header->key = "grpc-status";
- header->value = "1"; /* Return status GRPC_STATUS_CANCELLED */
- cbs.annotation = (void *)s;
- s->state.state_op_done[OP_CANCEL_ERROR] = true;
- on_response_trailers_received(&cbs, &header_array);
- gpr_free(header_array.headers);
- } else {
- execute_from_storage(s);
+ this field is present in metadata */
+ if (op->recv_initial_metadata_ready) {
+ grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready,
+ GRPC_ERROR_CANCELLED);
+ }
+ if (op->recv_message_ready) {
+ grpc_closure_sched(exec_ctx, op->recv_message_ready,
+ GRPC_ERROR_CANCELLED);
+ }
+ grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED);
+ return;
}
+ stream_obj *s = (stream_obj *)gs;
+ add_to_storage(s, op);
+ execute_from_storage(s);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, void *and_free_memory) {
+ grpc_stream *gs,
+ grpc_closure *then_schedule_closure) {
stream_obj *s = (stream_obj *)gs;
+ null_and_maybe_free_read_buffer(s);
GRPC_ERROR_UNREF(s->state.cancel_error);
+ grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {}