aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2017-04-25 12:15:53 -0700
committerGravatar murgatroid99 <mlumish@google.com>2017-04-25 12:15:53 -0700
commit3049462b75ac557348243601896ed68ca8145522 (patch)
tree213389dd32b961b25f3833738201e857ce7ae2e3 /src/core/ext
parent627e2a5abd2e4146c85625e98083a95846ad57e8 (diff)
parent79759fea1abd09102d38af3e13a84b69e898124b (diff)
Merge remote-tracking branch 'upstream/v1.3.x' into node_only_uv
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/census/context.c2
-rw-r--r--src/core/ext/census/resource.c4
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c130
-rw-r--r--src/core/ext/filters/client_channel/client_channel_plugin.c5
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c28
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c27
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h97
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.c33
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.h22
-rw-r--r--src/core/ext/filters/client_channel/parse_address.c44
-rw-r--r--src/core/ext/filters/client_channel/parse_address.h19
-rw-r--r--src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c6
-rw-r--r--src/core/ext/filters/client_channel/subchannel.c18
-rw-r--r--src/core/ext/filters/client_channel/uri_parser.c58
-rw-r--r--src/core/ext/filters/client_channel/uri_parser.h2
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.c373
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.h101
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.c609
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.h44
-rw-r--r--src/core/ext/filters/http/http_filters_plugin.c103
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.c451
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.h68
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.c445
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.h42
-rw-r--r--src/core/ext/filters/load_reporting/load_reporting.c27
-rw-r--r--src/core/ext/filters/load_reporting/load_reporting_filter.c27
-rw-r--r--src/core/ext/filters/max_age/max_age_filter.c28
-rw-r--r--src/core/ext/filters/message_size/message_size_filter.c314
-rw-r--r--src/core/ext/filters/message_size/message_size_filter.h39
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.c4
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c388
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.c363
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.h22
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h52
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c5
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c17
36 files changed, 3477 insertions, 540 deletions
diff --git a/src/core/ext/census/context.c b/src/core/ext/census/context.c
index 0dfc4ecbf1..4195cb1c9b 100644
--- a/src/core/ext/census/context.c
+++ b/src/core/ext/census/context.c
@@ -200,7 +200,7 @@ static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag,
// allocate new memory if needed
tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE;
char *new_kvm = gpr_malloc(tags->kvm_size);
- memcpy(new_kvm, tags->kvm, tags->kvm_used);
+ if (tags->kvm_used > 0) memcpy(new_kvm, tags->kvm, tags->kvm_used);
gpr_free(tags->kvm);
tags->kvm = new_kvm;
}
diff --git a/src/core/ext/census/resource.c b/src/core/ext/census/resource.c
index ed44f004f9..26ea1a8672 100644
--- a/src/core/ext/census/resource.c
+++ b/src/core/ext/census/resource.c
@@ -223,7 +223,9 @@ size_t allocate_resource(void) {
if (n_resources == n_defined_resources) {
size_t new_n_resources = n_resources ? n_resources * 2 : 2;
resource **new_resources = gpr_malloc(new_n_resources * sizeof(resource *));
- memcpy(new_resources, resources, n_resources * sizeof(resource *));
+ if (n_resources != 0) {
+ memcpy(new_resources, resources, n_resources * sizeof(resource *));
+ }
memset(new_resources + n_resources, 0,
(new_n_resources - n_resources) * sizeof(resource *));
gpr_free(resources);
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index 83e3b8f118..0463b25412 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -49,9 +49,9 @@
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
-#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
@@ -96,17 +96,10 @@ static void method_parameters_unref(method_parameters *method_params) {
}
}
-static void *method_parameters_copy(void *value) {
- return method_parameters_ref(value);
-}
-
static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
method_parameters_unref(value);
}
-static const grpc_slice_hash_table_vtable method_parameters_vtable = {
- method_parameters_free, method_parameters_copy};
-
static bool parse_wait_for_ready(grpc_json *field,
wait_for_ready_value *wait_for_ready) {
if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
@@ -183,6 +176,8 @@ typedef struct client_channel_channel_data {
grpc_resolver *resolver;
/** have we started resolving this channel */
bool started_resolving;
+ /** is deadline checking enabled? */
+ bool deadline_checking_enabled;
/** client channel factory */
grpc_client_channel_factory *client_channel_factory;
@@ -236,14 +231,23 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state state,
grpc_error *error,
const char *reason) {
- if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
- state == GRPC_CHANNEL_SHUTDOWN) &&
- chand->lb_policy != NULL) {
- /* cancel picks with wait_for_ready=false */
- grpc_lb_policy_cancel_picks_locked(
- exec_ctx, chand->lb_policy,
- /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
- /* check= */ 0, GRPC_ERROR_REF(error));
+ /* TODO: Improve failure handling:
+ * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
+ * - Hand over pending picks from old policies during the switch that happens
+ * when resolver provides an update. */
+ if (chand->lb_policy != NULL) {
+ if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ /* cancel picks with wait_for_ready=false */
+ grpc_lb_policy_cancel_picks_locked(
+ exec_ctx, chand->lb_policy,
+ /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
+ /* check= */ 0, GRPC_ERROR_REF(error));
+ } else if (state == GRPC_CHANNEL_SHUTDOWN) {
+ /* cancel all picks */
+ grpc_lb_policy_cancel_picks_locked(exec_ctx, chand->lb_policy,
+ /* mask= */ 0, /* check= */ 0,
+ GRPC_ERROR_REF(error));
+ }
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
@@ -346,6 +350,33 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
}
}
+// Wrap a closure associated with \a lb_policy. The associated callback (\a
+// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after
+// scheduling \a wrapped_closure.
+typedef struct wrapped_on_pick_closure_arg {
+ /* the closure instance using this struct as argument */
+ grpc_closure wrapper_closure;
+
+ /* the original closure. Usually a on_complete/notify cb for pick() and ping()
+ * calls against the internal RR instance, respectively. */
+ grpc_closure *wrapped_closure;
+
+ /* The policy instance related to the closure */
+ grpc_lb_policy *lb_policy;
+} wrapped_on_pick_closure_arg;
+
+// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free \a arg.
+static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ wrapped_on_pick_closure_arg *wc_arg = arg;
+ GPR_ASSERT(wc_arg != NULL);
+ GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+ GPR_ASSERT(wc_arg->lb_policy != NULL);
+ grpc_closure_run(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
+ GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping");
+ gpr_free(wc_arg);
+}
+
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
channel_data *chand = arg;
@@ -369,26 +400,24 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
lb_policy_name = channel_arg->value.string;
}
- // Special case: If all of the addresses are balancer addresses,
- // assume that we should use the grpclb policy, regardless of what the
- // resolver actually specified.
+ // Special case: If at least one balancer address is present, we use
+ // the grpclb policy, regardless of what the resolver actually specified.
channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) {
grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
- bool found_backend_address = false;
+ bool found_balancer_address = false;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (!addresses->addresses[i].is_balancer) {
- found_backend_address = true;
+ if (addresses->addresses[i].is_balancer) {
+ found_balancer_address = true;
break;
}
}
- if (!found_backend_address) {
+ if (found_balancer_address) {
if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
gpr_log(GPR_INFO,
- "resolver requested LB policy %s but provided only balancer "
- "addresses, no backend addresses -- forcing use of grpclb LB "
- "policy",
+ "resolver requested LB policy %s but provided at least one "
+ "balancer address -- forcing use of grpclb LB policy",
lb_policy_name);
}
lb_policy_name = "grpclb";
@@ -434,7 +463,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_uri_destroy(uri);
method_params_table = grpc_service_config_create_method_config_table(
exec_ctx, service_config, method_parameters_create_from_json,
- &method_parameters_vtable);
+ method_parameters_free);
grpc_service_config_destroy(service_config);
}
}
@@ -676,6 +705,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
if (chand->resolver == NULL) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
}
+ chand->deadline_checking_enabled =
+ grpc_deadline_checking_enabled(args->channel_args);
return GRPC_ERROR_NONE;
}
@@ -864,12 +895,14 @@ static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx,
/* apply service-config level configuration to the call (now that we're
* certain it exists) */
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
gpr_timespec per_method_deadline;
if (set_call_method_params_from_service_config_locked(exec_ctx, elem,
&per_method_deadline)) {
// If the deadline from the service config is shorter than the one
// from the client API, reset the deadline timer.
- if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
+ if (chand->deadline_checking_enabled &&
+ gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
calld->deadline = per_method_deadline;
grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
}
@@ -1031,11 +1064,29 @@ static bool pick_subchannel_locked(
const grpc_lb_policy_pick_args inputs = {
initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
gpr_inf_future(GPR_CLOCK_MONOTONIC)};
- const bool result = grpc_lb_policy_pick_locked(
- exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready);
+
+ // Wrap the user-provided callback in order to hold a strong reference to
+ // the LB policy for the duration of the pick.
+ wrapped_on_pick_closure_arg *w_on_pick_arg =
+ gpr_zalloc(sizeof(*w_on_pick_arg));
+ grpc_closure_init(&w_on_pick_arg->wrapper_closure,
+ wrapped_on_pick_closure_cb, w_on_pick_arg,
+ grpc_schedule_on_exec_ctx);
+ w_on_pick_arg->wrapped_closure = on_ready;
+ GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping");
+ w_on_pick_arg->lb_policy = lb_policy;
+ const bool pick_done = grpc_lb_policy_pick_locked(
+ exec_ctx, lb_policy, &inputs, connected_subchannel, NULL,
+ &w_on_pick_arg->wrapper_closure);
+ if (pick_done) {
+ /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
+ GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy,
+ "pick_subchannel_wrapping");
+ gpr_free(w_on_pick_arg);
+ }
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
- return result;
+ return pick_done;
}
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = true;
@@ -1227,8 +1278,10 @@ static void cc_start_transport_stream_op_batch(
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
- op);
+ if (chand->deadline_checking_enabled) {
+ grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
+ op);
+ }
/* try to (atomically) get the call */
grpc_subchannel_call *call = GET_CALL(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
@@ -1262,14 +1315,16 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
// Initialize data members.
- grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
calld->path = grpc_slice_ref_internal(args->path);
calld->call_start_time = args->start_time;
calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
calld->owning_call = args->call_stack;
calld->arena = args->arena;
- grpc_deadline_state_start(exec_ctx, elem, calld->deadline);
+ if (chand->deadline_checking_enabled) {
+ grpc_deadline_state_init(exec_ctx, elem, args->call_stack, calld->deadline);
+ }
return GRPC_ERROR_NONE;
}
@@ -1279,7 +1334,10 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_final_info *final_info,
grpc_closure *then_schedule_closure) {
call_data *calld = elem->call_data;
- grpc_deadline_state_destroy(exec_ctx, elem);
+ channel_data *chand = elem->channel_data;
+ if (chand->deadline_checking_enabled) {
+ grpc_deadline_state_destroy(exec_ctx, elem);
+ }
grpc_slice_unref_internal(exec_ctx, calld->path);
if (calld->method_params != NULL) {
method_parameters_unref(calld->method_params);
diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.c b/src/core/ext/filters/client_channel/client_channel_plugin.c
index a68c01c222..0e3eae6615 100644
--- a/src/core/ext/filters/client_channel/client_channel_plugin.c
+++ b/src/core/ext/filters/client_channel/client_channel_plugin.c
@@ -91,8 +91,9 @@ void grpc_client_channel_init(void) {
grpc_subchannel_index_init();
grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MIN,
set_default_host_if_unset, NULL);
- grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter,
- (void *)&grpc_client_channel_filter);
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter,
+ (void *)&grpc_client_channel_filter);
grpc_http_connect_register_handshaker_factory();
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index ff8d319309..ad5f0685ec 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -750,18 +750,11 @@ static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
gpr_free(balancer_name);
}
-static void *copy_balancer_name(void *balancer_name) {
- return gpr_strdup(balancer_name);
-}
-
static grpc_slice_hash_table_entry targets_info_entry_create(
const char *address, const char *balancer_name) {
- static const grpc_slice_hash_table_vtable vtable = {destroy_balancer_name,
- copy_balancer_name};
grpc_slice_hash_table_entry entry;
entry.key = grpc_slice_from_copied_string(address);
- entry.value = (void *)balancer_name;
- entry.vtable = &vtable;
+ entry.value = gpr_strdup(balancer_name);
return entry;
}
@@ -825,11 +818,8 @@ static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx,
uri_path);
gpr_free(uri_path);
- *targets_info =
- grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries);
- for (size_t i = 0; i < num_grpclb_addrs; i++) {
- grpc_slice_unref_internal(exec_ctx, targets_info_entries[i].key);
- }
+ *targets_info = grpc_slice_hash_table_create(
+ num_grpclb_addrs, targets_info_entries, destroy_balancer_name);
gpr_free(targets_info_entries);
return target_uri_str;
@@ -841,10 +831,10 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
/* Count the number of gRPC-LB addresses. There must be at least one.
* TODO(roth): For now, we ignore non-balancer addresses, but in the
* future, we may change the behavior such that we fall back to using
- * the non-balancer addresses if we cannot reach any balancers. At that
- * time, this should be changed to allow a list with no balancer addresses,
- * since the resolver might fail to return a balancer address even when
- * this is the right LB policy to use. */
+ * the non-balancer addresses if we cannot reach any balancers. In the
+ * fallback case, we should use the LB policy indicated by
+ * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is
+ * unset, we should default to pick_first). */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
@@ -1122,6 +1112,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
glb_policy->base.interested_parties,
GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
&host, glb_policy->deadline, NULL);
+ grpc_slice_unref_internal(exec_ctx, host);
grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);
@@ -1152,7 +1143,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->lb_call != NULL);
- grpc_call_destroy(glb_policy->lb_call);
+ grpc_call_unref(glb_policy->lb_call);
glb_policy->lb_call = NULL;
grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv);
@@ -1293,6 +1284,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
"Received empty server list. Picks will stay pending until a "
"response with > 0 servers is received");
}
+ grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
} else { /* serverlist == NULL */
gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index e9adf98711..fb119c7fc8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -16,6 +16,12 @@ const pb_field_t grpc_lb_v1_Duration_fields[3] = {
PB_LAST_FIELD
};
+const pb_field_t grpc_lb_v1_Timestamp_fields[3] = {
+ PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Timestamp, seconds, seconds, 0),
+ PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Timestamp, nanos, seconds, 0),
+ PB_LAST_FIELD
+};
+
const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3] = {
PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_LoadBalanceRequest, initial_request, initial_request, &grpc_lb_v1_InitialLoadBalanceRequest_fields),
PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_LoadBalanceRequest, client_stats, initial_request, &grpc_lb_v1_ClientStats_fields),
@@ -27,10 +33,14 @@ const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2] = {
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v1_ClientStats_fields[4] = {
- PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, total_requests, total_requests, 0),
- PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, client_rpc_errors, total_requests, 0),
- PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, dropped_requests, client_rpc_errors, 0),
+const pb_field_t grpc_lb_v1_ClientStats_fields[8] = {
+ PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, timestamp, timestamp, &grpc_lb_v1_Timestamp_fields),
+ PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_started, timestamp, 0),
+ PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished, num_calls_started, 0),
+ PB_FIELD( 4, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_rate_limiting, num_calls_finished, 0),
+ PB_FIELD( 5, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_load_balancing, num_calls_finished_with_drop_for_rate_limiting, 0),
+ PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished_with_drop_for_load_balancing, 0),
+ PB_FIELD( 7, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_known_received, num_calls_finished_with_client_failed_to_send, 0),
PB_LAST_FIELD
};
@@ -52,11 +62,12 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v1_Server_fields[5] = {
+const pb_field_t grpc_lb_v1_Server_fields[6] = {
PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
- PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_request, load_balance_token, 0),
+ PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_rate_limiting, load_balance_token, 0),
+ PB_FIELD( 5, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_load_balancing, drop_for_rate_limiting, 0),
PB_LAST_FIELD
};
@@ -70,7 +81,7 @@ const pb_field_t grpc_lb_v1_Server_fields[5] = {
* numbers or field sizes that are larger than what can fit in 8 or 16 bit
* field descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
@@ -81,7 +92,7 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request)
* numbers or field sizes that are larger than what can fit in the default
* 8 bit descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index 725aa7e386..d3ae919ec2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -14,16 +14,6 @@ extern "C" {
#endif
/* Struct definitions */
-typedef struct _grpc_lb_v1_ClientStats {
- bool has_total_requests;
- int64_t total_requests;
- bool has_client_rpc_errors;
- int64_t client_rpc_errors;
- bool has_dropped_requests;
- int64_t dropped_requests;
-/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
-} grpc_lb_v1_ClientStats;
-
typedef struct _grpc_lb_v1_Duration {
bool has_seconds;
int64_t seconds;
@@ -46,11 +36,39 @@ typedef struct _grpc_lb_v1_Server {
int32_t port;
bool has_load_balance_token;
char load_balance_token[50];
- bool has_drop_request;
- bool drop_request;
+ bool has_drop_for_rate_limiting;
+ bool drop_for_rate_limiting;
+ bool has_drop_for_load_balancing;
+ bool drop_for_load_balancing;
/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
} grpc_lb_v1_Server;
+typedef struct _grpc_lb_v1_Timestamp {
+ bool has_seconds;
+ int64_t seconds;
+ bool has_nanos;
+ int32_t nanos;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_Timestamp) */
+} grpc_lb_v1_Timestamp;
+
+typedef struct _grpc_lb_v1_ClientStats {
+ bool has_timestamp;
+ grpc_lb_v1_Timestamp timestamp;
+ bool has_num_calls_started;
+ int64_t num_calls_started;
+ bool has_num_calls_finished;
+ int64_t num_calls_finished;
+ bool has_num_calls_finished_with_drop_for_rate_limiting;
+ int64_t num_calls_finished_with_drop_for_rate_limiting;
+ bool has_num_calls_finished_with_drop_for_load_balancing;
+ int64_t num_calls_finished_with_drop_for_load_balancing;
+ bool has_num_calls_finished_with_client_failed_to_send;
+ int64_t num_calls_finished_with_client_failed_to_send;
+ bool has_num_calls_finished_known_received;
+ int64_t num_calls_finished_known_received;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
+} grpc_lb_v1_ClientStats;
+
typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
bool has_load_balancer_delegate;
char load_balancer_delegate[64];
@@ -59,6 +77,13 @@ typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceResponse) */
} grpc_lb_v1_InitialLoadBalanceResponse;
+typedef struct _grpc_lb_v1_ServerList {
+ pb_callback_t servers;
+ bool has_expiration_interval;
+ grpc_lb_v1_Duration expiration_interval;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */
+} grpc_lb_v1_ServerList;
+
typedef struct _grpc_lb_v1_LoadBalanceRequest {
bool has_initial_request;
grpc_lb_v1_InitialLoadBalanceRequest initial_request;
@@ -67,13 +92,6 @@ typedef struct _grpc_lb_v1_LoadBalanceRequest {
/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceRequest) */
} grpc_lb_v1_LoadBalanceRequest;
-typedef struct _grpc_lb_v1_ServerList {
- pb_callback_t servers;
- bool has_expiration_interval;
- grpc_lb_v1_Duration expiration_interval;
-/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */
-} grpc_lb_v1_ServerList;
-
typedef struct _grpc_lb_v1_LoadBalanceResponse {
bool has_initial_response;
grpc_lb_v1_InitialLoadBalanceResponse initial_response;
@@ -86,61 +104,72 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
/* Initializer values for message structs */
#define grpc_lb_v1_Duration_init_default {false, 0, false, 0}
+#define grpc_lb_v1_Timestamp_init_default {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_default {false, grpc_lb_v1_InitialLoadBalanceRequest_init_default, false, grpc_lb_v1_ClientStats_init_default}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_default {false, ""}
-#define grpc_lb_v1_ClientStats_init_default {false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
+#define grpc_lb_v1_Timestamp_init_zero {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
-#define grpc_lb_v1_ClientStats_init_zero {false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
/* Field tags (for use in manual encoding/decoding) */
-#define grpc_lb_v1_ClientStats_total_requests_tag 1
-#define grpc_lb_v1_ClientStats_client_rpc_errors_tag 2
-#define grpc_lb_v1_ClientStats_dropped_requests_tag 3
#define grpc_lb_v1_Duration_seconds_tag 1
#define grpc_lb_v1_Duration_nanos_tag 2
#define grpc_lb_v1_InitialLoadBalanceRequest_name_tag 1
#define grpc_lb_v1_Server_ip_address_tag 1
#define grpc_lb_v1_Server_port_tag 2
#define grpc_lb_v1_Server_load_balance_token_tag 3
-#define grpc_lb_v1_Server_drop_request_tag 4
+#define grpc_lb_v1_Server_drop_for_rate_limiting_tag 4
+#define grpc_lb_v1_Server_drop_for_load_balancing_tag 5
+#define grpc_lb_v1_Timestamp_seconds_tag 1
+#define grpc_lb_v1_Timestamp_nanos_tag 2
+#define grpc_lb_v1_ClientStats_timestamp_tag 1
+#define grpc_lb_v1_ClientStats_num_calls_started_tag 2
+#define grpc_lb_v1_ClientStats_num_calls_finished_tag 3
+#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_rate_limiting_tag 4
+#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_load_balancing_tag 5
+#define grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send_tag 6
+#define grpc_lb_v1_ClientStats_num_calls_finished_known_received_tag 7
#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2
-#define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1
-#define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2
#define grpc_lb_v1_ServerList_servers_tag 1
#define grpc_lb_v1_ServerList_expiration_interval_tag 3
+#define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1
+#define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2
#define grpc_lb_v1_LoadBalanceResponse_initial_response_tag 1
#define grpc_lb_v1_LoadBalanceResponse_server_list_tag 2
/* Struct field encoding specification for nanopb */
extern const pb_field_t grpc_lb_v1_Duration_fields[3];
+extern const pb_field_t grpc_lb_v1_Timestamp_fields[3];
extern const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2];
-extern const pb_field_t grpc_lb_v1_ClientStats_fields[4];
+extern const pb_field_t grpc_lb_v1_ClientStats_fields[8];
extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_ServerList_fields[3];
-extern const pb_field_t grpc_lb_v1_Server_fields[5];
+extern const pb_field_t grpc_lb_v1_Server_fields[6];
/* Maximum encoded size of messages (where known) */
#define grpc_lb_v1_Duration_size 22
-#define grpc_lb_v1_LoadBalanceRequest_size 169
+#define grpc_lb_v1_Timestamp_size 22
+#define grpc_lb_v1_LoadBalanceRequest_size 226
#define grpc_lb_v1_InitialLoadBalanceRequest_size 131
-#define grpc_lb_v1_ClientStats_size 33
+#define grpc_lb_v1_ClientStats_size 90
#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
/* grpc_lb_v1_ServerList_size depends on runtime parameters */
-#define grpc_lb_v1_Server_size 83
+#define grpc_lb_v1_Server_size 85
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.c b/src/core/ext/filters/client_channel/lb_policy_factory.c
index e2af216b89..89b8bf8951 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.c
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.c
@@ -36,16 +36,18 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
+#include "src/core/lib/channel/channel_args.h"
+
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
+#include "src/core/ext/filters/client_channel/parse_address.h"
grpc_lb_addresses* grpc_lb_addresses_create(
size_t num_addresses, const grpc_lb_user_data_vtable* user_data_vtable) {
- grpc_lb_addresses* addresses = gpr_malloc(sizeof(grpc_lb_addresses));
+ grpc_lb_addresses* addresses = gpr_zalloc(sizeof(grpc_lb_addresses));
addresses->num_addresses = num_addresses;
addresses->user_data_vtable = user_data_vtable;
const size_t addresses_size = sizeof(grpc_lb_address) * num_addresses;
- addresses->addresses = gpr_malloc(addresses_size);
- memset(addresses->addresses, 0, addresses_size);
+ addresses->addresses = gpr_zalloc(addresses_size);
return addresses;
}
@@ -69,7 +71,7 @@ grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) {
void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
void* address, size_t address_len,
- bool is_balancer, char* balancer_name,
+ bool is_balancer, const char* balancer_name,
void* user_data) {
GPR_ASSERT(index < addresses->num_addresses);
if (user_data != NULL) GPR_ASSERT(addresses->user_data_vtable != NULL);
@@ -77,10 +79,22 @@ void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
memcpy(target->address.addr, address, address_len);
target->address.len = address_len;
target->is_balancer = is_balancer;
- target->balancer_name = balancer_name;
+ target->balancer_name = gpr_strdup(balancer_name);
target->user_data = user_data;
}
+bool grpc_lb_addresses_set_address_from_uri(grpc_lb_addresses* addresses,
+ size_t index, const grpc_uri* uri,
+ bool is_balancer,
+ const char* balancer_name,
+ void* user_data) {
+ grpc_resolved_address address;
+ if (!grpc_parse_uri(uri, &address)) return false;
+ grpc_lb_addresses_set_address(addresses, index, address.addr, address.len,
+ is_balancer, balancer_name, user_data);
+ return true;
+}
+
int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1,
const grpc_lb_addresses* addresses2) {
if (addresses1->num_addresses > addresses2->num_addresses) return 1;
@@ -147,6 +161,15 @@ grpc_arg grpc_lb_addresses_create_channel_arg(
return arg;
}
+grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
+ const grpc_channel_args* channel_args) {
+ const grpc_arg* lb_addresses_arg =
+ grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES);
+ if (lb_addresses_arg == NULL || lb_addresses_arg->type != GRPC_ARG_POINTER)
+ return NULL;
+ return lb_addresses_arg->value.pointer.p;
+}
+
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
factory->vtable->ref(factory);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index 81ab12ec8f..9d6c0fc139 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -34,12 +34,13 @@
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_FACTORY_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_FACTORY_H
-#include "src/core/ext/filters/client_channel/client_channel_factory.h"
-#include "src/core/ext/filters/client_channel/lb_policy.h"
-
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/ext/filters/client_channel/client_channel_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy.h"
+#include "src/core/ext/filters/client_channel/uri_parser.h"
+
// Channel arg key for grpc_lb_addresses.
#define GRPC_ARG_LB_ADDRESSES "grpc.lb_addresses"
@@ -88,9 +89,18 @@ grpc_lb_addresses *grpc_lb_addresses_copy(const grpc_lb_addresses *addresses);
* Takes ownership of \a balancer_name. */
void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index,
void *address, size_t address_len,
- bool is_balancer, char *balancer_name,
+ bool is_balancer, const char *balancer_name,
void *user_data);
+/** Sets the value of the address at index \a index of \a addresses from \a uri.
+ * Returns true upon success, false otherwise. Takes ownership of \a
+ * balancer_name. */
+bool grpc_lb_addresses_set_address_from_uri(grpc_lb_addresses *addresses,
+ size_t index, const grpc_uri *uri,
+ bool is_balancer,
+ const char *balancer_name,
+ void *user_data);
+
/** Compares \a addresses1 and \a addresses2. */
int grpc_lb_addresses_cmp(const grpc_lb_addresses *addresses1,
const grpc_lb_addresses *addresses2);
@@ -103,6 +113,10 @@ void grpc_lb_addresses_destroy(grpc_exec_ctx *exec_ctx,
grpc_arg grpc_lb_addresses_create_channel_arg(
const grpc_lb_addresses *addresses);
+/** Returns the \a grpc_lb_addresses instance in \a channel_args or NULL */
+grpc_lb_addresses *grpc_lb_addresses_find_channel_arg(
+ const grpc_channel_args *channel_args);
+
/** Arguments passed to LB policies. */
typedef struct grpc_lb_policy_args {
grpc_client_channel_factory *client_channel_factory;
diff --git a/src/core/ext/filters/client_channel/parse_address.c b/src/core/ext/filters/client_channel/parse_address.c
index 0c97062075..edc6ce697d 100644
--- a/src/core/ext/filters/client_channel/parse_address.c
+++ b/src/core/ext/filters/client_channel/parse_address.c
@@ -48,7 +48,12 @@
#ifdef GRPC_HAVE_UNIX_SOCKET
-int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
+bool grpc_parse_unix(const grpc_uri *uri,
+ grpc_resolved_address *resolved_addr) {
+ if (strcmp("unix", uri->scheme) != 0) {
+ gpr_log(GPR_ERROR, "Expected 'unix' scheme, got '%s'", uri->scheme);
+ return false;
+ }
struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr;
const size_t maxlen = sizeof(un->sun_path);
const size_t path_len = strnlen(uri->path, maxlen);
@@ -61,21 +66,29 @@ int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
#else /* GRPC_HAVE_UNIX_SOCKET */
-int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) { abort(); }
+bool grpc_parse_unix(const grpc_uri *uri,
+ grpc_resolved_address *resolved_addr) {
+ abort();
+}
#endif /* GRPC_HAVE_UNIX_SOCKET */
-int parse_ipv4(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
+bool grpc_parse_ipv4(const grpc_uri *uri,
+ grpc_resolved_address *resolved_addr) {
+ if (strcmp("ipv4", uri->scheme) != 0) {
+ gpr_log(GPR_ERROR, "Expected 'ipv4' scheme, got '%s'", uri->scheme);
+ return false;
+ }
const char *host_port = uri->path;
char *host;
char *port;
int port_num;
- int result = 0;
+ bool result = false;
struct sockaddr_in *in = (struct sockaddr_in *)resolved_addr->addr;
if (*host_port == '/') ++host_port;
if (!gpr_split_host_port(host_port, &host, &port)) {
- return 0;
+ return false;
}
memset(resolved_addr, 0, sizeof(grpc_resolved_address));
@@ -98,14 +111,19 @@ int parse_ipv4(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
goto done;
}
- result = 1;
+ result = true;
done:
gpr_free(host);
gpr_free(port);
return result;
}
-int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
+bool grpc_parse_ipv6(const grpc_uri *uri,
+ grpc_resolved_address *resolved_addr) {
+ if (strcmp("ipv6", uri->scheme) != 0) {
+ gpr_log(GPR_ERROR, "Expected 'ipv6' scheme, got '%s'", uri->scheme);
+ return false;
+ }
const char *host_port = uri->path;
char *host;
char *port;
@@ -168,3 +186,15 @@ done:
gpr_free(port);
return result;
}
+
+bool grpc_parse_uri(const grpc_uri *uri, grpc_resolved_address *resolved_addr) {
+ if (strcmp("unix", uri->scheme) == 0) {
+ return grpc_parse_unix(uri, resolved_addr);
+ } else if (strcmp("ipv4", uri->scheme) == 0) {
+ return grpc_parse_ipv4(uri, resolved_addr);
+ } else if (strcmp("ipv6", uri->scheme) == 0) {
+ return grpc_parse_ipv6(uri, resolved_addr);
+ }
+ gpr_log(GPR_ERROR, "Can't parse scheme '%s'", uri->scheme);
+ return false;
+}
diff --git a/src/core/ext/filters/client_channel/parse_address.h b/src/core/ext/filters/client_channel/parse_address.h
index c8d77baa00..fa7ea33a00 100644
--- a/src/core/ext/filters/client_channel/parse_address.h
+++ b/src/core/ext/filters/client_channel/parse_address.h
@@ -39,16 +39,19 @@
#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/iomgr/resolve_address.h"
-/** Populate \a addr and \a len from \a uri, whose path is expected to contain a
+/** Populate \a resolved_addr from \a uri, whose path is expected to contain a
* unix socket path. Returns true upon success. */
-int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr);
+bool grpc_parse_unix(const grpc_uri *uri, grpc_resolved_address *resolved_addr);
-/** Populate /a addr and \a len from \a uri, whose path is expected to contain a
- * host:port pair. Returns true upon success. */
-int parse_ipv4(grpc_uri *uri, grpc_resolved_address *resolved_addr);
+/** Populate \a resolved_addr from \a uri, whose path is expected to contain an
+ * IPv4 host:port pair. Returns true upon success. */
+bool grpc_parse_ipv4(const grpc_uri *uri, grpc_resolved_address *resolved_addr);
-/** Populate /a addr and \a len from \a uri, whose path is expected to contain a
- * host:port pair. Returns true upon success. */
-int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr);
+/** Populate \a resolved_addr from \a uri, whose path is expected to contain an
+ * IPv6 host:port pair. Returns true upon success. */
+bool grpc_parse_ipv6(const grpc_uri *uri, grpc_resolved_address *resolved_addr);
+
+/** Populate \a resolved_addr from \a uri. Returns true upon success. */
+bool grpc_parse_uri(const grpc_uri *uri, grpc_resolved_address *resolved_addr);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PARSE_ADDRESS_H */
diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
index 54f020d691..4d7d878c23 100644
--- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
@@ -157,8 +157,8 @@ static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx,
grpc_resolver_args *args,
- int parse(grpc_uri *uri,
- grpc_resolved_address *dst)) {
+ bool parse(const grpc_uri *uri,
+ grpc_resolved_address *dst)) {
if (0 != strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme",
args->uri->scheme);
@@ -209,7 +209,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *name##_factory_create_resolver( \
grpc_exec_ctx *exec_ctx, grpc_resolver_factory *factory, \
grpc_resolver_args *args) { \
- return sockaddr_create(exec_ctx, args, parse_##name); \
+ return sockaddr_create(exec_ctx, args, grpc_parse_##name); \
} \
static const grpc_resolver_factory_vtable name##_factory_vtable = { \
sockaddr_factory_ref, sockaddr_factory_unref, \
diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c
index 9a7a7a0ee5..967e571221 100644
--- a/src/core/ext/filters/client_channel/subchannel.c
+++ b/src/core/ext/filters/client_channel/subchannel.c
@@ -59,9 +59,9 @@
#define INTERNAL_REF_BITS 16
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
-#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
+#define GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
@@ -353,8 +353,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
"subchannel");
int initial_backoff_ms =
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
+ int min_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000;
int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
- int min_backoff_ms = GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS * 1000;
bool fixed_reconnect_backoff = false;
if (c->args) {
for (size_t i = 0; i < c->args->num_args; i++) {
@@ -366,6 +366,12 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
&c->args->args[i],
(grpc_integer_options){initial_backoff_ms, 100, INT_MAX});
} else if (0 == strcmp(c->args->args[i].key,
+ GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ min_backoff_ms = grpc_channel_arg_get_integer(
+ &c->args->args[i],
+ (grpc_integer_options){min_backoff_ms, 100, INT_MAX});
+ } else if (0 == strcmp(c->args->args[i].key,
GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
fixed_reconnect_backoff = false;
max_backoff_ms = grpc_channel_arg_get_integer(
@@ -797,13 +803,7 @@ static void grpc_uri_to_sockaddr(grpc_exec_ctx *exec_ctx, const char *uri_str,
grpc_resolved_address *addr) {
grpc_uri *uri = grpc_uri_parse(exec_ctx, uri_str, 0 /* suppress_errors */);
GPR_ASSERT(uri != NULL);
- if (strcmp(uri->scheme, "ipv4") == 0) {
- GPR_ASSERT(parse_ipv4(uri, addr));
- } else if (strcmp(uri->scheme, "ipv6") == 0) {
- GPR_ASSERT(parse_ipv6(uri, addr));
- } else {
- GPR_ASSERT(parse_unix(uri, addr));
- }
+ if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr));
grpc_uri_destroy(uri);
}
diff --git a/src/core/ext/filters/client_channel/uri_parser.c b/src/core/ext/filters/client_channel/uri_parser.c
index 01b99911aa..b233d835cb 100644
--- a/src/core/ext/filters/client_channel/uri_parser.c
+++ b/src/core/ext/filters/client_channel/uri_parser.c
@@ -50,7 +50,7 @@
#define NOT_SET (~(size_t)0)
static grpc_uri *bad_uri(const char *uri_text, size_t pos, const char *section,
- int suppress_errors) {
+ bool suppress_errors) {
char *line_prefix;
size_t pfx_len;
@@ -83,6 +83,11 @@ static char *decode_and_copy_component(grpc_exec_ctx *exec_ctx, const char *src,
return out;
}
+static bool valid_hex(char c) {
+ return ((c >= 'a') && (c <= 'f')) || ((c >= 'A') && (c <= 'F')) ||
+ ((c >= '0') && (c <= '9'));
+}
+
/** Returns how many chars to advance if \a uri_text[i] begins a valid \a pchar
* production. If \a uri_text[i] introduces an invalid \a pchar (such as percent
* sign not followed by two hex digits), NOT_SET is returned. */
@@ -93,27 +98,36 @@ static size_t parse_pchar(const char *uri_text, size_t i) {
* sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
/ "*" / "+" / "," / ";" / "=" */
char c = uri_text[i];
- if (((c >= 'A') && (c <= 'Z')) || ((c >= 'a') && (c <= 'z')) ||
- ((c >= '0') && (c <= '9')) ||
- (c == '-' || c == '.' || c == '_' || c == '~') || /* unreserved */
- (c == '!' || c == '$' || c == '&' || c == '\'' || c == '$' || c == '&' ||
- c == '(' || c == ')' || c == '*' || c == '+' || c == ',' || c == ';' ||
- c == '=') /* sub-delims */) {
- return 1;
- }
- if (c == '%') { /* pct-encoded */
- size_t j;
- if (uri_text[i + 1] == 0 || uri_text[i + 2] == 0) {
- return NOT_SET;
- }
- for (j = i + 1; j < 2; j++) {
- c = uri_text[j];
- if (!(((c >= '0') && (c <= '9')) || ((c >= 'a') && (c <= 'f')) ||
- ((c >= 'A') && (c <= 'F')))) {
- return NOT_SET;
+ switch (c) {
+ default:
+ if (((c >= 'a') && (c <= 'z')) || ((c >= 'A') && (c <= 'Z')) ||
+ ((c >= '0') && (c <= '9'))) {
+ return 1;
}
- }
- return 2;
+ break;
+ case ':':
+ case '@':
+ case '-':
+ case '.':
+ case '_':
+ case '~':
+ case '!':
+ case '$':
+ case '&':
+ case '\'':
+ case '(':
+ case ')':
+ case '*':
+ case '+':
+ case ',':
+ case ';':
+ case '=':
+ return 1;
+ case '%': /* pct-encoded */
+ if (valid_hex(uri_text[i + 1]) && valid_hex(uri_text[i + 2])) {
+ return 2;
+ }
+ return NOT_SET;
}
return 0;
}
@@ -183,7 +197,7 @@ static void parse_query_parts(grpc_uri *uri) {
}
grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text,
- int suppress_errors) {
+ bool suppress_errors) {
grpc_uri *uri;
size_t scheme_begin = 0;
size_t scheme_end = NOT_SET;
diff --git a/src/core/ext/filters/client_channel/uri_parser.h b/src/core/ext/filters/client_channel/uri_parser.h
index 2698d448d8..b889040b16 100644
--- a/src/core/ext/filters/client_channel/uri_parser.h
+++ b/src/core/ext/filters/client_channel/uri_parser.h
@@ -53,7 +53,7 @@ typedef struct {
/** parse a uri, return NULL on failure */
grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text,
- int suppress_errors);
+ bool suppress_errors);
/** return the part of a query string after the '=' in "?key=xxx&...", or NULL
* if key is not present */
diff --git a/src/core/ext/filters/deadline/deadline_filter.c b/src/core/ext/filters/deadline/deadline_filter.c
new file mode 100644
index 0000000000..2e03d16bf6
--- /dev/null
+++ b/src/core/ext/filters/deadline/deadline_filter.c
@@ -0,0 +1,373 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#include "src/core/ext/filters/deadline/deadline_filter.h"
+
+#include <stdbool.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+
+#include "src/core/lib/channel/channel_stack_builder.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/surface/channel_init.h"
+
+//
+// grpc_deadline_state
+//
+
+// Timer callback.
+static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = arg;
+ grpc_deadline_state* deadline_state = elem->call_data;
+ if (error != GRPC_ERROR_CANCELLED) {
+ grpc_call_element_signal_error(
+ exec_ctx, elem,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED));
+ }
+ GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
+}
+
+// Starts the deadline timer.
+static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ gpr_timespec deadline) {
+ deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+ if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) {
+ return;
+ }
+ grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_deadline_timer_state cur_state;
+ grpc_closure* closure = NULL;
+retry:
+ cur_state =
+ (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state);
+ switch (cur_state) {
+ case GRPC_DEADLINE_STATE_PENDING:
+ // Note: We do not start the timer if there is already a timer
+ return;
+ case GRPC_DEADLINE_STATE_FINISHED:
+ if (gpr_atm_rel_cas(&deadline_state->timer_state,
+ GRPC_DEADLINE_STATE_FINISHED,
+ GRPC_DEADLINE_STATE_PENDING)) {
+ // If we've already created and destroyed a timer, we always create a
+ // new closure: we have no other guarantee that the inlined closure is
+ // not in use (it may hold a pending call to timer_callback)
+ closure = grpc_closure_create(timer_callback, elem,
+ grpc_schedule_on_exec_ctx);
+ } else {
+ goto retry;
+ }
+ break;
+ case GRPC_DEADLINE_STATE_INITIAL:
+ if (gpr_atm_rel_cas(&deadline_state->timer_state,
+ GRPC_DEADLINE_STATE_INITIAL,
+ GRPC_DEADLINE_STATE_PENDING)) {
+ closure =
+ grpc_closure_init(&deadline_state->timer_callback, timer_callback,
+ elem, grpc_schedule_on_exec_ctx);
+ } else {
+ goto retry;
+ }
+ break;
+ }
+ GPR_ASSERT(closure);
+ GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
+ grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure,
+ gpr_now(GPR_CLOCK_MONOTONIC));
+}
+
+// Cancels the deadline timer.
+static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
+ grpc_deadline_state* deadline_state) {
+ if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING,
+ GRPC_DEADLINE_STATE_FINISHED)) {
+ grpc_timer_cancel(exec_ctx, &deadline_state->timer);
+ } else {
+ // timer was either in STATE_INITAL (nothing to cancel)
+ // OR in STATE_FINISHED (again nothing to cancel)
+ }
+}
+
+// Callback run when the call is complete.
+static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ grpc_deadline_state* deadline_state = arg;
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ // Invoke the next callback.
+ grpc_closure_run(exec_ctx, deadline_state->next_on_complete,
+ GRPC_ERROR_REF(error));
+}
+
+// Inject our own on_complete callback into op.
+static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
+ grpc_transport_stream_op_batch* op) {
+ deadline_state->next_on_complete = op->on_complete;
+ grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state,
+ grpc_schedule_on_exec_ctx);
+ op->on_complete = &deadline_state->on_complete;
+}
+
+// Callback and associated state for starting the timer after call stack
+// initialization has been completed.
+struct start_timer_after_init_state {
+ grpc_call_element* elem;
+ gpr_timespec deadline;
+ grpc_closure closure;
+};
+static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ struct start_timer_after_init_state* state = arg;
+ start_timer_if_needed(exec_ctx, state->elem, state->deadline);
+ gpr_free(state);
+}
+
+void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_call_stack* call_stack,
+ gpr_timespec deadline) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ deadline_state->call_stack = call_stack;
+ // Deadline will always be infinite on servers, so the timer will only be
+ // set on clients with a finite deadline.
+ deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+ if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
+ // When the deadline passes, we indicate the failure by sending down
+ // an op with cancel_error set. However, we can't send down any ops
+ // until after the call stack is fully initialized. If we start the
+ // timer here, we have no guarantee that the timer won't pop before
+ // call stack initialization is finished. To avoid that problem, we
+ // create a closure to start the timer, and we schedule that closure
+ // to be run after call stack initialization is done.
+ struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
+ state->elem = elem;
+ state->deadline = deadline;
+ grpc_closure_init(&state->closure, start_timer_after_init, state,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE);
+ }
+}
+
+void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+}
+
+void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ gpr_timespec new_deadline) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ start_timer_if_needed(exec_ctx, elem, new_deadline);
+}
+
+void grpc_deadline_state_client_start_transport_stream_op_batch(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op_batch* op) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ if (op->cancel_stream) {
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ } else {
+ // Make sure we know when the call is complete, so that we can cancel
+ // the timer.
+ if (op->recv_trailing_metadata) {
+ inject_on_complete_cb(deadline_state, op);
+ }
+ }
+}
+
+//
+// filter code
+//
+
+// Constructor for channel_data. Used for both client and server filters.
+static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ GPR_ASSERT(!args->is_last);
+ return GRPC_ERROR_NONE;
+}
+
+// Destructor for channel_data. Used for both client and server filters.
+static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem) {}
+
+// Call data used for both client and server filter.
+typedef struct base_call_data {
+ grpc_deadline_state deadline_state;
+} base_call_data;
+
+// Additional call data used only for the server filter.
+typedef struct server_call_data {
+ base_call_data base; // Must be first.
+ // The closure for receiving initial metadata.
+ grpc_closure recv_initial_metadata_ready;
+ // Received initial metadata batch.
+ grpc_metadata_batch* recv_initial_metadata;
+ // The original recv_initial_metadata_ready closure, which we chain to
+ // after our own closure is invoked.
+ grpc_closure* next_recv_initial_metadata_ready;
+} server_call_data;
+
+// Constructor for call_data. Used for both client and server filters.
+static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ grpc_deadline_state_init(exec_ctx, elem, args->call_stack, args->deadline);
+ return GRPC_ERROR_NONE;
+}
+
+// Destructor for call_data. Used for both client and server filters.
+static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* ignored) {
+ grpc_deadline_state_destroy(exec_ctx, elem);
+}
+
+// Method for starting a call op for client filter.
+static void client_start_transport_stream_op_batch(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op_batch* op) {
+ grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
+ op);
+ // Chain to next filter.
+ grpc_call_next_op(exec_ctx, elem, op);
+}
+
+// Callback for receiving initial metadata on the server.
+static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = arg;
+ server_call_data* calld = elem->call_data;
+ // Get deadline from metadata and start the timer if needed.
+ start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline);
+ // Invoke the next callback.
+ calld->next_recv_initial_metadata_ready->cb(
+ exec_ctx, calld->next_recv_initial_metadata_ready->cb_arg, error);
+}
+
+// Method for starting a call op for server filter.
+static void server_start_transport_stream_op_batch(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op_batch* op) {
+ server_call_data* calld = elem->call_data;
+ if (op->cancel_stream) {
+ cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
+ } else {
+ // If we're receiving initial metadata, we need to get the deadline
+ // from the recv_initial_metadata_ready callback. So we inject our
+ // own callback into that hook.
+ if (op->recv_initial_metadata) {
+ calld->next_recv_initial_metadata_ready =
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ calld->recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata;
+ grpc_closure_init(&calld->recv_initial_metadata_ready,
+ recv_initial_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->recv_initial_metadata_ready;
+ }
+ // Make sure we know when the call is complete, so that we can cancel
+ // the timer.
+ // Note that we trigger this on recv_trailing_metadata, even though
+ // the client never sends trailing metadata, because this is the
+ // hook that tells us when the call is complete on the server side.
+ if (op->recv_trailing_metadata) {
+ inject_on_complete_cb(&calld->base.deadline_state, op);
+ }
+ }
+ // Chain to next filter.
+ grpc_call_next_op(exec_ctx, elem, op);
+}
+
+const grpc_channel_filter grpc_client_deadline_filter = {
+ client_start_transport_stream_op_batch,
+ grpc_channel_next_op,
+ sizeof(base_call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ 0, // sizeof(channel_data)
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
+ "deadline",
+};
+
+const grpc_channel_filter grpc_server_deadline_filter = {
+ server_start_transport_stream_op_batch,
+ grpc_channel_next_op,
+ sizeof(server_call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ 0, // sizeof(channel_data)
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
+ "deadline",
+};
+
+bool grpc_deadline_checking_enabled(const grpc_channel_args* channel_args) {
+ return grpc_channel_arg_get_bool(
+ grpc_channel_args_find(channel_args, GRPC_ARG_ENABLE_DEADLINE_CHECKS),
+ !grpc_channel_args_want_minimal_stack(channel_args));
+}
+
+static bool maybe_add_deadline_filter(grpc_exec_ctx* exec_ctx,
+ grpc_channel_stack_builder* builder,
+ void* arg) {
+ return grpc_deadline_checking_enabled(
+ grpc_channel_stack_builder_get_channel_arguments(builder))
+ ? grpc_channel_stack_builder_prepend_filter(builder, arg, NULL,
+ NULL)
+ : true;
+}
+
+void grpc_deadline_filter_init(void) {
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_deadline_filter, (void*)&grpc_client_deadline_filter);
+ grpc_channel_init_register_stage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_deadline_filter, (void*)&grpc_server_deadline_filter);
+}
+
+void grpc_deadline_filter_shutdown(void) {}
diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h
new file mode 100644
index 0000000000..5050453fa1
--- /dev/null
+++ b/src/core/ext/filters/deadline/deadline_filter.h
@@ -0,0 +1,101 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#ifndef GRPC_CORE_EXT_FILTERS_DEADLINE_DEADLINE_FILTER_H
+#define GRPC_CORE_EXT_FILTERS_DEADLINE_DEADLINE_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/iomgr/timer.h"
+
+typedef enum grpc_deadline_timer_state {
+ GRPC_DEADLINE_STATE_INITIAL,
+ GRPC_DEADLINE_STATE_PENDING,
+ GRPC_DEADLINE_STATE_FINISHED
+} grpc_deadline_timer_state;
+
+// State used for filters that enforce call deadlines.
+// Must be the first field in the filter's call_data.
+typedef struct grpc_deadline_state {
+ // We take a reference to the call stack for the timer callback.
+ grpc_call_stack* call_stack;
+ gpr_atm timer_state;
+ grpc_timer timer;
+ grpc_closure timer_callback;
+ // Closure to invoke when the call is complete.
+ // We use this to cancel the timer.
+ grpc_closure on_complete;
+ // The original on_complete closure, which we chain to after our own
+ // closure is invoked.
+ grpc_closure* next_on_complete;
+} grpc_deadline_state;
+
+//
+// NOTE: All of these functions require that the first field in
+// elem->call_data is a grpc_deadline_state.
+//
+
+// assumes elem->call_data is zero'd
+void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_call_stack* call_stack,
+ gpr_timespec deadline);
+void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem);
+
+// Cancels the existing timer and starts a new one with new_deadline.
+//
+// Note: It is generally safe to call this with an earlier deadline
+// value than the current one, but not the reverse. No checks are done
+// to ensure that the timer callback is not invoked while it is in the
+// process of being reset, which means that attempting to increase the
+// deadline may result in the timer being called twice.
+void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ gpr_timespec new_deadline);
+
+// To be called from the client-side filter's start_transport_stream_op_batch()
+// method. Ensures that the deadline timer is cancelled when the call
+// is completed.
+//
+// Note: It is the caller's responsibility to chain to the next filter if
+// necessary after this function returns.
+void grpc_deadline_state_client_start_transport_stream_op_batch(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op_batch* op);
+
+// Should deadline checking be performed (according to channel args)
+bool grpc_deadline_checking_enabled(const grpc_channel_args* args);
+
+// Deadline filters for direct client channels and server channels.
+// Note: Deadlines for non-direct client channels are handled by the
+// client_channel filter.
+extern const grpc_channel_filter grpc_client_deadline_filter;
+extern const grpc_channel_filter grpc_server_deadline_filter;
+
+#endif /* GRPC_CORE_EXT_FILTERS_DEADLINE_DEADLINE_FILTER_H */
diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c
new file mode 100644
index 0000000000..bf5fbc7da7
--- /dev/null
+++ b/src/core/ext/filters/http/client/http_client_filter.c
@@ -0,0 +1,609 @@
+/*
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/filters/http/client/http_client_filter.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <string.h>
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/b64.h"
+#include "src/core/lib/slice/percent_encoding.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/transport/transport_impl.h"
+
+#define EXPECTED_CONTENT_TYPE "application/grpc"
+#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
+
+/* default maximum size of payload eligable for GET request */
+static const size_t kMaxPayloadSizeForGet = 2048;
+
+typedef struct call_data {
+ grpc_linked_mdelem method;
+ grpc_linked_mdelem scheme;
+ grpc_linked_mdelem authority;
+ grpc_linked_mdelem te_trailers;
+ grpc_linked_mdelem content_type;
+ grpc_linked_mdelem user_agent;
+
+ grpc_metadata_batch *recv_initial_metadata;
+ grpc_metadata_batch *recv_trailing_metadata;
+ uint8_t *payload_bytes;
+
+ /* Vars to read data off of send_message */
+ grpc_transport_stream_op_batch *send_op;
+ uint32_t send_length;
+ uint32_t send_flags;
+ grpc_slice incoming_slice;
+ grpc_slice_buffer_stream replacement_stream;
+ grpc_slice_buffer slices;
+ /* flag that indicates that all slices of send_messages aren't availble */
+ bool send_message_blocked;
+
+ /** Closure to call when finished with the hc_on_recv hook */
+ grpc_closure *on_done_recv_initial_metadata;
+ grpc_closure *on_done_recv_trailing_metadata;
+ grpc_closure *on_complete;
+ grpc_closure *post_send;
+
+ /** Receive closures are chained: we inject this closure as the on_done_recv
+ up-call on transport_op, and remember to call our on_done_recv member
+ after handling it. */
+ grpc_closure hc_on_recv_initial_metadata;
+ grpc_closure hc_on_recv_trailing_metadata;
+ grpc_closure hc_on_complete;
+ grpc_closure got_slice;
+ grpc_closure send_done;
+} call_data;
+
+typedef struct channel_data {
+ grpc_mdelem static_scheme;
+ grpc_mdelem user_agent;
+ size_t max_payload_size_for_get;
+} channel_data;
+
+static grpc_error *client_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_metadata_batch *b) {
+ if (b->idx.named.status != NULL) {
+ if (grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) {
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.status);
+ } else {
+ char *val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.status->md),
+ GPR_DUMP_ASCII);
+ char *msg;
+ gpr_asprintf(&msg, "Received http2 header with status: %s", val);
+ grpc_error *e = grpc_error_set_str(
+ grpc_error_set_int(
+ grpc_error_set_str(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Received http2 :status header with non-200 OK status"),
+ GRPC_ERROR_STR_VALUE, grpc_slice_from_copied_string(val)),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED),
+ GRPC_ERROR_STR_GRPC_MESSAGE, grpc_slice_from_copied_string(msg));
+ gpr_free(val);
+ gpr_free(msg);
+ return e;
+ }
+ }
+
+ if (b->idx.named.grpc_message != NULL) {
+ grpc_slice pct_decoded_msg = grpc_permissive_percent_decode_slice(
+ GRPC_MDVALUE(b->idx.named.grpc_message->md));
+ if (grpc_slice_is_equivalent(pct_decoded_msg,
+ GRPC_MDVALUE(b->idx.named.grpc_message->md))) {
+ grpc_slice_unref_internal(exec_ctx, pct_decoded_msg);
+ } else {
+ grpc_metadata_batch_set_value(exec_ctx, b->idx.named.grpc_message,
+ pct_decoded_msg);
+ }
+ }
+
+ if (b->idx.named.content_type != NULL) {
+ if (!grpc_mdelem_eq(b->idx.named.content_type->md,
+ GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC)) {
+ if (grpc_slice_buf_start_eq(GRPC_MDVALUE(b->idx.named.content_type->md),
+ EXPECTED_CONTENT_TYPE,
+ EXPECTED_CONTENT_TYPE_LENGTH) &&
+ (GRPC_SLICE_START_PTR(GRPC_MDVALUE(
+ b->idx.named.content_type->md))[EXPECTED_CONTENT_TYPE_LENGTH] ==
+ '+' ||
+ GRPC_SLICE_START_PTR(GRPC_MDVALUE(
+ b->idx.named.content_type->md))[EXPECTED_CONTENT_TYPE_LENGTH] ==
+ ';')) {
+ /* Although the C implementation doesn't (currently) generate them,
+ any custom +-suffix is explicitly valid. */
+ /* TODO(klempner): We should consider preallocating common values such
+ as +proto or +json, or at least stashing them if we see them. */
+ /* TODO(klempner): Should we be surfacing this to application code? */
+ } else {
+ /* TODO(klempner): We're currently allowing this, but we shouldn't
+ see it without a proxy so log for now. */
+ char *val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.content_type->md),
+ GPR_DUMP_ASCII);
+ gpr_log(GPR_INFO, "Unexpected content-type '%s'", val);
+ gpr_free(val);
+ }
+ }
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_type);
+ }
+
+ return GRPC_ERROR_NONE;
+}
+
+static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_error *error) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (error == GRPC_ERROR_NONE) {
+ error = client_filter_incoming_metadata(exec_ctx, elem,
+ calld->recv_initial_metadata);
+ } else {
+ GRPC_ERROR_REF(error);
+ }
+ grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata, error);
+}
+
+static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_error *error) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (error == GRPC_ERROR_NONE) {
+ error = client_filter_incoming_metadata(exec_ctx, elem,
+ calld->recv_trailing_metadata);
+ } else {
+ GRPC_ERROR_REF(error);
+ }
+ grpc_closure_run(exec_ctx, calld->on_done_recv_trailing_metadata, error);
+}
+
+static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_error *error) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (calld->payload_bytes) {
+ gpr_free(calld->payload_bytes);
+ calld->payload_bytes = NULL;
+ }
+ calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, error);
+}
+
+static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
+ grpc_call_element *elem = elemp;
+ call_data *calld = elem->call_data;
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
+ calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
+}
+
+static void remove_if_present(grpc_exec_ctx *exec_ctx,
+ grpc_metadata_batch *batch,
+ grpc_metadata_batch_callouts_index idx) {
+ if (batch->idx.array[idx] != NULL) {
+ grpc_metadata_batch_remove(exec_ctx, batch, batch->idx.array[idx]);
+ }
+}
+
+static void continue_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ uint8_t *wrptr = calld->payload_bytes;
+ while (grpc_byte_stream_next(
+ exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
+ &calld->got_slice)) {
+ grpc_byte_stream_pull(exec_ctx,
+ calld->send_op->payload->send_message.send_message,
+ &calld->incoming_slice);
+ if (GRPC_SLICE_LENGTH(calld->incoming_slice) > 0) {
+ memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
+ GRPC_SLICE_LENGTH(calld->incoming_slice));
+ }
+ wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
+ grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ if (calld->send_length == calld->slices.length) {
+ calld->send_message_blocked = false;
+ break;
+ }
+ }
+}
+
+static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
+ grpc_call_element *elem = elemp;
+ call_data *calld = elem->call_data;
+ calld->send_message_blocked = false;
+ if (GRPC_ERROR_NONE !=
+ grpc_byte_stream_pull(exec_ctx,
+ calld->send_op->payload->send_message.send_message,
+ &calld->incoming_slice)) {
+ /* Should never reach here */
+ abort();
+ }
+ grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ if (calld->send_length == calld->slices.length) {
+ /* Pass down the original send_message op that was blocked.*/
+ grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
+ calld->send_flags);
+ calld->send_op->payload->send_message.send_message =
+ &calld->replacement_stream.base;
+ calld->post_send = calld->send_op->on_complete;
+ calld->send_op->on_complete = &calld->send_done;
+ grpc_call_next_op(exec_ctx, elem, calld->send_op);
+ } else {
+ continue_send_message(exec_ctx, elem);
+ }
+}
+
+static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ grpc_error *error;
+
+ if (op->send_initial_metadata) {
+ /* Decide which HTTP VERB to use. We use GET if the request is marked
+ cacheable, and the operation contains both initial metadata and send
+ message, and the payload is below the size threshold, and all the data
+ for this request is immediately available. */
+ grpc_mdelem method = GRPC_MDELEM_METHOD_POST;
+ if (op->send_message &&
+ (op->payload->send_initial_metadata.send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
+ op->payload->send_message.send_message->length <
+ channeld->max_payload_size_for_get) {
+ method = GRPC_MDELEM_METHOD_GET;
+ /* The following write to calld->send_message_blocked isn't racy with
+ reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because
+ being here means ops->send_message is not NULL, which is primarily
+ guarding the read there. */
+ calld->send_message_blocked = true;
+ } else if (op->payload->send_initial_metadata.send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
+ method = GRPC_MDELEM_METHOD_PUT;
+ }
+
+ /* Attempt to read the data from send_message and create a header field. */
+ if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) {
+ /* allocate memory to hold the entire payload */
+ calld->payload_bytes =
+ gpr_malloc(op->payload->send_message.send_message->length);
+
+ /* read slices of send_message and copy into payload_bytes */
+ calld->send_op = op;
+ calld->send_length = op->payload->send_message.send_message->length;
+ calld->send_flags = op->payload->send_message.send_message->flags;
+ continue_send_message(exec_ctx, elem);
+
+ if (calld->send_message_blocked == false) {
+ /* when all the send_message data is available, then modify the path
+ * MDELEM by appending base64 encoded query to the path */
+ const int k_url_safe = 1;
+ const int k_multi_line = 0;
+ const unsigned char k_query_separator = '?';
+
+ grpc_slice path_slice =
+ GRPC_MDVALUE(op->payload->send_initial_metadata
+ .send_initial_metadata->idx.named.path->md);
+ /* sum up individual component's lengths and allocate enough memory to
+ * hold combined path+query */
+ size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
+ estimated_len++; /* for the '?' */
+ estimated_len += grpc_base64_estimate_encoded_size(
+ op->payload->send_message.send_message->length, k_url_safe,
+ k_multi_line);
+ grpc_slice path_with_query_slice = grpc_slice_malloc(estimated_len);
+
+ /* memcopy individual pieces into this slice */
+ uint8_t *write_ptr =
+ (uint8_t *)GRPC_SLICE_START_PTR(path_with_query_slice);
+ uint8_t *original_path = (uint8_t *)GRPC_SLICE_START_PTR(path_slice);
+ memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice));
+ write_ptr += GRPC_SLICE_LENGTH(path_slice);
+
+ *write_ptr = k_query_separator;
+ write_ptr++; /* for the '?' */
+
+ grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes,
+ op->payload->send_message.send_message->length,
+ k_url_safe, k_multi_line);
+
+ /* remove trailing unused memory and add trailing 0 to terminate string
+ */
+ char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
+ /* safe to use strlen since base64_encode will always add '\0' */
+ path_with_query_slice =
+ grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t));
+
+ /* substitute previous path with the new path+query */
+ grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices(
+ exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice);
+ grpc_metadata_batch *b =
+ op->payload->send_initial_metadata.send_initial_metadata;
+ error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path,
+ mdelem_path_and_query);
+ if (error != GRPC_ERROR_NONE) return error;
+
+ calld->on_complete = op->on_complete;
+ op->on_complete = &calld->hc_on_complete;
+ op->send_message = false;
+ } else {
+ /* Not all data is available. Fall back to POST. */
+ gpr_log(GPR_DEBUG,
+ "Request is marked Cacheable but not all data is available.\
+ Falling back to POST");
+ method = GRPC_MDELEM_METHOD_POST;
+ }
+ }
+
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_METHOD);
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_SCHEME);
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_TE);
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_CONTENT_TYPE);
+ remove_if_present(exec_ctx,
+ op->payload->send_initial_metadata.send_initial_metadata,
+ GRPC_BATCH_USER_AGENT);
+
+ /* Send : prefixed headers, which have to be before any application
+ layer headers. */
+ error = grpc_metadata_batch_add_head(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->method, method);
+ if (error != GRPC_ERROR_NONE) return error;
+ error = grpc_metadata_batch_add_head(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->scheme, channeld->static_scheme);
+ if (error != GRPC_ERROR_NONE) return error;
+ error = grpc_metadata_batch_add_tail(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS);
+ if (error != GRPC_ERROR_NONE) return error;
+ error = grpc_metadata_batch_add_tail(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
+ if (error != GRPC_ERROR_NONE) return error;
+ error = grpc_metadata_batch_add_tail(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent));
+ if (error != GRPC_ERROR_NONE) return error;
+ }
+
+ if (op->recv_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata;
+ calld->on_done_recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->hc_on_recv_initial_metadata;
+ }
+
+ if (op->recv_trailing_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_trailing_metadata =
+ op->payload->recv_trailing_metadata.recv_trailing_metadata;
+ calld->on_done_recv_trailing_metadata = op->on_complete;
+ op->on_complete = &calld->hc_on_recv_trailing_metadata;
+ }
+
+ return GRPC_ERROR_NONE;
+}
+
+static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
+ GPR_TIMER_BEGIN("hc_start_transport_op", 0);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ grpc_error *error = hc_mutate_op(exec_ctx, elem, op);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ } else {
+ call_data *calld = elem->call_data;
+ if (op->send_message && calld->send_message_blocked) {
+ /* Don't forward the op. send_message contains slices that aren't ready
+ yet. The call will be forwarded by the op_complete of slice read call.
+ */
+ } else {
+ grpc_call_next_op(exec_ctx, elem, op);
+ }
+ }
+ GPR_TIMER_END("hc_start_transport_op", 0);
+}
+
+/* Constructor for call_data */
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ const grpc_call_element_args *args) {
+ call_data *calld = elem->call_data;
+ calld->on_done_recv_initial_metadata = NULL;
+ calld->on_done_recv_trailing_metadata = NULL;
+ calld->on_complete = NULL;
+ calld->payload_bytes = NULL;
+ calld->send_message_blocked = false;
+ grpc_slice_buffer_init(&calld->slices);
+ grpc_closure_init(&calld->hc_on_recv_initial_metadata,
+ hc_on_recv_initial_metadata, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->hc_on_recv_trailing_metadata,
+ hc_on_recv_trailing_metadata, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->got_slice, got_slice, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->send_done, send_done, elem,
+ grpc_schedule_on_exec_ctx);
+ return GRPC_ERROR_NONE;
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ const grpc_call_final_info *final_info,
+ grpc_closure *ignored) {
+ call_data *calld = elem->call_data;
+ grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
+}
+
+static grpc_mdelem scheme_from_args(const grpc_channel_args *args) {
+ unsigned i;
+ size_t j;
+ grpc_mdelem valid_schemes[] = {GRPC_MDELEM_SCHEME_HTTP,
+ GRPC_MDELEM_SCHEME_HTTPS};
+ if (args != NULL) {
+ for (i = 0; i < args->num_args; ++i) {
+ if (args->args[i].type == GRPC_ARG_STRING &&
+ strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) {
+ for (j = 0; j < GPR_ARRAY_SIZE(valid_schemes); j++) {
+ if (0 == grpc_slice_str_cmp(GRPC_MDVALUE(valid_schemes[j]),
+ args->args[i].value.string)) {
+ return valid_schemes[j];
+ }
+ }
+ }
+ }
+ }
+ return GRPC_MDELEM_SCHEME_HTTP;
+}
+
+static size_t max_payload_size_from_args(const grpc_channel_args *args) {
+ if (args != NULL) {
+ for (size_t i = 0; i < args->num_args; ++i) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET)) {
+ if (args->args[i].type != GRPC_ARG_INTEGER) {
+ gpr_log(GPR_ERROR, "%s: must be an integer",
+ GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET);
+ } else {
+ return (size_t)args->args[i].value.integer;
+ }
+ }
+ }
+ }
+ return kMaxPayloadSizeForGet;
+}
+
+static grpc_slice user_agent_from_args(const grpc_channel_args *args,
+ const char *transport_name) {
+ gpr_strvec v;
+ size_t i;
+ int is_first = 1;
+ char *tmp;
+ grpc_slice result;
+
+ gpr_strvec_init(&v);
+
+ for (i = 0; args && i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) {
+ if (args->args[i].type != GRPC_ARG_STRING) {
+ gpr_log(GPR_ERROR, "Channel argument '%s' should be a string",
+ GRPC_ARG_PRIMARY_USER_AGENT_STRING);
+ } else {
+ if (!is_first) gpr_strvec_add(&v, gpr_strdup(" "));
+ is_first = 0;
+ gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string));
+ }
+ }
+ }
+
+ gpr_asprintf(&tmp, "%sgrpc-c/%s (%s; %s; %s)", is_first ? "" : " ",
+ grpc_version_string(), GPR_PLATFORM_STRING, transport_name,
+ grpc_g_stands_for());
+ is_first = 0;
+ gpr_strvec_add(&v, tmp);
+
+ for (i = 0; args && i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) {
+ if (args->args[i].type != GRPC_ARG_STRING) {
+ gpr_log(GPR_ERROR, "Channel argument '%s' should be a string",
+ GRPC_ARG_SECONDARY_USER_AGENT_STRING);
+ } else {
+ if (!is_first) gpr_strvec_add(&v, gpr_strdup(" "));
+ is_first = 0;
+ gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string));
+ }
+ }
+ }
+
+ tmp = gpr_strvec_flatten(&v, NULL);
+ gpr_strvec_destroy(&v);
+ result = grpc_slice_intern(grpc_slice_from_static_string(tmp));
+ gpr_free(tmp);
+
+ return result;
+}
+
+/* Constructor for channel_data */
+static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
+ channel_data *chand = elem->channel_data;
+ GPR_ASSERT(!args->is_last);
+ GPR_ASSERT(args->optional_transport != NULL);
+ chand->static_scheme = scheme_from_args(args->channel_args);
+ chand->max_payload_size_for_get =
+ max_payload_size_from_args(args->channel_args);
+ chand->user_agent = grpc_mdelem_from_slices(
+ exec_ctx, GRPC_MDSTR_USER_AGENT,
+ user_agent_from_args(args->channel_args,
+ args->optional_transport->vtable->name));
+ return GRPC_ERROR_NONE;
+}
+
+/* Destructor for channel data */
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ GRPC_MDELEM_UNREF(exec_ctx, chand->user_agent);
+}
+
+const grpc_channel_filter grpc_http_client_filter = {
+ hc_start_transport_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
+ "http-client"};
diff --git a/src/core/ext/filters/http/client/http_client_filter.h b/src/core/ext/filters/http/client/http_client_filter.h
new file mode 100644
index 0000000000..6e1eb3937b
--- /dev/null
+++ b/src/core/ext/filters/http/client/http_client_filter.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_HTTP_CLIENT_HTTP_CLIENT_FILTER_H
+#define GRPC_CORE_EXT_FILTERS_HTTP_CLIENT_HTTP_CLIENT_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+
+/* Processes metadata on the client side for HTTP2 transports */
+extern const grpc_channel_filter grpc_http_client_filter;
+
+/* Channel arg to determine maximum size of payload eligable for GET request */
+#define GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET "grpc.max_payload_size_for_get"
+
+#endif /* GRPC_CORE_EXT_FILTERS_HTTP_CLIENT_HTTP_CLIENT_FILTER_H */
diff --git a/src/core/ext/filters/http/http_filters_plugin.c b/src/core/ext/filters/http/http_filters_plugin.c
new file mode 100644
index 0000000000..195a1a8119
--- /dev/null
+++ b/src/core/ext/filters/http/http_filters_plugin.c
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include "src/core/ext/filters/http/client/http_client_filter.h"
+#include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
+#include "src/core/ext/filters/http/server/http_server_filter.h"
+#include "src/core/lib/channel/channel_stack_builder.h"
+#include "src/core/lib/surface/channel_init.h"
+#include "src/core/lib/transport/transport_impl.h"
+
+typedef struct {
+ const grpc_channel_filter *filter;
+ const char *control_channel_arg;
+} optional_filter;
+
+static optional_filter compress_filter = {
+ &grpc_message_compress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION};
+
+static bool is_building_http_like_transport(
+ grpc_channel_stack_builder *builder) {
+ grpc_transport *t = grpc_channel_stack_builder_get_transport(builder);
+ return t != NULL && strstr(t->vtable->name, "http");
+}
+
+static bool maybe_add_optional_filter(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder,
+ void *arg) {
+ if (!is_building_http_like_transport(builder)) return true;
+ optional_filter *filtarg = arg;
+ const grpc_channel_args *channel_args =
+ grpc_channel_stack_builder_get_channel_arguments(builder);
+ bool enable = grpc_channel_arg_get_bool(
+ grpc_channel_args_find(channel_args, filtarg->control_channel_arg),
+ !grpc_channel_args_want_minimal_stack(channel_args));
+ return enable ? grpc_channel_stack_builder_prepend_filter(
+ builder, filtarg->filter, NULL, NULL)
+ : true;
+}
+
+static bool maybe_add_required_filter(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder,
+ void *arg) {
+ return is_building_http_like_transport(builder)
+ ? grpc_channel_stack_builder_prepend_filter(
+ builder, (const grpc_channel_filter *)arg, NULL, NULL)
+ : true;
+}
+
+void grpc_http_filters_init(void) {
+ grpc_register_tracer("compression", &grpc_compression_trace);
+ grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_optional_filter, &compress_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_optional_filter, &compress_filter);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_optional_filter, &compress_filter);
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_required_filter, (void *)&grpc_http_client_filter);
+ grpc_channel_init_register_stage(
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_required_filter, (void *)&grpc_http_client_filter);
+ grpc_channel_init_register_stage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_required_filter, (void *)&grpc_http_server_filter);
+}
+
+void grpc_http_filters_shutdown(void) {}
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
new file mode 100644
index 0000000000..4f5f41e9b0
--- /dev/null
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -0,0 +1,451 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <assert.h>
+#include <string.h>
+
+#include <grpc/compression.h>
+#include <grpc/slice_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/compression/algorithm_metadata.h"
+#include "src/core/lib/compression/message_compress.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/transport/static_metadata.h"
+
+#define INITIAL_METADATA_UNSEEN 0
+#define HAS_COMPRESSION_ALGORITHM 2
+#define NO_COMPRESSION_ALGORITHM 4
+
+#define CANCELLED_BIT ((gpr_atm)1)
+
+typedef struct call_data {
+ grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
+ grpc_linked_mdelem compression_algorithm_storage;
+ grpc_linked_mdelem accept_encoding_storage;
+ uint32_t remaining_slice_bytes;
+ /** Compression algorithm we'll try to use. It may be given by incoming
+ * metadata, or by the channel's default compression settings. */
+ grpc_compression_algorithm compression_algorithm;
+
+ /* Atomic recording the state of initial metadata; allowed values:
+ INITIAL_METADATA_UNSEEN - initial metadata op not seen
+ HAS_COMPRESSION_ALGORITHM - initial metadata seen; compression algorithm
+ set
+ NO_COMPRESSION_ALGORITHM - initial metadata seen; no compression algorithm
+ set
+ pointer - a stalled op containing a send_message that's waiting on initial
+ metadata
+ pointer | CANCELLED_BIT - request was cancelled with error pointed to */
+ gpr_atm send_initial_metadata_state;
+
+ grpc_transport_stream_op_batch *send_op;
+ uint32_t send_length;
+ uint32_t send_flags;
+ grpc_slice incoming_slice;
+ grpc_slice_buffer_stream replacement_stream;
+ grpc_closure *post_send;
+ grpc_closure send_done;
+ grpc_closure got_slice;
+} call_data;
+
+typedef struct channel_data {
+ /** The default, channel-level, compression algorithm */
+ grpc_compression_algorithm default_compression_algorithm;
+ /** Bitset of enabled algorithms */
+ uint32_t enabled_algorithms_bitset;
+ /** Supported compression algorithms */
+ uint32_t supported_compression_algorithms;
+} channel_data;
+
+static bool skip_compression(grpc_call_element *elem, uint32_t flags,
+ bool has_compression_algorithm) {
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) {
+ return 1;
+ }
+ if (has_compression_algorithm) {
+ if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
+ return 1;
+ }
+ return 0; /* we have an actual call-specific algorithm */
+ }
+ /* no per-call compression override */
+ return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
+}
+
+/** Filter initial metadata */
+static grpc_error *process_send_initial_metadata(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_metadata_batch *initial_metadata,
+ bool *has_compression_algorithm) GRPC_MUST_USE_RESULT;
+static grpc_error *process_send_initial_metadata(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_metadata_batch *initial_metadata, bool *has_compression_algorithm) {
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ *has_compression_algorithm = false;
+ /* Parse incoming request for compression. If any, it'll be available
+ * at calld->compression_algorithm */
+ if (initial_metadata->idx.named.grpc_internal_encoding_request != NULL) {
+ grpc_mdelem md =
+ initial_metadata->idx.named.grpc_internal_encoding_request->md;
+ if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md),
+ &calld->compression_algorithm)) {
+ char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ gpr_log(GPR_ERROR,
+ "Invalid compression algorithm: '%s' (unknown). Ignoring.", val);
+ gpr_free(val);
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ if (!GPR_BITGET(channeld->enabled_algorithms_bitset,
+ calld->compression_algorithm)) {
+ char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ gpr_log(GPR_ERROR,
+ "Invalid compression algorithm: '%s' (previously disabled). "
+ "Ignoring.",
+ val);
+ gpr_free(val);
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ *has_compression_algorithm = true;
+
+ grpc_metadata_batch_remove(
+ exec_ctx, initial_metadata,
+ initial_metadata->idx.named.grpc_internal_encoding_request);
+ } else {
+ /* If no algorithm was found in the metadata and we aren't
+ * exceptionally skipping compression, fall back to the channel
+ * default */
+ calld->compression_algorithm = channeld->default_compression_algorithm;
+ *has_compression_algorithm = true;
+ }
+
+ grpc_error *error = GRPC_ERROR_NONE;
+ /* hint compression algorithm */
+ if (calld->compression_algorithm != GRPC_COMPRESS_NONE) {
+ error = grpc_metadata_batch_add_tail(
+ exec_ctx, initial_metadata, &calld->compression_algorithm_storage,
+ grpc_compression_encoding_mdelem(calld->compression_algorithm));
+ }
+
+ if (error != GRPC_ERROR_NONE) return error;
+
+ /* convey supported compression algorithms */
+ error = grpc_metadata_batch_add_tail(
+ exec_ctx, initial_metadata, &calld->accept_encoding_storage,
+ GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(
+ channeld->supported_compression_algorithms));
+
+ return error;
+}
+
+static void continue_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem);
+
+static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
+ grpc_call_element *elem = elemp;
+ call_data *calld = elem->call_data;
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
+ calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
+}
+
+static void finish_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ int did_compress;
+ grpc_slice_buffer tmp;
+ grpc_slice_buffer_init(&tmp);
+ did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm,
+ &calld->slices, &tmp);
+ if (did_compress) {
+ if (grpc_compression_trace) {
+ char *algo_name;
+ const size_t before_size = calld->slices.length;
+ const size_t after_size = tmp.length;
+ const float savings_ratio = 1.0f - (float)after_size / (float)before_size;
+ GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
+ &algo_name));
+ gpr_log(GPR_DEBUG, "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
+ " bytes (%.2f%% savings)",
+ algo_name, before_size, after_size, 100 * savings_ratio);
+ }
+ grpc_slice_buffer_swap(&calld->slices, &tmp);
+ calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ } else {
+ if (grpc_compression_trace) {
+ char *algo_name;
+ GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
+ &algo_name));
+ gpr_log(GPR_DEBUG,
+ "Algorithm '%s' enabled but decided not to compress. Input size: "
+ "%" PRIuPTR,
+ algo_name, calld->slices.length);
+ }
+ }
+
+ grpc_slice_buffer_destroy_internal(exec_ctx, &tmp);
+
+ grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
+ calld->send_flags);
+ calld->send_op->payload->send_message.send_message =
+ &calld->replacement_stream.base;
+ calld->post_send = calld->send_op->on_complete;
+ calld->send_op->on_complete = &calld->send_done;
+
+ grpc_call_next_op(exec_ctx, elem, calld->send_op);
+}
+
+static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
+ grpc_call_element *elem = elemp;
+ call_data *calld = elem->call_data;
+ if (GRPC_ERROR_NONE !=
+ grpc_byte_stream_pull(exec_ctx,
+ calld->send_op->payload->send_message.send_message,
+ &calld->incoming_slice)) {
+ /* Should never reach here */
+ abort();
+ }
+ grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ if (calld->send_length == calld->slices.length) {
+ finish_send_message(exec_ctx, elem);
+ } else {
+ continue_send_message(exec_ctx, elem);
+ }
+}
+
+static void continue_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ while (grpc_byte_stream_next(
+ exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
+ &calld->got_slice)) {
+ grpc_byte_stream_pull(exec_ctx,
+ calld->send_op->payload->send_message.send_message,
+ &calld->incoming_slice);
+ grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ if (calld->send_length == calld->slices.length) {
+ finish_send_message(exec_ctx, elem);
+ break;
+ }
+ }
+}
+
+static void compress_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
+ call_data *calld = elem->call_data;
+
+ GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
+
+ if (op->cancel_stream) {
+ gpr_atm cur;
+ GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
+ do {
+ cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
+ } while (!gpr_atm_rel_cas(
+ &calld->send_initial_metadata_state, cur,
+ CANCELLED_BIT | (gpr_atm)op->payload->cancel_stream.cancel_error));
+ switch (cur) {
+ case HAS_COMPRESSION_ALGORITHM:
+ case NO_COMPRESSION_ALGORITHM:
+ case INITIAL_METADATA_UNSEEN:
+ break;
+ default:
+ if ((cur & CANCELLED_BIT) == 0) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, (grpc_transport_stream_op_batch *)cur,
+ GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
+ } else {
+ GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT));
+ }
+ break;
+ }
+ }
+
+ if (op->send_initial_metadata) {
+ bool has_compression_algorithm;
+ grpc_error *error = process_send_initial_metadata(
+ exec_ctx, elem,
+ op->payload->send_initial_metadata.send_initial_metadata,
+ &has_compression_algorithm);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ return;
+ }
+ gpr_atm cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
+ GPR_ASSERT(cur != HAS_COMPRESSION_ALGORITHM &&
+ cur != NO_COMPRESSION_ALGORITHM);
+ if ((cur & CANCELLED_BIT) == 0) {
+ gpr_atm_rel_store(&calld->send_initial_metadata_state,
+ has_compression_algorithm ? HAS_COMPRESSION_ALGORITHM
+ : NO_COMPRESSION_ALGORITHM);
+ if (cur != INITIAL_METADATA_UNSEEN) {
+ grpc_call_next_op(exec_ctx, elem,
+ (grpc_transport_stream_op_batch *)cur);
+ }
+ }
+ }
+ if (op->send_message) {
+ gpr_atm cur;
+ retry_send:
+ cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
+ switch (cur) {
+ case INITIAL_METADATA_UNSEEN:
+ if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
+ (gpr_atm)op)) {
+ goto retry_send;
+ }
+ break;
+ case HAS_COMPRESSION_ALGORITHM:
+ case NO_COMPRESSION_ALGORITHM:
+ if (!skip_compression(elem,
+ op->payload->send_message.send_message->flags,
+ cur == HAS_COMPRESSION_ALGORITHM)) {
+ calld->send_op = op;
+ calld->send_length = op->payload->send_message.send_message->length;
+ calld->send_flags = op->payload->send_message.send_message->flags;
+ continue_send_message(exec_ctx, elem);
+ } else {
+ /* pass control down the stack */
+ grpc_call_next_op(exec_ctx, elem, op);
+ }
+ break;
+ default:
+ if (cur & CANCELLED_BIT) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, op,
+ GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT)));
+ } else {
+ /* >1 send_message concurrently */
+ GPR_UNREACHABLE_CODE(break);
+ }
+ }
+ } else {
+ /* pass control down the stack */
+ grpc_call_next_op(exec_ctx, elem, op);
+ }
+
+ GPR_TIMER_END("compress_start_transport_stream_op_batch", 0);
+}
+
+/* Constructor for call_data */
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ const grpc_call_element_args *args) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+
+ /* initialize members */
+ grpc_slice_buffer_init(&calld->slices);
+ grpc_closure_init(&calld->got_slice, got_slice, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->send_done, send_done, elem,
+ grpc_schedule_on_exec_ctx);
+
+ return GRPC_ERROR_NONE;
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ const grpc_call_final_info *final_info,
+ grpc_closure *ignored) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
+ gpr_atm imstate =
+ gpr_atm_no_barrier_load(&calld->send_initial_metadata_state);
+ if (imstate & CANCELLED_BIT) {
+ GRPC_ERROR_UNREF((grpc_error *)(imstate & ~CANCELLED_BIT));
+ }
+}
+
+/* Constructor for channel_data */
+static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
+ channel_data *channeld = elem->channel_data;
+
+ channeld->enabled_algorithms_bitset =
+ grpc_channel_args_compression_algorithm_get_states(args->channel_args);
+
+ channeld->default_compression_algorithm =
+ grpc_channel_args_get_compression_algorithm(args->channel_args);
+ /* Make sure the default isn't disabled. */
+ if (!GPR_BITGET(channeld->enabled_algorithms_bitset,
+ channeld->default_compression_algorithm)) {
+ gpr_log(GPR_DEBUG,
+ "compression algorithm %d not enabled: switching to none",
+ channeld->default_compression_algorithm);
+ channeld->default_compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+
+ channeld->supported_compression_algorithms = 1; /* always support identity */
+ for (grpc_compression_algorithm algo_idx = 1;
+ algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
+ /* skip disabled algorithms */
+ if (!GPR_BITGET(channeld->enabled_algorithms_bitset, algo_idx)) {
+ continue;
+ }
+ channeld->supported_compression_algorithms |= 1u << algo_idx;
+ }
+
+ GPR_ASSERT(!args->is_last);
+ return GRPC_ERROR_NONE;
+}
+
+/* Destructor for channel data */
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {}
+
+const grpc_channel_filter grpc_message_compress_filter = {
+ compress_start_transport_stream_op_batch,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
+ "compress"};
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.h b/src/core/ext/filters/http/message_compress/message_compress_filter.h
new file mode 100644
index 0000000000..75bfa17fba
--- /dev/null
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.h
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_COMPRESS_FILTER_H
+#define GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_COMPRESS_FILTER_H
+
+#include <grpc/impl/codegen/compression_types.h>
+
+#include "src/core/lib/channel/channel_stack.h"
+
+extern int grpc_compression_trace;
+
+/** Compression filter for outgoing data.
+ *
+ * See <grpc/compression.h> for the available compression settings.
+ *
+ * Compression settings may come from:
+ * - Channel configuration, as established at channel creation time.
+ * - The metadata accompanying the outgoing data to be compressed. This is
+ * taken as a request only. We may choose not to honor it. The metadata key
+ * is given by \a GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY.
+ *
+ * Compression can be disabled for concrete messages (for instance in order to
+ * prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set in
+ * the BEGIN_MESSAGE flags.
+ *
+ * The attempted compression mechanism is added to the resulting initial
+ * metadata under the'grpc-encoding' key.
+ *
+ * If compression is actually performed, BEGIN_MESSAGE's flag is modified to
+ * incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of the
+ * aforementioned 'grpc-encoding' metadata value, data will pass through
+ * uncompressed. */
+
+extern const grpc_channel_filter grpc_message_compress_filter;
+
+#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_COMPRESS_FILTER_H \
+ */
diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c
new file mode 100644
index 0000000000..ff857878e4
--- /dev/null
+++ b/src/core/ext/filters/http/server/http_server_filter.c
@@ -0,0 +1,445 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/filters/http/server/http_server_filter.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <string.h>
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/b64.h"
+#include "src/core/lib/slice/percent_encoding.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/transport/static_metadata.h"
+
+#define EXPECTED_CONTENT_TYPE "application/grpc"
+#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
+
+extern int grpc_http_trace;
+
+typedef struct call_data {
+ grpc_linked_mdelem status;
+ grpc_linked_mdelem content_type;
+
+ /* did this request come with path query containing request payload */
+ bool seen_path_with_query;
+ /* flag to ensure payload_bin is delivered only once */
+ bool payload_bin_delivered;
+
+ grpc_metadata_batch *recv_initial_metadata;
+ uint32_t *recv_initial_metadata_flags;
+ /** Closure to call when finished with the hs_on_recv hook */
+ grpc_closure *on_done_recv;
+ /** Closure to call when we retrieve read message from the path URI
+ */
+ grpc_closure *recv_message_ready;
+ grpc_closure *on_complete;
+ grpc_byte_stream **pp_recv_message;
+ grpc_slice_buffer read_slice_buffer;
+ grpc_slice_buffer_stream read_stream;
+
+ /** Receive closures are chained: we inject this closure as the on_done_recv
+ up-call on transport_op, and remember to call our on_done_recv member
+ after handling it. */
+ grpc_closure hs_on_recv;
+ grpc_closure hs_on_complete;
+ grpc_closure hs_recv_message_ready;
+} call_data;
+
+typedef struct channel_data { uint8_t unused; } channel_data;
+
+static grpc_error *server_filter_outgoing_metadata(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_metadata_batch *b) {
+ if (b->idx.named.grpc_message != NULL) {
+ grpc_slice pct_encoded_msg = grpc_percent_encode_slice(
+ GRPC_MDVALUE(b->idx.named.grpc_message->md),
+ grpc_compatible_percent_encoding_unreserved_bytes);
+ if (grpc_slice_is_equivalent(pct_encoded_msg,
+ GRPC_MDVALUE(b->idx.named.grpc_message->md))) {
+ grpc_slice_unref_internal(exec_ctx, pct_encoded_msg);
+ } else {
+ grpc_metadata_batch_set_value(exec_ctx, b->idx.named.grpc_message,
+ pct_encoded_msg);
+ }
+ }
+ return GRPC_ERROR_NONE;
+}
+
+static void add_error(const char *error_name, grpc_error **cumulative,
+ grpc_error *new) {
+ if (new == GRPC_ERROR_NONE) return;
+ if (*cumulative == GRPC_ERROR_NONE) {
+ *cumulative = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_name);
+ }
+ *cumulative = grpc_error_add_child(*cumulative, new);
+}
+
+static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_metadata_batch *b) {
+ call_data *calld = elem->call_data;
+ grpc_error *error = GRPC_ERROR_NONE;
+ static const char *error_name = "Failed processing incoming headers";
+
+ if (b->idx.named.method != NULL) {
+ if (grpc_mdelem_eq(b->idx.named.method->md, GRPC_MDELEM_METHOD_POST)) {
+ *calld->recv_initial_metadata_flags &=
+ ~(GRPC_INITIAL_METADATA_CACHEABLE_REQUEST |
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST);
+ } else if (grpc_mdelem_eq(b->idx.named.method->md,
+ GRPC_MDELEM_METHOD_PUT)) {
+ *calld->recv_initial_metadata_flags &=
+ ~GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
+ *calld->recv_initial_metadata_flags |=
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
+ } else if (grpc_mdelem_eq(b->idx.named.method->md,
+ GRPC_MDELEM_METHOD_GET)) {
+ *calld->recv_initial_metadata_flags |=
+ GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
+ *calld->recv_initial_metadata_flags &=
+ ~GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
+ } else {
+ add_error(error_name, &error,
+ grpc_attach_md_to_error(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"),
+ b->idx.named.method->md));
+ }
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.method);
+ } else {
+ add_error(
+ error_name, &error,
+ grpc_error_set_str(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"),
+ GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":method")));
+ }
+
+ if (b->idx.named.te != NULL) {
+ if (!grpc_mdelem_eq(b->idx.named.te->md, GRPC_MDELEM_TE_TRAILERS)) {
+ add_error(error_name, &error,
+ grpc_attach_md_to_error(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"),
+ b->idx.named.te->md));
+ }
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.te);
+ } else {
+ add_error(error_name, &error,
+ grpc_error_set_str(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"),
+ GRPC_ERROR_STR_KEY, grpc_slice_from_static_string("te")));
+ }
+
+ if (b->idx.named.scheme != NULL) {
+ if (!grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_HTTP) &&
+ !grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_HTTPS) &&
+ !grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_GRPC)) {
+ add_error(error_name, &error,
+ grpc_attach_md_to_error(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"),
+ b->idx.named.scheme->md));
+ }
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.scheme);
+ } else {
+ add_error(
+ error_name, &error,
+ grpc_error_set_str(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"),
+ GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":scheme")));
+ }
+
+ if (b->idx.named.content_type != NULL) {
+ if (!grpc_mdelem_eq(b->idx.named.content_type->md,
+ GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC)) {
+ if (grpc_slice_buf_start_eq(GRPC_MDVALUE(b->idx.named.content_type->md),
+ EXPECTED_CONTENT_TYPE,
+ EXPECTED_CONTENT_TYPE_LENGTH) &&
+ (GRPC_SLICE_START_PTR(GRPC_MDVALUE(
+ b->idx.named.content_type->md))[EXPECTED_CONTENT_TYPE_LENGTH] ==
+ '+' ||
+ GRPC_SLICE_START_PTR(GRPC_MDVALUE(
+ b->idx.named.content_type->md))[EXPECTED_CONTENT_TYPE_LENGTH] ==
+ ';')) {
+ /* Although the C implementation doesn't (currently) generate them,
+ any custom +-suffix is explicitly valid. */
+ /* TODO(klempner): We should consider preallocating common values such
+ as +proto or +json, or at least stashing them if we see them. */
+ /* TODO(klempner): Should we be surfacing this to application code? */
+ } else {
+ /* TODO(klempner): We're currently allowing this, but we shouldn't
+ see it without a proxy so log for now. */
+ char *val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.content_type->md),
+ GPR_DUMP_ASCII);
+ gpr_log(GPR_INFO, "Unexpected content-type '%s'", val);
+ gpr_free(val);
+ }
+ }
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_type);
+ }
+
+ if (b->idx.named.path == NULL) {
+ add_error(error_name, &error,
+ grpc_error_set_str(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"),
+ GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":path")));
+ } else if (*calld->recv_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
+ /* We have a cacheable request made with GET verb. The path contains the
+ * query parameter which is base64 encoded request payload. */
+ const char k_query_separator = '?';
+ grpc_slice path_slice = GRPC_MDVALUE(b->idx.named.path->md);
+ uint8_t *path_ptr = (uint8_t *)GRPC_SLICE_START_PTR(path_slice);
+ size_t path_length = GRPC_SLICE_LENGTH(path_slice);
+ /* offset of the character '?' */
+ size_t offset = 0;
+ for (offset = 0; offset < path_length && *path_ptr != k_query_separator;
+ path_ptr++, offset++)
+ ;
+ if (offset < path_length) {
+ grpc_slice query_slice =
+ grpc_slice_sub(path_slice, offset + 1, path_length);
+
+ /* substitute path metadata with just the path (not query) */
+ grpc_mdelem mdelem_path_without_query = grpc_mdelem_from_slices(
+ exec_ctx, GRPC_MDSTR_PATH, grpc_slice_sub(path_slice, 0, offset));
+
+ grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path,
+ mdelem_path_without_query);
+
+ /* decode payload from query and add to the slice buffer to be returned */
+ const int k_url_safe = 1;
+ grpc_slice_buffer_add(
+ &calld->read_slice_buffer,
+ grpc_base64_decode_with_len(
+ exec_ctx, (const char *)GRPC_SLICE_START_PTR(query_slice),
+ GRPC_SLICE_LENGTH(query_slice), k_url_safe));
+ grpc_slice_buffer_stream_init(&calld->read_stream,
+ &calld->read_slice_buffer, 0);
+ calld->seen_path_with_query = true;
+ grpc_slice_unref_internal(exec_ctx, query_slice);
+ } else {
+ gpr_log(GPR_ERROR, "GET request without QUERY");
+ }
+ }
+
+ if (b->idx.named.host != NULL && b->idx.named.authority == NULL) {
+ grpc_linked_mdelem *el = b->idx.named.host;
+ grpc_mdelem md = GRPC_MDELEM_REF(el->md);
+ grpc_metadata_batch_remove(exec_ctx, b, el);
+ add_error(
+ error_name, &error,
+ grpc_metadata_batch_add_head(
+ exec_ctx, b, el, grpc_mdelem_from_slices(
+ exec_ctx, GRPC_MDSTR_AUTHORITY,
+ grpc_slice_ref_internal(GRPC_MDVALUE(md)))));
+ GRPC_MDELEM_UNREF(exec_ctx, md);
+ }
+
+ if (b->idx.named.authority == NULL) {
+ add_error(
+ error_name, &error,
+ grpc_error_set_str(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"),
+ GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":authority")));
+ }
+
+ return error;
+}
+
+static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_error *err) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (err == GRPC_ERROR_NONE) {
+ err = server_filter_incoming_metadata(exec_ctx, elem,
+ calld->recv_initial_metadata);
+ } else {
+ GRPC_ERROR_REF(err);
+ }
+ grpc_closure_run(exec_ctx, calld->on_done_recv, err);
+}
+
+static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_error *err) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ /* Call recv_message_ready if we got the payload via the path field */
+ if (calld->seen_path_with_query && calld->recv_message_ready != NULL) {
+ *calld->pp_recv_message = calld->payload_bin_delivered
+ ? NULL
+ : (grpc_byte_stream *)&calld->read_stream;
+ grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
+ calld->recv_message_ready = NULL;
+ calld->payload_bin_delivered = true;
+ }
+ grpc_closure_run(exec_ctx, calld->on_complete, GRPC_ERROR_REF(err));
+}
+
+static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_error *err) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (calld->seen_path_with_query) {
+ /* do nothing. This is probably a GET request, and payload will be returned
+ in hs_on_complete callback. */
+ } else {
+ grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
+ }
+}
+
+static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+
+ if (op->send_initial_metadata) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ static const char *error_name = "Failed sending initial metadata";
+ add_error(
+ error_name, &error,
+ grpc_metadata_batch_add_head(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->status, GRPC_MDELEM_STATUS_200));
+ add_error(
+ error_name, &error,
+ grpc_metadata_batch_add_tail(
+ exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
+ &calld->content_type,
+ GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC));
+ add_error(error_name, &error,
+ server_filter_outgoing_metadata(
+ exec_ctx, elem,
+ op->payload->send_initial_metadata.send_initial_metadata));
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ return;
+ }
+ }
+
+ if (op->recv_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags != NULL);
+ calld->recv_initial_metadata =
+ op->payload->recv_initial_metadata.recv_initial_metadata;
+ calld->recv_initial_metadata_flags =
+ op->payload->recv_initial_metadata.recv_flags;
+ calld->on_done_recv =
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->hs_on_recv;
+ }
+
+ if (op->recv_message) {
+ calld->recv_message_ready = op->payload->recv_message.recv_message_ready;
+ calld->pp_recv_message = op->payload->recv_message.recv_message;
+ if (op->payload->recv_message.recv_message_ready) {
+ op->payload->recv_message.recv_message_ready =
+ &calld->hs_recv_message_ready;
+ }
+ if (op->on_complete) {
+ calld->on_complete = op->on_complete;
+ op->on_complete = &calld->hs_on_complete;
+ }
+ }
+
+ if (op->send_trailing_metadata) {
+ grpc_error *error = server_filter_outgoing_metadata(
+ exec_ctx, elem,
+ op->payload->send_trailing_metadata.send_trailing_metadata);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ return;
+ }
+ }
+}
+
+static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ GPR_TIMER_BEGIN("hs_start_transport_op", 0);
+ hs_mutate_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, op);
+ GPR_TIMER_END("hs_start_transport_op", 0);
+}
+
+/* Constructor for call_data */
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ const grpc_call_element_args *args) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ /* initialize members */
+ grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_slice_buffer_init(&calld->read_slice_buffer);
+ return GRPC_ERROR_NONE;
+}
+
+/* Destructor for call_data */
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ const grpc_call_final_info *final_info,
+ grpc_closure *ignored) {
+ call_data *calld = elem->call_data;
+ grpc_slice_buffer_destroy_internal(exec_ctx, &calld->read_slice_buffer);
+}
+
+/* Constructor for channel_data */
+static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
+ GPR_ASSERT(!args->is_last);
+ return GRPC_ERROR_NONE;
+}
+
+/* Destructor for channel data */
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {}
+
+const grpc_channel_filter grpc_http_server_filter = {
+ hs_start_transport_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
+ "http-server"};
diff --git a/src/core/ext/filters/http/server/http_server_filter.h b/src/core/ext/filters/http/server/http_server_filter.h
new file mode 100644
index 0000000000..8a2b2196ae
--- /dev/null
+++ b/src/core/ext/filters/http/server/http_server_filter.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_FILTERS_HTTP_SERVER_HTTP_SERVER_FILTER_H
+#define GRPC_CORE_EXT_FILTERS_HTTP_SERVER_HTTP_SERVER_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+
+/* Processes metadata on the client side for HTTP2 transports */
+extern const grpc_channel_filter grpc_http_server_filter;
+
+#endif /* GRPC_CORE_EXT_FILTERS_HTTP_SERVER_HTTP_SERVER_FILTER_H */
diff --git a/src/core/ext/filters/load_reporting/load_reporting.c b/src/core/ext/filters/load_reporting/load_reporting.c
index f4dac15a60..4e9d0937ae 100644
--- a/src/core/ext/filters/load_reporting/load_reporting.c
+++ b/src/core/ext/filters/load_reporting/load_reporting.c
@@ -47,32 +47,9 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel_init.h"
-static void destroy_lr_cost_context(void *c) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_load_reporting_cost_context *cost_ctx = c;
- for (size_t i = 0; i < cost_ctx->values_count; ++i) {
- grpc_slice_unref_internal(&exec_ctx, cost_ctx->values[i]);
- }
- grpc_exec_ctx_finish(&exec_ctx);
- gpr_free(cost_ctx->values);
- gpr_free(cost_ctx);
-}
-
-void grpc_call_set_load_reporting_cost_context(
- grpc_call *call, grpc_load_reporting_cost_context *ctx) {
- grpc_call_context_set(call, GRPC_CONTEXT_LR_COST, ctx,
- destroy_lr_cost_context);
-}
-
static bool is_load_reporting_enabled(const grpc_channel_args *a) {
- if (a == NULL) return false;
- for (size_t i = 0; i < a->num_args; i++) {
- if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_LOAD_REPORTING)) {
- return a->args[i].type == GRPC_ARG_INTEGER &&
- a->args[i].value.integer != 0;
- }
- }
- return false;
+ return grpc_channel_arg_get_bool(
+ grpc_channel_args_find(a, GRPC_ARG_ENABLE_LOAD_REPORTING), false);
}
static bool maybe_add_load_reporting_filter(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/filters/load_reporting/load_reporting_filter.c b/src/core/ext/filters/load_reporting/load_reporting_filter.c
index 57b25d0651..75a9a56687 100644
--- a/src/core/ext/filters/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/filters/load_reporting/load_reporting_filter.c
@@ -48,6 +48,8 @@
typedef struct call_data {
intptr_t id; /**< an id unique to the call */
+ bool have_trailing_md_string;
+ grpc_slice trailing_md_string;
bool have_initial_md_string;
grpc_slice initial_md_string;
bool have_service_method;
@@ -140,6 +142,9 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
if (calld->have_initial_md_string) {
grpc_slice_unref_internal(exec_ctx, calld->initial_md_string);
}
+ if (calld->have_trailing_md_string) {
+ grpc_slice_unref_internal(exec_ctx, calld->trailing_md_string);
+ }
if (calld->have_service_method) {
grpc_slice_unref_internal(exec_ctx, calld->service_method);
}
@@ -183,6 +188,18 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
*/
}
+static grpc_filtered_mdelem lr_trailing_md_filter(grpc_exec_ctx *exec_ctx,
+ void *user_data,
+ grpc_mdelem md) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) {
+ calld->trailing_md_string = GRPC_MDVALUE(md);
+ return GRPC_FILTERED_REMOVE();
+ }
+ return GRPC_FILTERED_MDELEM(md);
+}
+
static void lr_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
@@ -190,13 +207,21 @@ static void lr_start_transport_stream_op_batch(
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
+ /* substitute our callback for the higher callback */
calld->recv_initial_metadata =
op->payload->recv_initial_metadata.recv_initial_metadata;
- /* substitute our callback for the higher callback */
calld->ops_recv_initial_metadata_ready =
op->payload->recv_initial_metadata.recv_initial_metadata_ready;
op->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->on_initial_md_ready;
+ } else if (op->send_trailing_metadata) {
+ GRPC_LOG_IF_ERROR(
+ "grpc_metadata_batch_filter",
+ grpc_metadata_batch_filter(
+ exec_ctx,
+ op->payload->send_trailing_metadata.send_trailing_metadata,
+ lr_trailing_md_filter, elem,
+ "LR trailing metadata filtering error"));
}
grpc_call_next_op(exec_ctx, elem, op);
diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c
index b9fde36286..a9d91e2f80 100644
--- a/src/core/ext/filters/max_age/max_age_filter.c
+++ b/src/core/ext/filters/max_age/max_age_filter.c
@@ -47,6 +47,11 @@
#define DEFAULT_MAX_CONNECTION_IDLE_MS INT_MAX
#define MAX_CONNECTION_AGE_JITTER 0.1
+#define MAX_CONNECTION_AGE_INTEGER_OPTIONS \
+ (grpc_integer_options) { DEFAULT_MAX_CONNECTION_AGE_MS, 1, INT_MAX }
+#define MAX_CONNECTION_IDLE_INTEGER_OPTIONS \
+ (grpc_integer_options) { DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX }
+
typedef struct channel_data {
/* We take a reference to the channel stack for the timer callback */
grpc_channel_stack* channel_stack;
@@ -315,8 +320,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
if (0 == strcmp(args->channel_args->args[i].key,
GRPC_ARG_MAX_CONNECTION_AGE_MS)) {
const int value = grpc_channel_arg_get_integer(
- &args->channel_args->args[i],
- (grpc_integer_options){DEFAULT_MAX_CONNECTION_AGE_MS, 1, INT_MAX});
+ &args->channel_args->args[i], MAX_CONNECTION_AGE_INTEGER_OPTIONS);
chand->max_connection_age =
value == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
@@ -334,8 +338,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
} else if (0 == strcmp(args->channel_args->args[i].key,
GRPC_ARG_MAX_CONNECTION_IDLE_MS)) {
const int value = grpc_channel_arg_get_integer(
- &args->channel_args->args[i],
- (grpc_integer_options){DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX});
+ &args->channel_args->args[i], MAX_CONNECTION_IDLE_INTEGER_OPTIONS);
chand->max_connection_idle =
value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_millis(value, GPR_TIMESPAN);
@@ -412,16 +415,13 @@ static bool maybe_add_max_age_filter(grpc_exec_ctx* exec_ctx,
void* arg) {
const grpc_channel_args* channel_args =
grpc_channel_stack_builder_get_channel_arguments(builder);
- const grpc_arg* a =
- grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS);
- bool enable = false;
- if (a != NULL && a->type == GRPC_ARG_INTEGER && a->value.integer != INT_MAX) {
- enable = true;
- }
- a = grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS);
- if (a != NULL && a->type == GRPC_ARG_INTEGER && a->value.integer != INT_MAX) {
- enable = true;
- }
+ bool enable =
+ grpc_channel_arg_get_integer(
+ grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS),
+ MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX &&
+ grpc_channel_arg_get_integer(
+ grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS),
+ MAX_CONNECTION_IDLE_INTEGER_OPTIONS) != INT_MAX;
if (enable) {
return grpc_channel_stack_builder_prepend_filter(
builder, &grpc_max_age_filter, NULL, NULL);
diff --git a/src/core/ext/filters/message_size/message_size_filter.c b/src/core/ext/filters/message_size/message_size_filter.c
new file mode 100644
index 0000000000..b615116965
--- /dev/null
+++ b/src/core/ext/filters/message_size/message_size_filter.c
@@ -0,0 +1,314 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#include "src/core/ext/filters/message_size/message_size_filter.h"
+
+#include <limits.h>
+#include <string.h>
+
+#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channel_stack_builder.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/surface/channel_init.h"
+#include "src/core/lib/transport/service_config.h"
+
+typedef struct message_size_limits {
+ int max_send_size;
+ int max_recv_size;
+} message_size_limits;
+
+static void message_size_limits_free(grpc_exec_ctx* exec_ctx, void* value) {
+ gpr_free(value);
+}
+
+static void* message_size_limits_create_from_json(const grpc_json* json) {
+ int max_request_message_bytes = -1;
+ int max_response_message_bytes = -1;
+ for (grpc_json* field = json->child; field != NULL; field = field->next) {
+ if (field->key == NULL) continue;
+ if (strcmp(field->key, "maxRequestMessageBytes") == 0) {
+ if (max_request_message_bytes >= 0) return NULL; // Duplicate.
+ if (field->type != GRPC_JSON_STRING && field->type != GRPC_JSON_NUMBER) {
+ return NULL;
+ }
+ max_request_message_bytes = gpr_parse_nonnegative_int(field->value);
+ if (max_request_message_bytes == -1) return NULL;
+ } else if (strcmp(field->key, "maxResponseMessageBytes") == 0) {
+ if (max_response_message_bytes >= 0) return NULL; // Duplicate.
+ if (field->type != GRPC_JSON_STRING && field->type != GRPC_JSON_NUMBER) {
+ return NULL;
+ }
+ max_response_message_bytes = gpr_parse_nonnegative_int(field->value);
+ if (max_response_message_bytes == -1) return NULL;
+ }
+ }
+ message_size_limits* value = gpr_malloc(sizeof(message_size_limits));
+ value->max_send_size = max_request_message_bytes;
+ value->max_recv_size = max_response_message_bytes;
+ return value;
+}
+
+typedef struct call_data {
+ message_size_limits limits;
+ // Receive closures are chained: we inject this closure as the
+ // recv_message_ready up-call on transport_stream_op, and remember to
+ // call our next_recv_message_ready member after handling it.
+ grpc_closure recv_message_ready;
+ // Used by recv_message_ready.
+ grpc_byte_stream** recv_message;
+ // Original recv_message_ready callback, invoked after our own.
+ grpc_closure* next_recv_message_ready;
+} call_data;
+
+typedef struct channel_data {
+ message_size_limits limits;
+ // Maps path names to message_size_limits structs.
+ grpc_slice_hash_table* method_limit_table;
+} channel_data;
+
+// Callback invoked when we receive a message. Here we check the max
+// receive message size.
+static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
+ grpc_error* error) {
+ grpc_call_element* elem = user_data;
+ call_data* calld = elem->call_data;
+ if (*calld->recv_message != NULL && calld->limits.max_recv_size >= 0 &&
+ (*calld->recv_message)->length > (size_t)calld->limits.max_recv_size) {
+ char* message_string;
+ gpr_asprintf(&message_string,
+ "Received message larger than max (%u vs. %d)",
+ (*calld->recv_message)->length, calld->limits.max_recv_size);
+ grpc_error* new_error = grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED);
+ if (error == GRPC_ERROR_NONE) {
+ error = new_error;
+ } else {
+ error = grpc_error_add_child(error, new_error);
+ GRPC_ERROR_UNREF(new_error);
+ }
+ gpr_free(message_string);
+ } else {
+ GRPC_ERROR_REF(error);
+ }
+ // Invoke the next callback.
+ grpc_closure_run(exec_ctx, calld->next_recv_message_ready, error);
+}
+
+// Start transport stream op.
+static void start_transport_stream_op_batch(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op_batch* op) {
+ call_data* calld = elem->call_data;
+ // Check max send message size.
+ if (op->send_message && calld->limits.max_send_size >= 0 &&
+ op->payload->send_message.send_message->length >
+ (size_t)calld->limits.max_send_size) {
+ char* message_string;
+ gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
+ op->payload->send_message.send_message->length,
+ calld->limits.max_send_size);
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, op,
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
+ GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_RESOURCE_EXHAUSTED));
+ gpr_free(message_string);
+ return;
+ }
+ // Inject callback for receiving a message.
+ if (op->recv_message) {
+ calld->next_recv_message_ready =
+ op->payload->recv_message.recv_message_ready;
+ calld->recv_message = op->payload->recv_message.recv_message;
+ op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
+ }
+ // Chain to the next filter.
+ grpc_call_next_op(exec_ctx, elem, op);
+}
+
+// Constructor for call_data.
+static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ channel_data* chand = elem->channel_data;
+ call_data* calld = elem->call_data;
+ calld->next_recv_message_ready = NULL;
+ grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem,
+ grpc_schedule_on_exec_ctx);
+ // Get max sizes from channel data, then merge in per-method config values.
+ // Note: Per-method config is only available on the client, so we
+ // apply the max request size to the send limit and the max response
+ // size to the receive limit.
+ calld->limits = chand->limits;
+ if (chand->method_limit_table != NULL) {
+ message_size_limits* limits = grpc_method_config_table_get(
+ exec_ctx, chand->method_limit_table, args->path);
+ if (limits != NULL) {
+ if (limits->max_send_size >= 0 &&
+ (limits->max_send_size < calld->limits.max_send_size ||
+ calld->limits.max_send_size < 0)) {
+ calld->limits.max_send_size = limits->max_send_size;
+ }
+ if (limits->max_recv_size >= 0 &&
+ (limits->max_recv_size < calld->limits.max_recv_size ||
+ calld->limits.max_recv_size < 0)) {
+ calld->limits.max_recv_size = limits->max_recv_size;
+ }
+ }
+ }
+ return GRPC_ERROR_NONE;
+}
+
+// Destructor for call_data.
+static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* ignored) {}
+
+static int default_size(const grpc_channel_args* args,
+ int without_minimal_stack) {
+ if (grpc_channel_args_want_minimal_stack(args)) {
+ return -1;
+ }
+ return without_minimal_stack;
+}
+
+message_size_limits get_message_size_limits(
+ const grpc_channel_args* channel_args) {
+ message_size_limits lim;
+ lim.max_send_size =
+ default_size(channel_args, GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH);
+ lim.max_recv_size =
+ default_size(channel_args, GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH);
+ for (size_t i = 0; i < channel_args->num_args; ++i) {
+ if (strcmp(channel_args->args[i].key, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) ==
+ 0) {
+ const grpc_integer_options options = {lim.max_send_size, -1, INT_MAX};
+ lim.max_send_size =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ }
+ if (strcmp(channel_args->args[i].key,
+ GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) {
+ const grpc_integer_options options = {lim.max_recv_size, -1, INT_MAX};
+ lim.max_recv_size =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ }
+ }
+ return lim;
+}
+
+// Constructor for channel_data.
+static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ GPR_ASSERT(!args->is_last);
+ channel_data* chand = elem->channel_data;
+ chand->limits = get_message_size_limits(args->channel_args);
+ // Get method config table from channel args.
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG);
+ if (channel_arg != NULL) {
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+ grpc_service_config* service_config =
+ grpc_service_config_create(channel_arg->value.string);
+ if (service_config != NULL) {
+ chand->method_limit_table =
+ grpc_service_config_create_method_config_table(
+ exec_ctx, service_config, message_size_limits_create_from_json,
+ message_size_limits_free);
+ grpc_service_config_destroy(service_config);
+ }
+ }
+ return GRPC_ERROR_NONE;
+}
+
+// Destructor for channel_data.
+static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem) {
+ channel_data* chand = elem->channel_data;
+ grpc_slice_hash_table_unref(exec_ctx, chand->method_limit_table);
+}
+
+const grpc_channel_filter grpc_message_size_filter = {
+ start_transport_stream_op_batch,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
+ "message_size"};
+
+static bool maybe_add_message_size_filter(grpc_exec_ctx* exec_ctx,
+ grpc_channel_stack_builder* builder,
+ void* arg) {
+ const grpc_channel_args* channel_args =
+ grpc_channel_stack_builder_get_channel_arguments(builder);
+ bool enable = false;
+ message_size_limits lim = get_message_size_limits(channel_args);
+ if (lim.max_send_size != -1 || lim.max_recv_size != -1) {
+ enable = true;
+ }
+ const grpc_arg* a =
+ grpc_channel_args_find(channel_args, GRPC_ARG_SERVICE_CONFIG);
+ if (a != NULL) {
+ enable = true;
+ }
+ if (enable) {
+ return grpc_channel_stack_builder_prepend_filter(
+ builder, &grpc_message_size_filter, NULL, NULL);
+ } else {
+ return true;
+ }
+}
+
+void grpc_message_size_filter_init(void) {
+ grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_message_size_filter, NULL);
+ grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_message_size_filter, NULL);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
+ GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ maybe_add_message_size_filter, NULL);
+}
+
+void grpc_message_size_filter_shutdown(void) {}
diff --git a/src/core/ext/filters/message_size/message_size_filter.h b/src/core/ext/filters/message_size/message_size_filter.h
new file mode 100644
index 0000000000..83980e738c
--- /dev/null
+++ b/src/core/ext/filters/message_size/message_size_filter.h
@@ -0,0 +1,39 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#ifndef GRPC_CORE_EXT_FILTERS_MESSAGE_SIZE_MESSAGE_SIZE_FILTER_H
+#define GRPC_CORE_EXT_FILTERS_MESSAGE_SIZE_MESSAGE_SIZE_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+
+extern const grpc_channel_filter grpc_message_size_filter;
+
+#endif /* GRPC_CORE_EXT_FILTERS_MESSAGE_SIZE_MESSAGE_SIZE_FILTER_H */
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c
index 6433968ca5..b9c62c376a 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.c
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.c
@@ -43,11 +43,11 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/filters/http/server/http_server_filter.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/channel/handshaker_registry.h"
-#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/tcp_server.h"
@@ -80,7 +80,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_lock(&connection_state->server_state->mu);
if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) {
const char *error_str = grpc_error_string(error);
- gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
+ gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
if (error == GRPC_ERROR_NONE && args->endpoint != NULL) {
// We were shut down after handshaking completed successfully, so
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index a7d047d6e7..5d74532ede 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -44,6 +44,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/transport/chttp2/transport/frame_data.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/channel_args.h"
@@ -129,6 +130,11 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored);
+static void incoming_byte_stream_publish_error(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+ grpc_error *error);
+static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs);
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
@@ -174,6 +180,9 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
+static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -550,6 +559,10 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
exec_ctx, &t->keepalive_ping_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
&t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC));
+ } else {
+ /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
+ inflight keeaplive timers */
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
}
grpc_chttp2_initiate_write(exec_ctx, t, false, "init");
@@ -598,21 +611,18 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
- if (t->is_client) {
- switch (t->keepalive_state) {
- case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: {
- grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
- break;
- }
- case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING: {
- grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
- grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer);
- break;
- }
- case GRPC_CHTTP2_KEEPALIVE_STATE_DYING: {
- break;
- }
- }
+ switch (t->keepalive_state) {
+ case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
+ grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
+ break;
+ case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
+ grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
+ grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer);
+ break;
+ case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
+ case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
+ /* keepalive timers are not set in these two states */
+ break;
}
/* flush writable stream list to avoid dangling references */
@@ -655,7 +665,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
/* We reserve one 'active stream' that's dropped when the stream is
read-closed. The others are for incoming_byte_streams that are actively
reading */
- gpr_ref_init(&s->active_streams, 1);
GRPC_CHTTP2_STREAM_REF(s, "chttp2");
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
@@ -665,6 +674,11 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
grpc_schedule_on_exec_ctx);
+ grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
+ grpc_slice_buffer_init(&s->frame_storage);
+ s->pending_byte_stream = false;
+ grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s,
+ grpc_combiner_scheduler(t->combiner, false));
GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
@@ -682,7 +696,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_error *error) {
- grpc_byte_stream *bs;
grpc_chttp2_stream *s = sp;
grpc_chttp2_transport *t = s->t;
@@ -693,9 +706,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL);
}
- while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames))) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
- }
+ grpc_slice_buffer_destroy_internal(exec_ctx,
+ &s->unprocessed_incoming_frames_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
grpc_chttp2_list_remove_stalled_by_transport(t, s);
grpc_chttp2_list_remove_stalled_by_stream(t, s);
@@ -722,6 +735,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer);
GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error);
+ GRPC_ERROR_UNREF(s->byte_stream_error);
if (s->incoming_window_delta > 0) {
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
@@ -1175,8 +1189,9 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
s->fetching_send_message = NULL;
return; /* early out */
} else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
- &s->fetching_slice, UINT32_MAX,
- &s->complete_fetch_locked)) {
+ UINT32_MAX, &s->complete_fetch_locked)) {
+ grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
+ &s->fetching_slice);
add_fetched_slice_locked(exec_ctx, t, s);
}
}
@@ -1187,9 +1202,15 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
grpc_chttp2_stream *s = gs;
grpc_chttp2_transport *t = s->t;
if (error == GRPC_ERROR_NONE) {
- add_fetched_slice_locked(exec_ctx, t, s);
- continue_fetching_send_locked(exec_ctx, t, s);
- } else {
+ error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
+ &s->fetching_slice);
+ if (error == GRPC_ERROR_NONE) {
+ add_fetched_slice_locked(exec_ctx, t, s);
+ continue_fetching_send_locked(exec_ctx, t, s);
+ }
+ }
+
+ if (error != GRPC_ERROR_NONE) {
/* TODO(ctiller): what to do here */
abort();
}
@@ -1421,12 +1442,20 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (op->recv_message) {
+ size_t already_received;
GPR_ASSERT(s->recv_message_ready == NULL);
+ GPR_ASSERT(!s->pending_byte_stream);
s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message;
- if (s->id != 0 &&
- (s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) {
- incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
+ if (s->id != 0) {
+ if (s->pending_byte_stream) {
+ already_received = s->frame_storage.length;
+ } else {
+ already_received = s->frame_storage.length +
+ s->unprocessed_incoming_frames_buffer.length;
+ }
+ incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5,
+ already_received);
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -1614,13 +1643,13 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- grpc_byte_stream *bs;
if (s->recv_initial_metadata_ready != NULL &&
s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
if (s->seen_error) {
- while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
- NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
+ if (!s->pending_byte_stream) {
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
}
}
grpc_chttp2_incoming_metadata_buffer_publish(
@@ -1633,39 +1662,65 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- grpc_byte_stream *bs;
+ grpc_error *error = GRPC_ERROR_NONE;
if (s->recv_message_ready != NULL) {
- while (s->final_metadata_requested && s->seen_error &&
- (bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
- NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ *s->recv_message = NULL;
+ if (s->final_metadata_requested && s->seen_error) {
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
+ if (!s->pending_byte_stream) {
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
+ }
+ }
+ if (!s->pending_byte_stream) {
+ while (s->unprocessed_incoming_frames_buffer.length > 0 ||
+ s->frame_storage.length > 0) {
+ if (s->unprocessed_incoming_frames_buffer.length == 0) {
+ grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
+ &s->frame_storage);
+ }
+ error = grpc_deframe_unprocessed_incoming_frames(
+ exec_ctx, &s->data_parser, s,
+ &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
+ if (error != GRPC_ERROR_NONE) {
+ s->seen_error = true;
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ &s->frame_storage);
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
+ break;
+ } else if (*s->recv_message != NULL) {
+ break;
+ }
+ }
}
- if (s->incoming_frames.head != NULL) {
- *s->recv_message =
- grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames);
- GPR_ASSERT(*s->recv_message != NULL);
+ if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) {
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
} else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
*s->recv_message = NULL;
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
}
+ GRPC_ERROR_UNREF(error);
}
}
void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- grpc_byte_stream *bs;
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
if (s->recv_trailing_metadata_finished != NULL && s->read_closed &&
s->write_closed) {
if (s->seen_error) {
- while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
- NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
+ if (!s->pending_byte_stream) {
+ grpc_slice_buffer_reset_and_unref_internal(
+ exec_ctx, &s->unprocessed_incoming_frames_buffer);
}
}
- if (s->all_incoming_byte_streams_finished &&
+ bool pending_data = s->pending_byte_stream ||
+ s->unprocessed_incoming_frames_buffer.length > 0;
+ if (s->read_closed && s->frame_storage.length == 0 &&
+ (!pending_data || s->seen_error) &&
s->recv_trailing_metadata_finished != NULL) {
grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata);
@@ -1676,14 +1731,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
}
}
-static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s) {
- if ((s->all_incoming_byte_streams_finished = gpr_unref(&s->active_streams))) {
- grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
- }
-}
-
static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint32_t id, grpc_error *error) {
grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id);
@@ -1692,10 +1739,19 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
}
- if (s->data_parser.parsing_frame != NULL) {
- grpc_chttp2_incoming_byte_stream_finished(
- exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error));
- s->data_parser.parsing_frame = NULL;
+ if (s->pending_byte_stream) {
+ if (s->on_next != NULL) {
+ grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame;
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ }
+ incoming_byte_stream_publish_error(exec_ctx, bs, error);
+ incoming_byte_stream_unref(exec_ctx, bs);
+ s->data_parser.parsing_frame = NULL;
+ } else {
+ GRPC_ERROR_UNREF(s->byte_stream_error);
+ s->byte_stream_error = GRPC_ERROR_REF(error);
+ }
}
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
@@ -1881,7 +1937,6 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
}
}
- decrement_active_streams_locked(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -2095,6 +2150,7 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
(int)bdp);
}
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp);
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, bdp);
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@@ -2312,7 +2368,9 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
- if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) {
+ if (t->destroying || t->closed) {
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
+ } else if (error == GRPC_ERROR_NONE) {
if (t->keepalive_permit_without_calls ||
grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
@@ -2327,7 +2385,7 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
&t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC));
}
- } else if (error == GRPC_ERROR_CANCELLED && !(t->destroying || t->closed)) {
+ } else if (error == GRPC_ERROR_CANCELLED) {
/* The keepalive ping timer may be cancelled by bdp */
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
grpc_timer_init(
@@ -2419,12 +2477,28 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
* BYTE STREAM
*/
+static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg;
+
+ s->pending_byte_stream = false;
+ if (error == GRPC_ERROR_NONE) {
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s);
+ } else {
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
+ s->on_next = NULL;
+ GRPC_ERROR_UNREF(s->byte_stream_error);
+ s->byte_stream_error = GRPC_ERROR_NONE;
+ grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error));
+ s->byte_stream_error = error;
+ }
+}
+
static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs) {
if (gpr_unref(&bs->refs)) {
- GRPC_ERROR_UNREF(bs->error);
- grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices);
- gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs);
}
}
@@ -2484,47 +2558,91 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = bs->transport;
grpc_chttp2_stream *s = bs->stream;
- if (bs->is_tail) {
- gpr_mu_lock(&bs->slice_mu);
- size_t cur_length = bs->slices.length;
- gpr_mu_unlock(&bs->slice_mu);
- incoming_byte_stream_update_flow_control(
- exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
- }
- gpr_mu_lock(&bs->slice_mu);
- if (bs->slices.count > 0) {
- *bs->next_action.slice = grpc_slice_buffer_take_first(&bs->slices);
- grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
- } else if (bs->error != GRPC_ERROR_NONE) {
- grpc_closure_run(exec_ctx, bs->next_action.on_complete,
- GRPC_ERROR_REF(bs->error));
+ size_t cur_length = s->frame_storage.length;
+ incoming_byte_stream_update_flow_control(
+ exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
+
+ GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
+ if (s->frame_storage.length > 0) {
+ grpc_slice_buffer_swap(&s->frame_storage,
+ &s->unprocessed_incoming_frames_buffer);
+ grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
+ } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
+ grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
+ GRPC_ERROR_REF(s->byte_stream_error));
+ if (s->data_parser.parsing_frame != NULL) {
+ incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
+ s->data_parser.parsing_frame = NULL;
+ }
+ } else if (s->read_closed) {
+ if (bs->remaining_bytes != 0) {
+ s->byte_stream_error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
+ GRPC_ERROR_REF(s->byte_stream_error));
+ if (s->data_parser.parsing_frame != NULL) {
+ incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
+ s->data_parser.parsing_frame = NULL;
+ }
+ } else {
+ /* Should never reach here. */
+ GPR_ASSERT(false);
+ }
} else {
- bs->on_next = bs->next_action.on_complete;
- bs->next = bs->next_action.slice;
+ s->on_next = bs->next_action.on_complete;
}
- gpr_mu_unlock(&bs->slice_mu);
incoming_byte_stream_unref(exec_ctx, bs);
}
-static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream,
- grpc_slice *slice, size_t max_size_hint,
- grpc_closure *on_complete) {
+static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ size_t max_size_hint,
+ grpc_closure *on_complete) {
GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- gpr_ref(&bs->refs);
- bs->next_action.slice = slice;
- bs->next_action.max_size_hint = max_size_hint;
- bs->next_action.on_complete = on_complete;
- grpc_closure_sched(
- exec_ctx,
- grpc_closure_init(
- &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner, false)),
- GRPC_ERROR_NONE);
- GPR_TIMER_END("incoming_byte_stream_next", 0);
- return 0;
+ grpc_chttp2_stream *s = bs->stream;
+ if (s->unprocessed_incoming_frames_buffer.length > 0) {
+ GPR_TIMER_END("incoming_byte_stream_next", 0);
+ return true;
+ } else {
+ gpr_ref(&bs->refs);
+ bs->next_action.max_size_hint = max_size_hint;
+ bs->next_action.on_complete = on_complete;
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
+ grpc_combiner_scheduler(bs->transport->combiner, false)),
+ GRPC_ERROR_NONE);
+ GPR_TIMER_END("incoming_byte_stream_next", 0);
+ return false;
+ }
+}
+
+static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_slice *slice) {
+ GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0);
+ grpc_chttp2_incoming_byte_stream *bs =
+ (grpc_chttp2_incoming_byte_stream *)byte_stream;
+ grpc_chttp2_stream *s = bs->stream;
+
+ if (s->unprocessed_incoming_frames_buffer.length > 0) {
+ grpc_error *error = grpc_deframe_unprocessed_incoming_frames(
+ exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
+ slice, NULL);
+ if (error != GRPC_ERROR_NONE) {
+ return error;
+ }
+ } else {
+ grpc_error *error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
+ grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
+ return error;
+ }
+ GPR_TIMER_END("incoming_byte_stream_pull", 0);
+ return GRPC_ERROR_NONE;
}
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -2534,9 +2652,14 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored) {
grpc_chttp2_incoming_byte_stream *bs = byte_stream;
+ grpc_chttp2_stream *s = bs->stream;
+ grpc_chttp2_transport *t = s->t;
+
GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
- decrement_active_streams_locked(exec_ctx, bs->transport, bs->stream);
incoming_byte_stream_unref(exec_ctx, bs);
+ s->pending_byte_stream = false;
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@@ -2556,50 +2679,53 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_publish_error(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error) {
+ grpc_chttp2_stream *s = bs->stream;
+
GPR_ASSERT(error != GRPC_ERROR_NONE);
- grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error));
- bs->on_next = NULL;
- GRPC_ERROR_UNREF(bs->error);
+ grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
+ s->on_next = NULL;
+ GRPC_ERROR_UNREF(s->byte_stream_error);
+ s->byte_stream_error = GRPC_ERROR_REF(error);
grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream,
GRPC_ERROR_REF(error));
- bs->error = error;
}
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_incoming_byte_stream *bs,
- grpc_slice slice) {
- gpr_mu_lock(&bs->slice_mu);
+grpc_error *grpc_chttp2_incoming_byte_stream_push(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+ grpc_slice slice, grpc_slice *slice_out) {
+ grpc_chttp2_stream *s = bs->stream;
+
if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
- incoming_byte_stream_publish_error(
- exec_ctx, bs,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"));
+ grpc_error *error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
+
+ grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return error;
} else {
bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
- if (bs->on_next != NULL) {
- *bs->next = slice;
- grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE);
- bs->on_next = NULL;
- } else {
- grpc_slice_buffer_add(&bs->slices, slice);
+ if (slice_out != NULL) {
+ *slice_out = slice;
}
+ return GRPC_ERROR_NONE;
}
- gpr_mu_unlock(&bs->slice_mu);
}
-void grpc_chttp2_incoming_byte_stream_finished(
+grpc_error *grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
- grpc_error *error) {
+ grpc_error *error, bool reset_on_error) {
+ grpc_chttp2_stream *s = bs->stream;
+
if (error == GRPC_ERROR_NONE) {
- gpr_mu_lock(&bs->slice_mu);
if (bs->remaining_bytes != 0) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
}
- gpr_mu_unlock(&bs->slice_mu);
}
- if (error != GRPC_ERROR_NONE) {
- incoming_byte_stream_publish_error(exec_ctx, bs, error);
+ if (error != GRPC_ERROR_NONE && reset_on_error) {
+ grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
}
incoming_byte_stream_unref(exec_ctx, bs);
+ return error;
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
@@ -2611,26 +2737,12 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->remaining_bytes = frame_size;
incoming_byte_stream->base.flags = flags;
incoming_byte_stream->base.next = incoming_byte_stream_next;
+ incoming_byte_stream->base.pull = incoming_byte_stream_pull;
incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
- gpr_mu_init(&incoming_byte_stream->slice_mu);
gpr_ref_init(&incoming_byte_stream->refs, 2);
- incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s;
- gpr_ref(&incoming_byte_stream->stream->active_streams);
- grpc_slice_buffer_init(&incoming_byte_stream->slices);
- incoming_byte_stream->on_next = NULL;
- incoming_byte_stream->is_tail = 1;
- incoming_byte_stream->error = GRPC_ERROR_NONE;
- grpc_chttp2_incoming_frame_queue *q = &s->incoming_frames;
- if (q->head == NULL) {
- q->head = incoming_byte_stream;
- } else {
- q->tail->is_tail = 0;
- q->tail->next_message = incoming_byte_stream;
- }
- q->tail = incoming_byte_stream;
- grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+ s->byte_stream_error = GRPC_ERROR_NONE;
return incoming_byte_stream;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c
index 6e9258ee7e..9aba3ea445 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.c
+++ b/src/core/ext/transport/chttp2/transport/frame_data.c
@@ -40,6 +40,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/internal.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport.h"
@@ -53,16 +54,17 @@ grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser) {
void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *parser) {
if (parser->parsing_frame != NULL) {
- grpc_chttp2_incoming_byte_stream_finished(
+ GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, parser->parsing_frame,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"));
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
}
GRPC_ERROR_UNREF(parser->error);
}
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags,
- uint32_t stream_id) {
+ uint32_t stream_id,
+ grpc_chttp2_stream *s) {
if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
char *msg;
gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
@@ -74,47 +76,14 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
}
if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
- parser->is_last_frame = 1;
+ s->received_last_frame = true;
} else {
- parser->is_last_frame = 0;
+ s->received_last_frame = false;
}
return GRPC_ERROR_NONE;
}
-void grpc_chttp2_incoming_frame_queue_merge(
- grpc_chttp2_incoming_frame_queue *head_dst,
- grpc_chttp2_incoming_frame_queue *tail_src) {
- if (tail_src->head == NULL) {
- return;
- }
-
- if (head_dst->head == NULL) {
- *head_dst = *tail_src;
- memset(tail_src, 0, sizeof(*tail_src));
- return;
- }
-
- head_dst->tail->next_message = tail_src->head;
- head_dst->tail = tail_src->tail;
- memset(tail_src, 0, sizeof(*tail_src));
-}
-
-grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
- grpc_chttp2_incoming_frame_queue *q) {
- grpc_byte_stream *out;
- if (q->head == NULL) {
- return NULL;
- }
- out = &q->head->base;
- if (q->head == q->tail) {
- memset(q, 0, sizeof(*q));
- } else {
- q->head = q->head->next_message;
- }
- return out;
-}
-
void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
uint32_t write_bytes, int is_eof,
grpc_transport_one_way_stats *stats,
@@ -143,145 +112,215 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
stats->data_bytes += write_bytes;
}
-static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_data_parser *p,
- grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_slice slice) {
- uint8_t *const beg = GRPC_SLICE_START_PTR(slice);
- uint8_t *const end = GRPC_SLICE_END_PTR(slice);
- uint8_t *cur = beg;
- uint32_t message_flags;
- grpc_chttp2_incoming_byte_stream *incoming_byte_stream;
- char *msg;
+grpc_error *grpc_deframe_unprocessed_incoming_frames(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p, grpc_chttp2_stream *s,
+ grpc_slice_buffer *slices, grpc_slice *slice_out,
+ grpc_byte_stream **stream_out) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ grpc_chttp2_transport *t = s->t;
- if (cur == end) {
- return GRPC_ERROR_NONE;
- }
+ while (slices->count > 0) {
+ uint8_t *beg = NULL;
+ uint8_t *end = NULL;
+ uint8_t *cur = NULL;
- switch (p->state) {
- case GRPC_CHTTP2_DATA_ERROR:
- p->state = GRPC_CHTTP2_DATA_ERROR;
- return GRPC_ERROR_REF(p->error);
- fh_0:
- case GRPC_CHTTP2_DATA_FH_0:
- s->stats.incoming.framing_bytes++;
- p->frame_type = *cur;
- switch (p->frame_type) {
- case 0:
- p->is_frame_compressed = 0; /* GPR_FALSE */
- break;
- case 1:
- p->is_frame_compressed = 1; /* GPR_TRUE */
- break;
- default:
- gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
- p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
- p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
- (intptr_t)s->id);
- gpr_free(msg);
- msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
- p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
- grpc_slice_from_copied_string(msg));
- gpr_free(msg);
- p->error =
- grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
- p->state = GRPC_CHTTP2_DATA_ERROR;
- return GRPC_ERROR_REF(p->error);
- }
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_1;
- return GRPC_ERROR_NONE;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_1:
- s->stats.incoming.framing_bytes++;
- p->frame_size = ((uint32_t)*cur) << 24;
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_2;
- return GRPC_ERROR_NONE;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_2:
- s->stats.incoming.framing_bytes++;
- p->frame_size |= ((uint32_t)*cur) << 16;
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_3;
- return GRPC_ERROR_NONE;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_3:
- s->stats.incoming.framing_bytes++;
- p->frame_size |= ((uint32_t)*cur) << 8;
- if (++cur == end) {
- p->state = GRPC_CHTTP2_DATA_FH_4;
- return GRPC_ERROR_NONE;
- }
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FH_4:
- s->stats.incoming.framing_bytes++;
- p->frame_size |= ((uint32_t)*cur);
- p->state = GRPC_CHTTP2_DATA_FRAME;
- ++cur;
- message_flags = 0;
- if (p->is_frame_compressed) {
- message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
- }
- p->parsing_frame = incoming_byte_stream =
- grpc_chttp2_incoming_byte_stream_create(exec_ctx, t, s, p->frame_size,
- message_flags);
- /* fallthrough */
- case GRPC_CHTTP2_DATA_FRAME:
- if (cur == end) {
- return GRPC_ERROR_NONE;
- }
- uint32_t remaining = (uint32_t)(end - cur);
- if (remaining == p->frame_size) {
- s->stats.incoming.data_bytes += p->frame_size;
- grpc_chttp2_incoming_byte_stream_push(
- exec_ctx, p->parsing_frame,
- grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
- grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
- GRPC_ERROR_NONE);
- p->parsing_frame = NULL;
- p->state = GRPC_CHTTP2_DATA_FH_0;
- return GRPC_ERROR_NONE;
- } else if (remaining > p->frame_size) {
- s->stats.incoming.data_bytes += p->frame_size;
- grpc_chttp2_incoming_byte_stream_push(
- exec_ctx, p->parsing_frame,
- grpc_slice_sub(slice, (size_t)(cur - beg),
- (size_t)(cur + p->frame_size - beg)));
- grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
- GRPC_ERROR_NONE);
- p->parsing_frame = NULL;
- cur += p->frame_size;
- goto fh_0; /* loop */
- } else {
- GPR_ASSERT(remaining <= p->frame_size);
- grpc_chttp2_incoming_byte_stream_push(
- exec_ctx, p->parsing_frame,
- grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
- p->frame_size -= remaining;
- s->stats.incoming.data_bytes += remaining;
+ grpc_slice slice = grpc_slice_buffer_take_first(slices);
+
+ beg = GRPC_SLICE_START_PTR(slice);
+ end = GRPC_SLICE_END_PTR(slice);
+ cur = beg;
+ uint32_t message_flags;
+ char *msg;
+
+ if (cur == end) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ continue;
+ }
+
+ switch (p->state) {
+ case GRPC_CHTTP2_DATA_ERROR:
+ p->state = GRPC_CHTTP2_DATA_ERROR;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return GRPC_ERROR_REF(p->error);
+ case GRPC_CHTTP2_DATA_FH_0:
+ p->frame_type = *cur;
+ switch (p->frame_type) {
+ case 0:
+ p->is_frame_compressed = false; /* GPR_FALSE */
+ break;
+ case 1:
+ p->is_frame_compressed = true; /* GPR_TRUE */
+ break;
+ default:
+ gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
+ p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
+ (intptr_t)s->id);
+ gpr_free(msg);
+ msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
+ grpc_slice_from_copied_string(msg));
+ gpr_free(msg);
+ p->error =
+ grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
+ p->state = GRPC_CHTTP2_DATA_ERROR;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return GRPC_ERROR_REF(p->error);
+ }
+ if (++cur == end) {
+ p->state = GRPC_CHTTP2_DATA_FH_1;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ continue;
+ }
+ /* fallthrough */
+ case GRPC_CHTTP2_DATA_FH_1:
+ p->frame_size = ((uint32_t)*cur) << 24;
+ if (++cur == end) {
+ p->state = GRPC_CHTTP2_DATA_FH_2;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ continue;
+ }
+ /* fallthrough */
+ case GRPC_CHTTP2_DATA_FH_2:
+ p->frame_size |= ((uint32_t)*cur) << 16;
+ if (++cur == end) {
+ p->state = GRPC_CHTTP2_DATA_FH_3;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ continue;
+ }
+ /* fallthrough */
+ case GRPC_CHTTP2_DATA_FH_3:
+ p->frame_size |= ((uint32_t)*cur) << 8;
+ if (++cur == end) {
+ p->state = GRPC_CHTTP2_DATA_FH_4;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ continue;
+ }
+ /* fallthrough */
+ case GRPC_CHTTP2_DATA_FH_4:
+ GPR_ASSERT(stream_out != NULL);
+ GPR_ASSERT(p->parsing_frame == NULL);
+ p->frame_size |= ((uint32_t)*cur);
+ p->state = GRPC_CHTTP2_DATA_FRAME;
+ ++cur;
+ message_flags = 0;
+ if (p->is_frame_compressed) {
+ message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ }
+ p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
+ exec_ctx, t, s, p->frame_size, message_flags);
+ *stream_out = &p->parsing_frame->base;
+ if (p->parsing_frame->remaining_bytes == 0) {
+ GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
+ exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true));
+ p->parsing_frame = NULL;
+ p->state = GRPC_CHTTP2_DATA_FH_0;
+ }
+ s->pending_byte_stream = true;
+
+ if (cur != end) {
+ grpc_slice_buffer_undo_take_first(
+ &s->unprocessed_incoming_frames_buffer,
+ grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
+ }
+ grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
+ case GRPC_CHTTP2_DATA_FRAME: {
+ GPR_ASSERT(p->parsing_frame != NULL);
+ GPR_ASSERT(slice_out != NULL);
+ if (cur == end) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ continue;
+ }
+ uint32_t remaining = (uint32_t)(end - cur);
+ if (remaining == p->frame_size) {
+ if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
+ grpc_slice_sub(slice, (size_t)(cur - beg),
+ (size_t)(end - beg)),
+ slice_out))) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return error;
+ }
+ if (GRPC_ERROR_NONE !=
+ (error = grpc_chttp2_incoming_byte_stream_finished(
+ exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return error;
+ }
+ p->parsing_frame = NULL;
+ p->state = GRPC_CHTTP2_DATA_FH_0;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return GRPC_ERROR_NONE;
+ } else if (remaining < p->frame_size) {
+ if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
+ grpc_slice_sub(slice, (size_t)(cur - beg),
+ (size_t)(end - beg)),
+ slice_out))) {
+ return error;
+ }
+ p->frame_size -= remaining;
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return GRPC_ERROR_NONE;
+ } else {
+ GPR_ASSERT(remaining > p->frame_size);
+ if (GRPC_ERROR_NONE !=
+ (grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
+ grpc_slice_sub(slice, (size_t)(cur - beg),
+ (size_t)(cur + p->frame_size - beg)),
+ slice_out))) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return error;
+ }
+ if (GRPC_ERROR_NONE !=
+ (error = grpc_chttp2_incoming_byte_stream_finished(
+ exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) {
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return error;
+ }
+ p->parsing_frame = NULL;
+ p->state = GRPC_CHTTP2_DATA_FH_0;
+ cur += p->frame_size;
+ grpc_slice_buffer_undo_take_first(
+ &s->unprocessed_incoming_frames_buffer,
+ grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
+ grpc_slice_unref_internal(exec_ctx, slice);
+ return GRPC_ERROR_NONE;
+ }
}
+ }
}
- GPR_UNREACHABLE_CODE(
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here"));
+ return GRPC_ERROR_NONE;
}
grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_slice slice, int is_last) {
- grpc_chttp2_data_parser *p = parser;
- grpc_error *error = parse_inner(exec_ctx, p, t, s, slice);
+ /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */
+ s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice);
+ if (!s->pending_byte_stream) {
+ grpc_slice_ref_internal(slice);
+ grpc_slice_buffer_add(&s->frame_storage, slice);
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+ } else if (s->on_next) {
+ GPR_ASSERT(s->frame_storage.length == 0);
+ grpc_slice_ref_internal(slice);
+ grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
+ grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_NONE);
+ s->on_next = NULL;
+ } else {
+ grpc_slice_ref_internal(slice);
+ grpc_slice_buffer_add(&s->frame_storage, slice);
+ }
- if (is_last && p->is_last_frame) {
+ if (is_last && s->received_last_frame) {
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE);
}
- return error;
+ return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h
index 264ad14608..9ed4ad0f21 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.h
+++ b/src/core/ext/transport/chttp2/transport/frame_data.h
@@ -56,28 +56,16 @@ typedef enum {
typedef struct grpc_chttp2_incoming_byte_stream
grpc_chttp2_incoming_byte_stream;
-typedef struct grpc_chttp2_incoming_frame_queue {
- grpc_chttp2_incoming_byte_stream *head;
- grpc_chttp2_incoming_byte_stream *tail;
-} grpc_chttp2_incoming_frame_queue;
-
typedef struct {
grpc_chttp2_stream_state state;
- uint8_t is_last_frame;
uint8_t frame_type;
uint32_t frame_size;
grpc_error *error;
- int is_frame_compressed;
+ bool is_frame_compressed;
grpc_chttp2_incoming_byte_stream *parsing_frame;
} grpc_chttp2_data_parser;
-void grpc_chttp2_incoming_frame_queue_merge(
- grpc_chttp2_incoming_frame_queue *head_dst,
- grpc_chttp2_incoming_frame_queue *tail_src);
-grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
- grpc_chttp2_incoming_frame_queue *q);
-
/* initialize per-stream state for data frame parsing */
grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser);
@@ -87,7 +75,8 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
/* start processing a new data frame */
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags,
- uint32_t stream_id);
+ uint32_t stream_id,
+ grpc_chttp2_stream *s);
/* handle a slice of a data frame - is_last indicates the last slice of a
frame */
@@ -101,4 +90,9 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
grpc_transport_one_way_stats *stats,
grpc_slice_buffer *outbuf);
+grpc_error *grpc_deframe_unprocessed_incoming_frames(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p, grpc_chttp2_stream *s,
+ grpc_slice_buffer *slices, grpc_slice *slice_out,
+ grpc_byte_stream **stream_out);
+
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 6eb848b8d7..0aaa4aebe5 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -195,22 +195,20 @@ typedef struct grpc_chttp2_write_cb {
struct grpc_chttp2_incoming_byte_stream {
grpc_byte_stream base;
gpr_refcount refs;
- struct grpc_chttp2_incoming_byte_stream *next_message;
- grpc_error *error;
- grpc_chttp2_transport *transport;
- grpc_chttp2_stream *stream;
- bool is_tail;
+ grpc_chttp2_transport *transport; /* immutable */
+ grpc_chttp2_stream *stream; /* immutable */
- gpr_mu slice_mu; // protects slices, on_next
- grpc_slice_buffer slices;
- grpc_closure *on_next;
- grpc_slice *next;
+ /* Accessed only by transport thread when stream->pending_byte_stream == false
+ * Accessed only by application thread when stream->pending_byte_stream ==
+ * true */
uint32_t remaining_bytes;
+ /* Accessed only by transport thread when stream->pending_byte_stream == false
+ * Accessed only by application thread when stream->pending_byte_stream ==
+ * true */
struct {
grpc_closure closure;
- grpc_slice *slice;
size_t max_size_hint;
grpc_closure *on_complete;
} next_action;
@@ -222,6 +220,7 @@ typedef enum {
GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
GRPC_CHTTP2_KEEPALIVE_STATE_DYING,
+ GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
} grpc_chttp2_keepalive_state;
struct grpc_chttp2_transport {
@@ -445,8 +444,8 @@ struct grpc_chttp2_stream {
uint32_t id;
/** window available for us to send to peer, over or under the initial window
- * size of the transport... ie:
- * outgoing_window = outgoing_window_delta + transport.initial_window_size */
+ * size of the transport... ie:
+ * outgoing_window = outgoing_window_delta + transport.initial_window_size */
int64_t outgoing_window_delta;
/** things the upper layers would like to send */
grpc_metadata_batch *send_initial_metadata;
@@ -473,9 +472,6 @@ struct grpc_chttp2_stream {
grpc_transport_stream_stats *collecting_stats;
grpc_transport_stream_stats stats;
- /** number of streams that are currently being read */
- gpr_refcount active_streams;
-
/** Is this stream closed for writing. */
bool write_closed;
/** Is this stream reading half-closed. */
@@ -499,7 +495,17 @@ struct grpc_chttp2_stream {
grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
- grpc_chttp2_incoming_frame_queue incoming_frames;
+ grpc_slice_buffer frame_storage; /* protected by t combiner */
+
+ /* Accessed only by transport thread when stream->pending_byte_stream == false
+ * Accessed only by application thread when stream->pending_byte_stream ==
+ * true */
+ grpc_slice_buffer unprocessed_incoming_frames_buffer;
+ grpc_closure *on_next; /* protected by t combiner */
+ bool pending_byte_stream; /* protected by t combiner */
+ grpc_closure reset_byte_stream;
+ grpc_error *byte_stream_error; /* protected by t combiner */
+ bool received_last_frame; /* protected by t combiner */
gpr_timespec deadline;
@@ -512,6 +518,9 @@ struct grpc_chttp2_stream {
* incoming_window = incoming_window_delta + transport.initial_window_size */
int64_t incoming_window_delta;
/** parsing state for data frames */
+ /* Accessed only by transport thread when stream->pending_byte_stream == false
+ * Accessed only by application thread when stream->pending_byte_stream ==
+ * true */
grpc_chttp2_data_parser data_parser;
/** number of bytes received - reset at end of parse thread execution */
int64_t received_bytes;
@@ -790,10 +799,13 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t);
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
uint32_t frame_size, uint32_t flags);
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_incoming_byte_stream *bs,
- grpc_slice slice);
-void grpc_chttp2_incoming_byte_stream_finished(
+grpc_error *grpc_chttp2_incoming_byte_stream_push(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+ grpc_slice slice, grpc_slice *slice_out);
+grpc_error *grpc_chttp2_incoming_byte_stream_finished(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
+ grpc_error *error, bool reset_on_error);
+void grpc_chttp2_incoming_byte_stream_notify(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error);
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 7e457ced27..638b137316 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -458,12 +458,13 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
return init_skip_frame_parser(exec_ctx, t, 0);
}
if (err == GRPC_ERROR_NONE) {
- err = grpc_chttp2_data_parser_begin_frame(&s->data_parser,
- t->incoming_frame_flags, s->id);
+ err = grpc_chttp2_data_parser_begin_frame(
+ &s->data_parser, t->incoming_frame_flags, s->id, s);
}
error_handler:
if (err == GRPC_ERROR_NONE) {
t->incoming_stream = s;
+ /* t->parser = grpc_chttp2_data_parser_parse;*/
t->parser = grpc_chttp2_data_parser_parse;
t->parser_data = &s->data_parser;
return GRPC_ERROR_NONE;
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 88335ecd0b..7896c70f9e 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -973,9 +973,20 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer write_slice_buffer;
grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer);
- grpc_byte_stream_next(
- NULL, stream_op->payload->send_message.send_message, &slice,
- stream_op->payload->send_message.send_message->length, NULL);
+ if (1 != grpc_byte_stream_next(
+ exec_ctx, stream_op->payload->send_message.send_message,
+ stream_op->payload->send_message.send_message->length,
+ NULL)) {
+ /* Should never reach here */
+ GPR_ASSERT(false);
+ }
+ if (GRPC_ERROR_NONE !=
+ grpc_byte_stream_pull(exec_ctx,
+ stream_op->payload->send_message.send_message,
+ &slice)) {
+ /* Should never reach here */
+ GPR_ASSERT(false);
+ }
grpc_slice_buffer_add(&write_slice_buffer, slice);
if (write_slice_buffer.count != 1) {
/* Empty request not handled yet */