aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/census/gen/census.pb.h2
-rw-r--r--src/core/ext/census/gen/trace_context.pb.c45
-rw-r--r--src/core/ext/census/gen/trace_context.pb.h46
-rw-r--r--src/core/ext/census/grpc_filter.c4
-rw-r--r--src/core/ext/census/trace_context.c2
-rw-r--r--src/core/ext/census/trace_context.h3
-rw-r--r--src/core/ext/client_channel/client_channel.c35
-rw-r--r--src/core/ext/client_channel/client_channel.h4
-rw-r--r--src/core/ext/client_channel/connector.c6
-rw-r--r--src/core/ext/client_channel/connector.h7
-rw-r--r--src/core/ext/client_channel/http_connect_handshaker.c10
-rw-r--r--src/core/ext/client_channel/subchannel.c13
-rw-r--r--src/core/ext/client_channel/subchannel.h2
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c66
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c84
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c5
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c2
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.c14
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.c16
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c2
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_decoder.c14
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_encoder.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_encoder.h3
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_plugin.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c764
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.c22
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_rst_stream.c13
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_settings.c23
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_window_update.c11
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.c152
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c196
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.h20
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_table.c41
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_table.h12
-rw-r--r--src/core/ext/transport/chttp2/transport/http2_errors.h56
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.c34
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.h8
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h125
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c99
-rw-r--r--src/core/ext/transport/chttp2/transport/status_conversion.c115
-rw-r--r--src/core/ext/transport/chttp2/transport/status_conversion.h50
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.c43
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c126
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_api_dummy.c2
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c55
48 files changed, 1267 insertions, 1101 deletions
diff --git a/src/core/ext/census/gen/census.pb.h b/src/core/ext/census/gen/census.pb.h
index dae583f33d..c8546eac2e 100644
--- a/src/core/ext/census/gen/census.pb.h
+++ b/src/core/ext/census/gen/census.pb.h
@@ -292,4 +292,4 @@ extern const pb_field_t google_census_Metric_fields[5];
} /* extern "C" */
#endif
-#endif
+#endif /* GRPC_CORE_EXT_CENSUS_GEN_CENSUS_PB_H */
diff --git a/src/core/ext/census/gen/trace_context.pb.c b/src/core/ext/census/gen/trace_context.pb.c
index c8aea324ce..f4126d4d80 100644
--- a/src/core/ext/census/gen/trace_context.pb.c
+++ b/src/core/ext/census/gen/trace_context.pb.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2016, Google Inc.
+ * Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,51 +31,24 @@
*
*/
/* Automatically generated nanopb constant definitions */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev at Fri Jan 20 16:14:22 2017. */
#include "src/core/ext/census/gen/trace_context.pb.h"
+/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
-const pb_field_t google_trace_TraceId_fields[3] = {
- PB_FIELD( 1, FIXED64 , OPTIONAL, STATIC , FIRST, google_trace_TraceId, hi, hi, 0),
- PB_FIELD( 2, FIXED64 , OPTIONAL, STATIC , OTHER, google_trace_TraceId, lo, hi, 0),
+const pb_field_t google_trace_TraceContext_fields[5] = {
+ PB_FIELD( 1, FIXED64 , OPTIONAL, STATIC , FIRST, google_trace_TraceContext, trace_id_hi, trace_id_hi, 0),
+ PB_FIELD( 2, FIXED64 , OPTIONAL, STATIC , OTHER, google_trace_TraceContext, trace_id_lo, trace_id_hi, 0),
+ PB_FIELD( 3, FIXED64 , OPTIONAL, STATIC , OTHER, google_trace_TraceContext, span_id, trace_id_lo, 0),
+ PB_FIELD( 4, FIXED32 , OPTIONAL, STATIC , OTHER, google_trace_TraceContext, span_options, span_id, 0),
PB_LAST_FIELD
};
-const pb_field_t google_trace_TraceContext_fields[4] = {
- PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, google_trace_TraceContext, trace_id, trace_id, &google_trace_TraceId_fields),
- PB_FIELD( 2, FIXED64 , OPTIONAL, STATIC , OTHER, google_trace_TraceContext, span_id, trace_id, 0),
- PB_FIELD( 3, BOOL , OPTIONAL, STATIC , OTHER, google_trace_TraceContext, is_sampled, span_id, 0),
- PB_LAST_FIELD
-};
-
-
-/* Check that field information fits in pb_field_t */
-#if !defined(PB_FIELD_32BIT)
-/* If you get an error here, it means that you need to define PB_FIELD_32BIT
- * compile-time option. You can do that in pb.h or on compiler command line.
- *
- * The reason you need to do this is that some of your messages contain tag
- * numbers or field sizes that are larger than what can fit in 8 or 16 bit
- * field descriptors.
- */
-PB_STATIC_ASSERT((pb_membersize(google_trace_TraceContext, trace_id) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_google_trace_TraceId_google_trace_TraceContext)
-#endif
-
-#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
-/* If you get an error here, it means that you need to define PB_FIELD_16BIT
- * compile-time option. You can do that in pb.h or on compiler command line.
- *
- * The reason you need to do this is that some of your messages contain tag
- * numbers or field sizes that are larger than what can fit in the default
- * 8 bit descriptors.
- */
-PB_STATIC_ASSERT((pb_membersize(google_trace_TraceContext, trace_id) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_google_trace_TraceId_google_trace_TraceContext)
-#endif
-
+/* @@protoc_insertion_point(eof) */
diff --git a/src/core/ext/census/gen/trace_context.pb.h b/src/core/ext/census/gen/trace_context.pb.h
index 263c4c58cb..ea127bf70a 100644
--- a/src/core/ext/census/gen/trace_context.pb.h
+++ b/src/core/ext/census/gen/trace_context.pb.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2016, Google Inc.
+ * Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,11 +31,13 @@
*
*/
/* Automatically generated nanopb header */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev at Fri Jan 20 16:14:22 2017. */
#ifndef GRPC_CORE_EXT_CENSUS_GEN_TRACE_CONTEXT_PB_H
#define GRPC_CORE_EXT_CENSUS_GEN_TRACE_CONTEXT_PB_H
#include "third_party/nanopb/pb.h"
+
+/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
@@ -45,44 +47,35 @@ extern "C" {
#endif
/* Struct definitions */
-typedef struct _google_trace_TraceId {
- bool has_hi;
- uint64_t hi;
- bool has_lo;
- uint64_t lo;
-} google_trace_TraceId;
-
typedef struct _google_trace_TraceContext {
- bool has_trace_id;
- google_trace_TraceId trace_id;
+ bool has_trace_id_hi;
+ uint64_t trace_id_hi;
+ bool has_trace_id_lo;
+ uint64_t trace_id_lo;
bool has_span_id;
uint64_t span_id;
- bool has_is_sampled;
- bool is_sampled;
+ bool has_span_options;
+ uint32_t span_options;
+/* @@protoc_insertion_point(struct:google_trace_TraceContext) */
} google_trace_TraceContext;
/* Default values for struct fields */
/* Initializer values for message structs */
-#define google_trace_TraceId_init_default {false, 0, false, 0}
-#define google_trace_TraceContext_init_default {false, google_trace_TraceId_init_default, false, 0, false, 0}
-#define google_trace_TraceId_init_zero {false, 0, false, 0}
-#define google_trace_TraceContext_init_zero {false, google_trace_TraceId_init_zero, false, 0, false, 0}
+#define google_trace_TraceContext_init_default {false, 0, false, 0, false, 0, false, 0}
+#define google_trace_TraceContext_init_zero {false, 0, false, 0, false, 0, false, 0}
/* Field tags (for use in manual encoding/decoding) */
-#define google_trace_TraceId_hi_tag 1
-#define google_trace_TraceId_lo_tag 2
-#define google_trace_TraceContext_trace_id_tag 1
-#define google_trace_TraceContext_span_id_tag 2
-#define google_trace_TraceContext_is_sampled_tag 3
+#define google_trace_TraceContext_trace_id_hi_tag 1
+#define google_trace_TraceContext_trace_id_lo_tag 2
+#define google_trace_TraceContext_span_id_tag 3
+#define google_trace_TraceContext_span_options_tag 4
/* Struct field encoding specification for nanopb */
-extern const pb_field_t google_trace_TraceId_fields[3];
-extern const pb_field_t google_trace_TraceContext_fields[4];
+extern const pb_field_t google_trace_TraceContext_fields[5];
/* Maximum encoded size of messages (where known) */
-#define google_trace_TraceId_size 18
-#define google_trace_TraceContext_size 31
+#define google_trace_TraceContext_size 32
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
@@ -95,5 +88,6 @@ extern const pb_field_t google_trace_TraceContext_fields[4];
#ifdef __cplusplus
} /* extern "C" */
#endif
+/* @@protoc_insertion_point(eof) */
#endif
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 8e4d4732b8..65cfe1fa90 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -67,9 +67,7 @@ static void extract_and_annotate_method_tag(grpc_metadata_batch *md,
channel_data *chand) {
grpc_linked_mdelem *m;
for (m = md->list.head; m != NULL; m = m->next) {
- if (m->md->key == GRPC_MDSTR_PATH) {
- gpr_log(GPR_DEBUG, "%s",
- (const char *)GRPC_SLICE_START_PTR(m->md->value->slice));
+ if (grpc_slice_eq(GRPC_MDKEY(m->md), GRPC_MDSTR_PATH)) {
/* Add method tag here */
}
}
diff --git a/src/core/ext/census/trace_context.c b/src/core/ext/census/trace_context.c
index fbb20d3448..47d0de19da 100644
--- a/src/core/ext/census/trace_context.c
+++ b/src/core/ext/census/trace_context.c
@@ -73,7 +73,7 @@ bool decode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer,
}
// check fields
- if (!ctxt->has_trace_id) {
+ if (!ctxt->has_trace_id_hi || !ctxt->has_trace_id_lo) {
gpr_log(GPR_DEBUG, "Invalid TraceContext: missing trace_id");
return false;
}
diff --git a/src/core/ext/census/trace_context.h b/src/core/ext/census/trace_context.h
index 1cb5e26ea7..f391a1b7c1 100644
--- a/src/core/ext/census/trace_context.h
+++ b/src/core/ext/census/trace_context.h
@@ -38,6 +38,9 @@
#include "src/core/ext/census/gen/trace_context.pb.h"
+/* Span option flags. */
+#define SPAN_OPTIONS_IS_SAMPLED 0x01
+
/* Maximum number of bytes required to encode a TraceContext (31)
1 byte for trace_id field
1 byte for trace_id length
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 865e91a2b4..208c95b67a 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -54,6 +54,7 @@
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -89,7 +90,7 @@ static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *p) {
gpr_free(p);
}
-static const grpc_mdstr_hash_table_vtable method_parameters_vtable = {
+static const grpc_slice_hash_table_vtable method_parameters_vtable = {
method_parameters_free, method_parameters_copy};
static void *method_parameters_create_from_json(const grpc_json *json) {
@@ -171,7 +172,7 @@ typedef struct client_channel_channel_data {
/** service config in JSON form */
char *service_config_json;
/** maps method names to method_parameters structs */
- grpc_mdstr_hash_table *method_params_table;
+ grpc_slice_hash_table *method_params_table;
/** incoming resolver result - set by resolver.next() */
grpc_channel_args *resolver_result;
/** a list of closures that are all waiting for config to come in */
@@ -273,7 +274,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
char *lb_policy_name = NULL;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
- grpc_mdstr_hash_table *method_params_table = NULL;
+ grpc_slice_hash_table *method_params_table = NULL;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
bool exit_idle = false;
grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
@@ -379,7 +380,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
chand->service_config_json = service_config_json;
}
if (chand->method_params_table != NULL) {
- grpc_mdstr_hash_table_unref(exec_ctx, chand->method_params_table);
+ grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
chand->method_params_table = method_params_table;
if (lb_policy != NULL) {
@@ -579,7 +580,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
gpr_free(chand->lb_policy_name);
gpr_free(chand->service_config_json);
if (chand->method_params_table != NULL) {
- grpc_mdstr_hash_table_unref(exec_ctx, chand->method_params_table);
+ grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
}
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
grpc_pollset_set_destroy(chand->interested_parties);
@@ -614,7 +615,7 @@ typedef struct client_channel_call_data {
// to avoid this without breaking the grpc_deadline_state abstraction.
grpc_deadline_state deadline_state;
- grpc_mdstr *path; // Request path.
+ grpc_slice path; // Request path.
gpr_timespec call_start_time;
gpr_timespec deadline;
wait_for_ready_value wait_for_ready_from_service_config;
@@ -643,6 +644,12 @@ typedef struct client_channel_call_data {
grpc_linked_mdelem lb_token_mdelem;
} call_data;
+grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
+ grpc_call_element *call_elem) {
+ grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data);
+ return scc == CANCELLED_CALL ? NULL : scc;
+}
+
static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
GPR_TIMER_BEGIN("add_waiting_locked", 0);
if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
@@ -1018,10 +1025,10 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg,
if (error == GRPC_ERROR_NONE) {
// Get the method config table from channel data.
gpr_mu_lock(&chand->mu);
- grpc_mdstr_hash_table *method_params_table = NULL;
+ grpc_slice_hash_table *method_params_table = NULL;
if (chand->method_params_table != NULL) {
method_params_table =
- grpc_mdstr_hash_table_ref(chand->method_params_table);
+ grpc_slice_hash_table_ref(chand->method_params_table);
}
gpr_mu_unlock(&chand->mu);
// If the method config table was present, use it.
@@ -1050,7 +1057,7 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&calld->mu);
}
}
- grpc_mdstr_hash_table_unref(exec_ctx, method_params_table);
+ grpc_slice_hash_table_unref(exec_ctx, method_params_table);
}
}
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config");
@@ -1064,7 +1071,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
// Initialize data members.
grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
- calld->path = GRPC_MDSTR_REF(args->path);
+ calld->path = grpc_slice_ref_internal(args->path);
calld->call_start_time = args->start_time;
calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET;
@@ -1088,8 +1095,8 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
if (chand->lb_policy != NULL) {
// We already have a resolver result, so check for service config.
if (chand->method_params_table != NULL) {
- grpc_mdstr_hash_table *method_params_table =
- grpc_mdstr_hash_table_ref(chand->method_params_table);
+ grpc_slice_hash_table *method_params_table =
+ grpc_slice_hash_table_ref(chand->method_params_table);
gpr_mu_unlock(&chand->mu);
method_parameters *method_params = grpc_method_config_table_get(
exec_ctx, method_params_table, args->path);
@@ -1105,7 +1112,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
method_params->wait_for_ready;
}
}
- grpc_mdstr_hash_table_unref(exec_ctx, method_params_table);
+ grpc_slice_hash_table_unref(exec_ctx, method_params_table);
} else {
gpr_mu_unlock(&chand->mu);
}
@@ -1134,7 +1141,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
void *and_free_memory) {
call_data *calld = elem->call_data;
grpc_deadline_state_destroy(exec_ctx, elem);
- GRPC_MDSTR_UNREF(exec_ctx, calld->path);
+ grpc_slice_unref_internal(exec_ctx, calld->path);
GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
diff --git a/src/core/ext/client_channel/client_channel.h b/src/core/ext/client_channel/client_channel.h
index f02587d0c1..5e6e64e58b 100644
--- a/src/core/ext/client_channel/client_channel.h
+++ b/src/core/ext/client_channel/client_channel.h
@@ -57,4 +57,8 @@ void grpc_client_channel_watch_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete);
+/* Debug helper: pull the subchannel call from a call stack element */
+grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
+ grpc_call_element *elem);
+
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/ext/client_channel/connector.c b/src/core/ext/client_channel/connector.c
index 0582e5b372..7a720fd1bd 100644
--- a/src/core/ext/client_channel/connector.c
+++ b/src/core/ext/client_channel/connector.c
@@ -49,7 +49,7 @@ void grpc_connector_connect(grpc_exec_ctx* exec_ctx, grpc_connector* connector,
connector->vtable->connect(exec_ctx, connector, in_args, out_args, notify);
}
-void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx,
- grpc_connector* connector) {
- connector->vtable->shutdown(exec_ctx, connector);
+void grpc_connector_shutdown(grpc_exec_ctx* exec_ctx, grpc_connector* connector,
+ grpc_error* why) {
+ connector->vtable->shutdown(exec_ctx, connector, why);
}
diff --git a/src/core/ext/client_channel/connector.h b/src/core/ext/client_channel/connector.h
index 395f89b3b2..9bff41f003 100644
--- a/src/core/ext/client_channel/connector.h
+++ b/src/core/ext/client_channel/connector.h
@@ -68,7 +68,8 @@ struct grpc_connector_vtable {
void (*ref)(grpc_connector *connector);
void (*unref)(grpc_exec_ctx *exec_ctx, grpc_connector *connector);
/** Implementation of grpc_connector_shutdown */
- void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector);
+ void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
+ grpc_error *why);
/** Implementation of grpc_connector_connect */
void (*connect)(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
const grpc_connect_in_args *in_args,
@@ -83,7 +84,7 @@ void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
grpc_connect_out_args *out_args,
grpc_closure *notify);
/** Cancel any pending connection */
-void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_connector *connector);
+void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *connector,
+ grpc_error *why);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H */
diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c
index 622d236320..58ab233f1b 100644
--- a/src/core/ext/client_channel/http_connect_handshaker.c
+++ b/src/core/ext/client_channel/http_connect_handshaker.c
@@ -123,7 +123,8 @@ static void handshake_failed_locked(grpc_exec_ctx* exec_ctx,
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
- grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
+ grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint,
+ GRPC_ERROR_REF(error));
// Not shutting down, so the handshake failed. Clean up before
// invoking the callback.
cleanup_args_for_failure_locked(exec_ctx, handshaker);
@@ -251,15 +252,18 @@ static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx,
}
static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
- grpc_handshaker* handshaker_in) {
+ grpc_handshaker* handshaker_in,
+ grpc_error* why) {
http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
gpr_mu_lock(&handshaker->mu);
if (!handshaker->shutdown) {
handshaker->shutdown = true;
- grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
+ grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint,
+ GRPC_ERROR_REF(why));
cleanup_args_for_failure_locked(exec_ctx, handshaker);
}
gpr_mu_unlock(&handshaker->mu);
+ GRPC_ERROR_UNREF(why);
}
static void http_connect_handshaker_do_handshake(
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index 05b08826b6..f1e4e079e2 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -273,7 +273,8 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected);
c->disconnected = true;
- grpc_connector_shutdown(exec_ctx, c->connector);
+ grpc_connector_shutdown(exec_ctx, c->connector,
+ GRPC_ERROR_CREATE("Subchannel disconnected"));
con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection");
@@ -637,9 +638,8 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_error *error = grpc_channel_stack_builder_finish(
exec_ctx, builder, 0, 1, connection_destroy, NULL, (void **)&con);
if (error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(error);
- gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", msg);
- grpc_error_free_string(msg);
+ gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
+ grpc_error_string(error));
GRPC_ERROR_UNREF(error);
abort(); /* TODO(ctiller): what to do here? */
}
@@ -704,7 +704,6 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
const char *errmsg = grpc_error_string(error);
gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
- grpc_error_free_string(errmsg);
maybe_start_connecting_locked(exec_ctx, c);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
@@ -763,7 +762,7 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec start_time,
+ grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time,
gpr_timespec deadline, grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
*call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
@@ -775,7 +774,7 @@ grpc_error *grpc_connected_subchannel_create_call(
if (error != GRPC_ERROR_NONE) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);
- grpc_error_free_string(error_string);
+
gpr_free(*call);
return error;
}
diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h
index 684675eb37..9bd35a7704 100644
--- a/src/core/ext/client_channel/subchannel.h
+++ b/src/core/ext/client_channel/subchannel.h
@@ -114,7 +114,7 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
/** construct a subchannel call */
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
- grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec start_time,
+ grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time,
gpr_timespec deadline, grpc_subchannel_call **subchannel_call);
/** process a transport level op */
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 567e65ac69..308facb7e7 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -135,13 +135,13 @@ int grpc_lb_glb_trace = 0;
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
-static void initial_metadata_add_lb_token(
- grpc_metadata_batch *initial_metadata,
- grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) {
+static grpc_error *initial_metadata_add_lb_token(
+ grpc_exec_ctx *exec_ctx, grpc_metadata_batch *initial_metadata,
+ grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem lb_token) {
GPR_ASSERT(lb_token_mdelem_storage != NULL);
- GPR_ASSERT(lb_token != NULL);
- grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
- lb_token);
+ GPR_ASSERT(!GRPC_MDISNULL(lb_token));
+ return grpc_metadata_batch_add_tail(exec_ctx, initial_metadata,
+ lb_token_mdelem_storage, lb_token);
}
typedef struct wrapped_rr_closure_arg {
@@ -161,7 +161,7 @@ typedef struct wrapped_rr_closure_arg {
grpc_connected_subchannel **target;
/* the LB token associated with the pick */
- grpc_mdelem *lb_token;
+ grpc_mdelem lb_token;
/* storage for the lb token initial metadata mdelem */
grpc_linked_mdelem *lb_token_mdelem_storage;
@@ -188,8 +188,8 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
* addresses failed to connect). There won't be any user_data/token
* available */
if (*wc_arg->target != NULL) {
- if (wc_arg->lb_token != NULL) {
- initial_metadata_add_lb_token(wc_arg->initial_metadata,
+ if (!GRPC_MDISNULL(wc_arg->lb_token)) {
+ initial_metadata_add_lb_token(exec_ctx, wc_arg->initial_metadata,
wc_arg->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
} else {
@@ -345,8 +345,7 @@ typedef struct glb_lb_policy {
/* call status code and details, set in lb_on_server_status_received() */
grpc_status_code lb_call_status;
- char *lb_call_status_details;
- size_t lb_call_status_details_capacity;
+ grpc_slice lb_call_status_details;
/** LB call retry backoff state */
gpr_backoff lb_call_backoff_state;
@@ -388,10 +387,14 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
/* vtable for LB tokens in grpc_lb_addresses. */
static void *lb_token_copy(void *token) {
- return token == NULL ? NULL : GRPC_MDELEM_REF(token);
+ return token == NULL
+ ? NULL
+ : (void *)GRPC_MDELEM_REF((grpc_mdelem){(uintptr_t)token}).payload;
}
static void lb_token_destroy(grpc_exec_ctx *exec_ctx, void *token) {
- if (token != NULL) GRPC_MDELEM_UNREF(exec_ctx, token);
+ if (token != NULL) {
+ GRPC_MDELEM_UNREF(exec_ctx, (grpc_mdelem){(uintptr_t)token});
+ }
}
static int lb_token_cmp(void *token1, void *token2) {
if (token1 > token2) return 1;
@@ -459,10 +462,11 @@ static grpc_lb_addresses *process_serverlist_locked(
GPR_ARRAY_SIZE(server->load_balance_token);
const size_t lb_token_length =
strnlen(server->load_balance_token, lb_token_max_length);
- grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
- (uint8_t *)server->load_balance_token, lb_token_length);
- user_data = grpc_mdelem_from_metadata_strings(
- exec_ctx, GRPC_MDSTR_LB_TOKEN, lb_token_mdstr);
+ grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
+ server->load_balance_token, lb_token_length);
+ user_data = (void *)grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_LB_TOKEN,
+ lb_token_mdstr)
+ .payload;
} else {
char *uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
@@ -470,7 +474,7 @@ static grpc_lb_addresses *process_serverlist_locked(
"be used instead",
uri);
gpr_free(uri);
- user_data = GRPC_MDELEM_LB_TOKEN_EMPTY;
+ user_data = (void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
}
grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
@@ -564,7 +568,7 @@ static bool pick_from_internal_rr_locked(
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
/* add the load reporting initial metadata */
- initial_metadata_add_lb_token(pick_args->initial_metadata,
+ initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata,
pick_args->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
@@ -1103,11 +1107,12 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
/* Note the following LB call progresses every time there's activity in \a
* glb_policy->base.interested_parties, which is comprised of the polling
* entities from \a client_channel. */
+ grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name);
glb_policy->lb_call = grpc_channel_create_pollset_set_call(
exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
glb_policy->base.interested_parties,
- "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name,
- glb_policy->deadline, NULL);
+ GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
+ &host, glb_policy->deadline, NULL);
grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);
@@ -1120,9 +1125,6 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_slice_unref_internal(exec_ctx, request_payload_slice);
grpc_grpclb_request_destroy(request);
- glb_policy->lb_call_status_details = NULL;
- glb_policy->lb_call_status_details_capacity = 0;
-
grpc_closure_init(&glb_policy->lb_on_server_status_received,
lb_on_server_status_received, glb_policy,
grpc_schedule_on_exec_ctx);
@@ -1138,7 +1140,8 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
}
-static void lb_call_destroy_locked(glb_lb_policy *glb_policy) {
+static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->lb_call != NULL);
grpc_call_destroy(glb_policy->lb_call);
glb_policy->lb_call = NULL;
@@ -1147,7 +1150,7 @@ static void lb_call_destroy_locked(glb_lb_policy *glb_policy) {
grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv);
grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
- gpr_free(glb_policy->lb_call_status_details);
+ grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details);
}
/*
@@ -1197,8 +1200,6 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
op->data.recv_status_on_client.status = &glb_policy->lb_call_status;
op->data.recv_status_on_client.status_details =
&glb_policy->lb_call_status_details;
- op->data.recv_status_on_client.status_details_capacity =
- &glb_policy->lb_call_status_details_capacity;
op->flags = 0;
op->reserved = NULL;
op++;
@@ -1341,15 +1342,18 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(glb_policy->lb_call != NULL);
if (grpc_lb_glb_trace) {
+ char *status_details =
+ grpc_slice_to_c_string(glb_policy->lb_call_status_details);
gpr_log(GPR_DEBUG,
"Status from LB server received. Status = %d, Details = '%s', "
"(call: %p)",
- glb_policy->lb_call_status, glb_policy->lb_call_status_details,
+ glb_policy->lb_call_status, status_details,
(void *)glb_policy->lb_call);
+ gpr_free(status_details);
}
- /* We need to performe cleanups no matter what. */
- lb_call_destroy_locked(glb_policy);
+ /* We need to perform cleanups no matter what. */
+ lb_call_destroy_locked(exec_ctx, glb_policy);
if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index 07ef10e6a8..8af6191c3b 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -41,13 +41,17 @@
#include "src/core/ext/load_reporting/load_reporting_filter.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/static_metadata.h"
typedef struct call_data {
intptr_t id; /**< an id unique to the call */
- char *trailing_md_string;
- char *initial_md_string;
- const char *service_method;
+ bool have_trailing_md_string;
+ grpc_slice trailing_md_string;
+ bool have_initial_md_string;
+ grpc_slice initial_md_string;
+ bool have_service_method;
+ grpc_slice service_method;
/* stores the recv_initial_metadata op's ready closure, which we wrap with our
* own (on_initial_md_ready) in order to capture the incoming initial metadata
@@ -63,42 +67,28 @@ typedef struct channel_data {
intptr_t id; /**< an id unique to the channel */
} channel_data;
-typedef struct {
- grpc_call_element *elem;
- grpc_exec_ctx *exec_ctx;
-} recv_md_filter_args;
-
-static grpc_mdelem *recv_md_filter(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_mdelem *md) {
- recv_md_filter_args *a = user_data;
- grpc_call_element *elem = a->elem;
- call_data *calld = elem->call_data;
-
- if (md->key == GRPC_MDSTR_PATH) {
- calld->service_method = grpc_mdstr_as_c_string(md->value);
- } else if (md->key == GRPC_MDSTR_LB_TOKEN) {
- calld->initial_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value));
- return NULL;
- }
-
- return md;
-}
-
static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_error *err) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (err == GRPC_ERROR_NONE) {
- recv_md_filter_args a;
- a.elem = elem;
- a.exec_ctx = exec_ctx;
- grpc_metadata_batch_filter(exec_ctx, calld->recv_initial_metadata,
- recv_md_filter, &a);
- if (calld->service_method == NULL) {
+ if (calld->recv_initial_metadata->idx.named.path != NULL) {
+ calld->service_method = grpc_slice_ref_internal(
+ GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));
+ calld->have_service_method = true;
+ } else {
err =
grpc_error_add_child(err, GRPC_ERROR_CREATE("Missing :path header"));
}
+ if (calld->recv_initial_metadata->idx.named.lb_token != NULL) {
+ calld->initial_md_string = grpc_slice_ref_internal(
+ GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.lb_token->md));
+ calld->have_initial_md_string = true;
+ grpc_metadata_batch_remove(
+ exec_ctx, calld->recv_initial_metadata,
+ calld->recv_initial_metadata->idx.named.lb_token);
+ }
} else {
GRPC_ERROR_REF(err);
}
@@ -149,8 +139,15 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
calld->service_method};
*/
- gpr_free(calld->initial_md_string);
- gpr_free(calld->trailing_md_string);
+ if (calld->have_initial_md_string) {
+ grpc_slice_unref_internal(exec_ctx, calld->initial_md_string);
+ }
+ if (calld->have_trailing_md_string) {
+ grpc_slice_unref_internal(exec_ctx, calld->trailing_md_string);
+ }
+ if (calld->have_service_method) {
+ grpc_slice_unref_internal(exec_ctx, calld->service_method);
+ }
}
/* Constructor for channel_data */
@@ -193,19 +190,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
*/
}
-static grpc_mdelem *lr_trailing_md_filter(grpc_exec_ctx *exec_ctx,
- void *user_data, grpc_mdelem *md) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
-
- if (md->key == GRPC_MDSTR_LB_COST_BIN) {
- calld->trailing_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value));
- return NULL;
- }
-
- return md;
-}
-
static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
@@ -218,8 +202,14 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
op->recv_initial_metadata_ready = &calld->on_initial_md_ready;
} else if (op->send_trailing_metadata) {
- grpc_metadata_batch_filter(exec_ctx, op->send_trailing_metadata,
- lr_trailing_md_filter, elem);
+ if (op->send_trailing_metadata->idx.named.lb_cost_bin != NULL) {
+ calld->trailing_md_string = grpc_slice_ref_internal(
+ GRPC_MDVALUE(op->send_trailing_metadata->idx.named.lb_cost_bin->md));
+ calld->have_trailing_md_string = true;
+ grpc_metadata_batch_remove(
+ exec_ctx, op->send_trailing_metadata,
+ op->send_trailing_metadata->idx.named.lb_cost_bin);
+ }
}
grpc_call_next_op(exec_ctx, elem, op);
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index bf2f4e5ee4..2c9623211b 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -187,9 +187,8 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
gpr_timespec timeout = gpr_time_sub(next_try, now);
- const char *msg = grpc_error_string(error);
- gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", msg);
- grpc_error_free_string(msg);
+ gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
+ grpc_error_string(error));
GPR_ASSERT(!r->have_retry_timer);
r->have_retry_timer = true;
GRPC_RESOLVER_REF(&r->base, "retry-timer");
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index c146a627cb..a1365f6465 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -182,7 +182,7 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx,
bool errors_found = false;
for (size_t i = 0; i < addresses->num_addresses; i++) {
grpc_uri ith_uri = *args->uri;
- char *part_str = grpc_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
+ char *part_str = grpc_slice_to_c_string(path_parts.slices[i]);
ith_uri.path = part_str;
if (!parse(&ith_uri, &addresses->addresses[i].address)) {
errors_found = true; /* GPR_TRUE */
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c
index 013c96dc70..d0a762a280 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.c
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c
@@ -92,19 +92,21 @@ static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx,
}
static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_connector *con) {
+ grpc_connector *con, grpc_error *why) {
chttp2_connector *c = (chttp2_connector *)con;
gpr_mu_lock(&c->mu);
c->shutdown = true;
if (c->handshake_mgr != NULL) {
- grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr);
+ grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr,
+ GRPC_ERROR_REF(why));
}
// If handshaking is not yet in progress, shutdown the endpoint.
// Otherwise, the handshaker will do this for us.
if (!c->connecting && c->endpoint != NULL) {
- grpc_endpoint_shutdown(exec_ctx, c->endpoint);
+ grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(why));
}
gpr_mu_unlock(&c->mu);
+ GRPC_ERROR_UNREF(why);
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
@@ -121,7 +123,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
- grpc_endpoint_shutdown(exec_ctx, args->endpoint);
+ grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_REF(error));
grpc_endpoint_destroy(exec_ctx, args->endpoint);
grpc_channel_args_destroy(exec_ctx, args->args);
grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
@@ -195,7 +197,9 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_closure_sched(exec_ctx, notify, error);
- if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint);
+ if (c->endpoint != NULL) {
+ grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error));
+ }
gpr_mu_unlock(&c->mu);
chttp2_connector_unref(exec_ctx, arg);
} else {
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c
index 574d1a7710..ae2c3838ed 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.c
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.c
@@ -101,16 +101,19 @@ static void pending_handshake_manager_remove_locked(
}
static void pending_handshake_manager_shutdown_locked(grpc_exec_ctx *exec_ctx,
- server_state *state) {
+ server_state *state,
+ grpc_error *why) {
pending_handshake_manager_node *prev_node = NULL;
for (pending_handshake_manager_node *node = state->pending_handshake_mgrs;
node != NULL; node = node->next) {
- grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr);
+ grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr,
+ GRPC_ERROR_REF(why));
gpr_free(prev_node);
prev_node = node;
}
gpr_free(prev_node);
state->pending_handshake_mgrs = NULL;
+ GRPC_ERROR_UNREF(why);
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
@@ -121,7 +124,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) {
const char *error_str = grpc_error_string(error);
gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
- grpc_error_free_string(error_str);
+
if (error == GRPC_ERROR_NONE && args->endpoint != NULL) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
@@ -129,7 +132,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
- grpc_endpoint_shutdown(exec_ctx, args->endpoint);
+ grpc_endpoint_shutdown(exec_ctx, args->endpoint, GRPC_ERROR_NONE);
grpc_endpoint_destroy(exec_ctx, args->endpoint);
grpc_channel_args_destroy(exec_ctx, args->args);
grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
@@ -210,7 +213,8 @@ static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_lock(&state->mu);
grpc_closure *destroy_done = state->server_destroy_listener_done;
GPR_ASSERT(state->shutdown);
- pending_handshake_manager_shutdown_locked(exec_ctx, state);
+ pending_handshake_manager_shutdown_locked(exec_ctx, state,
+ GRPC_ERROR_REF(error));
gpr_mu_unlock(&state->mu);
// Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref.
@@ -307,7 +311,7 @@ grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx,
const char *warning_message = grpc_error_string(err);
gpr_log(GPR_INFO, "WARNING: %s", warning_message);
- grpc_error_free_string(warning_message);
+
/* we managed to bind some addresses: continue */
}
grpc_resolved_addresses_destroy(resolved);
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index bf5026bea6..c219a7d85f 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -51,7 +51,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
if (err != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "%s", msg);
- grpc_error_free_string(msg);
+
GRPC_ERROR_UNREF(err);
}
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index 395c79a71d..cb2b3f5502 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -94,7 +94,7 @@ done:
if (err != GRPC_ERROR_NONE) {
const char *msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "%s", msg);
- grpc_error_free_string(msg);
+
GRPC_ERROR_UNREF(err);
}
return port_num;
diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.c b/src/core/ext/transport/chttp2/transport/bin_decoder.c
index 8db36e4a7f..8c87de112e 100644
--- a/src/core/ext/transport/chttp2/transport/bin_decoder.c
+++ b/src/core/ext/transport/chttp2/transport/bin_decoder.c
@@ -157,7 +157,7 @@ grpc_slice grpc_chttp2_base64_decode(grpc_exec_ctx *exec_ctx,
"grpc_chttp2_base64_decode has a length of %d, which is not a "
"multiple of 4.\n",
(int)input_length);
- return gpr_empty_slice();
+ return grpc_empty_slice();
}
if (input_length > 0) {
@@ -178,11 +178,11 @@ grpc_slice grpc_chttp2_base64_decode(grpc_exec_ctx *exec_ctx,
ctx.contains_tail = false;
if (!grpc_base64_decode_partial(&ctx)) {
- char *s = grpc_dump_slice(input, GPR_DUMP_ASCII);
+ char *s = grpc_slice_to_c_string(input);
gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s);
gpr_free(s);
grpc_slice_unref_internal(exec_ctx, output);
- return gpr_empty_slice();
+ return grpc_empty_slice();
}
GPR_ASSERT(ctx.output_cur == GRPC_SLICE_END_PTR(output));
GPR_ASSERT(ctx.input_cur == GRPC_SLICE_END_PTR(input));
@@ -204,7 +204,7 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_exec_ctx *exec_ctx,
"has a tail of 1 byte.\n",
(int)input_length);
grpc_slice_unref_internal(exec_ctx, output);
- return gpr_empty_slice();
+ return grpc_empty_slice();
}
if (output_length > input_length / 4 * 3 + tail_xtra[input_length % 4]) {
@@ -214,7 +214,7 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_exec_ctx *exec_ctx,
(int)output_length,
(int)(input_length / 4 * 3 + tail_xtra[input_length % 4]));
grpc_slice_unref_internal(exec_ctx, output);
- return gpr_empty_slice();
+ return grpc_empty_slice();
}
ctx.input_cur = GRPC_SLICE_START_PTR(input);
@@ -224,11 +224,11 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_exec_ctx *exec_ctx,
ctx.contains_tail = true;
if (!grpc_base64_decode_partial(&ctx)) {
- char *s = grpc_dump_slice(input, GPR_DUMP_ASCII);
+ char *s = grpc_slice_to_c_string(input);
gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s);
gpr_free(s);
grpc_slice_unref_internal(exec_ctx, output);
- return gpr_empty_slice();
+ return grpc_empty_slice();
}
GPR_ASSERT(ctx.output_cur == GRPC_SLICE_END_PTR(output));
GPR_ASSERT(ctx.input_cur <= GRPC_SLICE_END_PTR(input));
diff --git a/src/core/ext/transport/chttp2/transport/bin_encoder.c b/src/core/ext/transport/chttp2/transport/bin_encoder.c
index af25a4352a..e301c073f3 100644
--- a/src/core/ext/transport/chttp2/transport/bin_encoder.c
+++ b/src/core/ext/transport/chttp2/transport/bin_encoder.c
@@ -177,8 +177,7 @@ static void enc_add1(huff_out *out, uint8_t a) {
enc_flush_some(out);
}
-grpc_slice grpc_chttp2_base64_encode_and_huffman_compress_impl(
- grpc_slice input) {
+grpc_slice grpc_chttp2_base64_encode_and_huffman_compress(grpc_slice input) {
size_t input_length = GRPC_SLICE_LENGTH(input);
size_t input_triplets = input_length / 3;
size_t tail_case = input_length % 3;
diff --git a/src/core/ext/transport/chttp2/transport/bin_encoder.h b/src/core/ext/transport/chttp2/transport/bin_encoder.h
index 477559d0e2..0f899c8e34 100644
--- a/src/core/ext/transport/chttp2/transport/bin_encoder.h
+++ b/src/core/ext/transport/chttp2/transport/bin_encoder.h
@@ -49,7 +49,6 @@ grpc_slice grpc_chttp2_huffman_compress(grpc_slice input);
grpc_slice y = grpc_chttp2_huffman_compress(x);
grpc_slice_unref_internal(exec_ctx, x);
return y; */
-grpc_slice grpc_chttp2_base64_encode_and_huffman_compress_impl(
- grpc_slice input);
+grpc_slice grpc_chttp2_base64_encode_and_huffman_compress(grpc_slice input);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_ENCODER_H */
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
index bd87253ed3..59b21e3330 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c
@@ -31,14 +31,11 @@
*
*/
-#include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/metadata.h"
void grpc_chttp2_plugin_init(void) {
- grpc_chttp2_base64_encode_and_huffman_compress =
- grpc_chttp2_base64_encode_and_huffman_compress_impl;
grpc_register_tracer("http", &grpc_http_trace);
grpc_register_tracer("flowctl", &grpc_flowctl_trace);
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 68a6a2155d..fa18f5a725 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -44,9 +44,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
-#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
-#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/parser.h"
@@ -55,7 +53,10 @@
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/http2_errors.h"
#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/transport/status_conversion.h"
#include "src/core/lib/transport/timeout_encoding.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -123,6 +124,21 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
+static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error);
+static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error);
+
+static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_error *error);
+static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_ping_type ping_type,
+ grpc_closure *on_initiate,
+ grpc_closure *on_complete);
+
+#define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0
+#define DEFAULT_MAX_PINGS_BETWEEN_DATA 3
+
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -154,16 +170,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_combiner_destroy(exec_ctx, t->combiner);
- /* callback remaining pings: they're not allowed to call into the transpot,
- and maybe they hold resources that need to be freed */
- while (t->pings.next != &t->pings) {
- grpc_chttp2_outstanding_ping *ping = t->pings.next;
- grpc_closure_sched(exec_ctx, ping->on_recv,
- GRPC_ERROR_CREATE("Transport closed"));
- ping->next->prev = ping->prev;
- ping->prev->next = ping->next;
- gpr_free(ping);
- }
+ cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));
while (t->write_cb_pool) {
grpc_chttp2_write_cb *next = t->write_cb_pool->next;
@@ -171,6 +178,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
t->write_cb_pool = next;
}
+ gpr_free(t->ping_acks);
gpr_free(t->peer_string);
gpr_free(t);
}
@@ -223,10 +231,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->is_client = is_client;
t->outgoing_window = DEFAULT_WINDOW;
t->incoming_window = DEFAULT_WINDOW;
- t->stream_lookahead = DEFAULT_WINDOW;
- t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
- t->ping_counter = 1;
- t->pings.next = t->pings.prev = &t->pings;
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true;
grpc_connectivity_state_init(
@@ -247,6 +251,22 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->destructive_reclaimer_locked,
destructive_reclaimer_locked, t,
grpc_combiner_scheduler(t->combiner, false));
+ grpc_closure_init(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
+ grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
+
+ grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string);
+ t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC);
+ grpc_pid_controller_init(
+ &t->pid_controller,
+ (grpc_pid_controller_args){.gain_p = 4,
+ .gain_i = 8,
+ .gain_d = 0,
+ .initial_control_value = log2(DEFAULT_WINDOW),
+ .min_control_value = -1,
+ .max_control_value = 22,
+ .integral_range = 10});
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
@@ -289,6 +309,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
DEFAULT_MAX_HEADER_LIST_SIZE);
+ t->ping_policy = (grpc_chttp2_repeated_ping_policy){
+ .max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA,
+ .min_time_between_pings =
+ gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN),
+ };
+
if (channel_args) {
for (i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key,
@@ -306,14 +332,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
} else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES)) {
- const grpc_integer_options options = {-1, 5, INT_MAX};
- const int value =
- grpc_channel_arg_get_integer(&channel_args->args[i], options);
- if (value >= 0) {
- t->stream_lookahead = (uint32_t)value;
- }
- } else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
const grpc_integer_options options = {-1, 0, INT_MAX};
const int value =
@@ -323,6 +341,19 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
(uint32_t)value);
}
} else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
+ t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MAX_PINGS_BETWEEN_DATA, 0, INT_MAX});
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MIN_TIME_BETWEEN_PINGS_MS)) {
+ t->ping_policy.min_time_between_pings = gpr_time_from_millis(
+ grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, 0,
+ INT_MAX}),
+ GPR_TIMESPAN);
+ } else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer(
&channel_args->args[i],
@@ -333,24 +364,26 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_setting_id setting_id;
grpc_integer_options integer_options;
bool availability[2] /* server, client */;
- } settings_map[] = {
- {GRPC_ARG_MAX_CONCURRENT_STREAMS,
- GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
- {-1, 0, INT_MAX},
- {true, false}},
- {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
- GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
- {-1, 0, INT_MAX},
- {true, true}},
- {GRPC_ARG_MAX_METADATA_SIZE,
- GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
- {-1, 0, INT_MAX},
- {true, true}},
- {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
- GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
- {-1, 16384, 16777215},
- {true, true}},
- };
+ } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
+ GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
+ {-1, 0, INT32_MAX},
+ {true, false}},
+ {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
+ GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
+ {-1, 0, INT32_MAX},
+ {true, true}},
+ {GRPC_ARG_MAX_METADATA_SIZE,
+ GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ {-1, 0, INT32_MAX},
+ {true, true}},
+ {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
+ GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ {-1, 16384, 16777215},
+ {true, true}},
+ {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ {-1, 5, INT32_MAX},
+ {true, true}}};
for (j = 0; j < (int)GPR_ARRAY_SIZE(settings_map); j++) {
if (0 == strcmp(channel_args->args[i].key,
settings_map[j].channel_arg_name)) {
@@ -373,6 +406,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
+
grpc_chttp2_initiate_write(exec_ctx, t, false, "init");
post_benign_reclaimer(exec_ctx, t);
}
@@ -409,14 +445,14 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_error_add_child(t->close_transport_on_writes_finished, error);
return;
}
- if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
+ if (!grpc_error_has_clear_grpc_status(error)) {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
}
t->closed = 1;
connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
- grpc_endpoint_shutdown(exec_ctx, t->ep);
+ grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream *s;
@@ -424,6 +460,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
}
end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
+ cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
@@ -474,11 +511,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
if (server_data) {
s->id = (uint32_t)(uintptr_t)server_data;
- s->outgoing_window = t->settings[GRPC_PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->incoming_window = s->max_recv_bytes =
- t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
post_destructive_reclaimer(exec_ctx, t);
@@ -507,6 +539,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
}
grpc_chttp2_list_remove_stalled_by_transport(t, s);
+ grpc_chttp2_list_remove_stalled_by_stream(t, s);
for (int i = 0; i < STREAM_LIST_COUNT; i++) {
if (s->included[i]) {
@@ -646,13 +679,21 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
}
-void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, bool covered_by_poller,
- const char *reason) {
+void grpc_chttp2_become_writable(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ grpc_chttp2_stream_write_type stream_write_type, const char *reason) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
- grpc_chttp2_initiate_write(exec_ctx, t, covered_by_poller, reason);
+ }
+ switch (stream_write_type) {
+ case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK:
+ break;
+ case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED:
+ grpc_chttp2_initiate_write(exec_ctx, t, true, reason);
+ break;
+ case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED:
+ grpc_chttp2_initiate_write(exec_ctx, t, false, reason);
+ break;
}
}
@@ -780,7 +821,6 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx,
static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
- uint32_t stream_incoming_window;
/* start streams where we have free grpc_chttp2_stream ids and free
* concurrency */
while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
@@ -803,15 +843,11 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
"no_more_stream_ids");
}
- s->outgoing_window = t->settings[GRPC_PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->incoming_window = stream_incoming_window =
- t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->max_recv_bytes = GPR_MAX(stream_incoming_window, s->max_recv_bytes);
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
post_destructive_reclaimer(exec_ctx, t);
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream");
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ "new_stream");
}
/* cancel out streams that will never be started */
while (t->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -866,7 +902,6 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
(int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT),
(int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT),
desc, errstr);
- grpc_error_free_string(errstr);
}
if (error != GRPC_ERROR_NONE) {
if (closure->error_data.error == GRPC_ERROR_NONE) {
@@ -895,12 +930,9 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
}
static bool contains_non_ok_status(grpc_metadata_batch *batch) {
- grpc_linked_mdelem *l;
- for (l = batch->list.head; l; l = l->next) {
- if (l->md->key == GRPC_MDSTR_GRPC_STATUS &&
- l->md != GRPC_MDELEM_GRPC_STATUS_0) {
- return true;
- }
+ if (batch->idx.named.grpc_status != NULL) {
+ return !grpc_mdelem_eq(batch->idx.named.grpc_status->md,
+ GRPC_MDELEM_GRPC_STATUS_0);
}
return false;
}
@@ -910,7 +942,9 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s) {
if (s->id != 0 && (!s->write_buffering ||
s->flow_controlled_buffer.length > t->write_buffer_size)) {
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ "op.send_message");
}
}
@@ -980,9 +1014,12 @@ static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id,
bool is_client, bool is_initial) {
for (grpc_linked_mdelem *md = md_batch->list.head; md != md_batch->list.tail;
md = md->next) {
+ char *key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
+ char *value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
- is_client ? "CLI" : "SVR", grpc_mdstr_as_c_string(md->md->key),
- grpc_mdstr_as_c_string(md->md->value));
+ is_client ? "CLI" : "SVR", key, value);
+ gpr_free(key);
+ gpr_free(value);
}
}
@@ -1025,11 +1062,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (op->cancel_error != GRPC_ERROR_NONE) {
- grpc_chttp2_cancel_stream(exec_ctx, t, s, GRPC_ERROR_REF(op->cancel_error));
- }
-
- if (op->close_error != GRPC_ERROR_NONE) {
- close_from_api(exec_ctx, t, s, GRPC_ERROR_REF(op->close_error));
+ grpc_chttp2_cancel_stream(exec_ctx, t, s, op->cancel_error);
}
if (op->send_initial_metadata != NULL) {
@@ -1073,15 +1106,17 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
} else {
GPR_ASSERT(s->id != 0);
- grpc_chttp2_become_writable(exec_ctx, t, s, true,
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
"op.send_initial_metadata");
}
} else {
s->send_initial_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_initial_metadata_finished,
- GRPC_ERROR_CREATE(
- "Attempt to send initial metadata after stream was closed"),
+ GRPC_ERROR_CREATE_REFERENCING(
+ "Attempt to send initial metadata after stream was closed",
+ &s->write_closed_error, 1),
"send_initial_metadata_finished");
}
}
@@ -1093,7 +1128,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (s->write_closed) {
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->fetching_send_message_finished,
- GRPC_ERROR_CREATE("Attempt to send message after stream was closed"),
+ GRPC_ERROR_CREATE_REFERENCING(
+ "Attempt to send message after stream was closed",
+ &s->write_closed_error, 1),
"fetching_send_message_finished");
} else {
GPR_ASSERT(s->fetching_send_message == NULL);
@@ -1161,7 +1198,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
} else if (s->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
- grpc_chttp2_become_writable(exec_ctx, t, s, true,
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
"op.send_trailing_metadata");
}
}
@@ -1180,8 +1218,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s->recv_message = op->recv_message;
if (s->id != 0 &&
(s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) {
- incoming_byte_stream_update_flow_control(exec_ctx, t, s,
- t->stream_lookahead, 0);
+ incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -1225,51 +1262,59 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_TIMER_END("perform_stream_op", 0);
}
+static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_error *error) {
+ /* callback remaining pings: they're not allowed to call into the transpot,
+ and maybe they hold resources that need to be freed */
+ for (size_t i = 0; i < GRPC_CHTTP2_PING_TYPE_COUNT; i++) {
+ grpc_chttp2_ping_queue *pq = &t->ping_queues[i];
+ for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
+ grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
+ grpc_closure_list_sched(exec_ctx, &pq->lists[j]);
+ }
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_closure *on_recv) {
- grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
- p->next = &t->pings;
- p->prev = p->next->prev;
- p->prev->next = p->next->prev = p;
- p->id[0] = (uint8_t)((t->ping_counter >> 56) & 0xff);
- p->id[1] = (uint8_t)((t->ping_counter >> 48) & 0xff);
- p->id[2] = (uint8_t)((t->ping_counter >> 40) & 0xff);
- p->id[3] = (uint8_t)((t->ping_counter >> 32) & 0xff);
- p->id[4] = (uint8_t)((t->ping_counter >> 24) & 0xff);
- p->id[5] = (uint8_t)((t->ping_counter >> 16) & 0xff);
- p->id[6] = (uint8_t)((t->ping_counter >> 8) & 0xff);
- p->id[7] = (uint8_t)(t->ping_counter & 0xff);
- t->ping_counter++;
- p->on_recv = on_recv;
- grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id));
- grpc_chttp2_initiate_write(exec_ctx, t, true, "send_ping");
+ grpc_chttp2_ping_type ping_type,
+ grpc_closure *on_initiate, grpc_closure *on_ack) {
+ grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type];
+ grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
+ GRPC_ERROR_NONE);
+ if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
+ GRPC_ERROR_NONE)) {
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "send_ping");
+ }
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- const uint8_t *opaque_8bytes) {
- grpc_chttp2_outstanding_ping *ping;
- for (ping = t->pings.next; ping != &t->pings; ping = ping->next) {
- if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
- grpc_closure_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE);
- ping->next->prev = ping->prev;
- ping->prev->next = ping->next;
- gpr_free(ping);
- return;
- }
+ uint64_t id) {
+ grpc_chttp2_ping_queue *pq =
+ &t->ping_queues[id % GRPC_CHTTP2_PING_TYPE_COUNT];
+ if (pq->inflight_id != id) {
+ char *from = grpc_endpoint_get_peer(t->ep);
+ gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id);
+ gpr_free(from);
+ return;
+ }
+ grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
+ if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "continue_pings");
}
- char *msg = gpr_dump((const char *)opaque_8bytes, 8, GPR_DUMP_HEX);
- char *from = grpc_endpoint_get_peer(t->ep);
- gpr_log(GPR_DEBUG, "Unknown ping response from %s: %s", from, msg);
- gpr_free(from);
- gpr_free(msg);
}
static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_error_code error, grpc_slice data) {
+ grpc_error *error) {
t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
- grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)error, data,
- &t->qbuf);
+ grpc_http2_error_code http_error;
+ const char *msg;
+ grpc_error_get_status(error, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL, &msg,
+ &http_error);
+ grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error,
+ grpc_slice_from_copied_string(msg), &t->qbuf);
grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent");
+ GRPC_ERROR_UNREF(error);
}
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
@@ -1285,10 +1330,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
op->on_connectivity_state_change);
}
- if (op->send_goaway) {
- send_goaway(exec_ctx, t,
- grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
- grpc_slice_ref_internal(*op->goaway_message));
+ if (op->goaway_error) {
+ send_goaway(exec_ctx, t, op->goaway_error);
}
if (op->set_accept_stream) {
@@ -1306,7 +1349,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->send_ping) {
- send_ping_locked(exec_ctx, t, op->send_ping);
+ send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, NULL,
+ op->send_ping);
}
if (close_transport != GRPC_ERROR_NONE) {
@@ -1348,8 +1392,8 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
}
- grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
- s->recv_initial_metadata);
+ grpc_chttp2_incoming_metadata_buffer_publish(
+ exec_ctx, &s->metadata_buffer[0], s->recv_initial_metadata);
null_then_run_closure(exec_ctx, &s->recv_initial_metadata_ready,
GRPC_ERROR_NONE);
}
@@ -1392,8 +1436,8 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
}
if (s->all_incoming_byte_streams_finished &&
s->recv_trailing_metadata_finished != NULL) {
- grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
- s->recv_trailing_metadata);
+ grpc_chttp2_incoming_metadata_buffer_publish(
+ exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata);
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE,
"recv_trailing_metadata_finished");
@@ -1441,70 +1485,37 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
maybe_start_some_streams(exec_ctx, t);
}
-static void status_codes_from_error(grpc_error *error, gpr_timespec deadline,
- grpc_chttp2_error_code *http2_error,
- grpc_status_code *grpc_status) {
- intptr_t ip_http;
- intptr_t ip_grpc;
- bool have_http =
- grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &ip_http);
- bool have_grpc =
- grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &ip_grpc);
- if (have_http) {
- *http2_error = (grpc_chttp2_error_code)ip_http;
- } else if (have_grpc) {
- *http2_error =
- grpc_chttp2_grpc_status_to_http2_error((grpc_status_code)ip_grpc);
- } else {
- *http2_error = GRPC_CHTTP2_INTERNAL_ERROR;
- }
- if (have_grpc) {
- *grpc_status = (grpc_status_code)ip_grpc;
- } else if (have_http) {
- *grpc_status = grpc_chttp2_http2_error_to_grpc_status(
- (grpc_chttp2_error_code)ip_http, deadline);
- } else {
- *grpc_status = GRPC_STATUS_INTERNAL;
- }
-}
-
void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_error *due_to_error) {
- if (!s->read_closed || !s->write_closed) {
- grpc_status_code grpc_status;
- grpc_chttp2_error_code http_error;
- status_codes_from_error(due_to_error, s->deadline, &http_error,
- &grpc_status);
+ if (!t->is_client && !s->sent_trailing_metadata &&
+ grpc_error_has_clear_grpc_status(due_to_error)) {
+ close_from_api(exec_ctx, t, s, due_to_error);
+ return;
+ }
+ if (!s->read_closed || !s->write_closed) {
if (s->id != 0) {
+ grpc_http2_error_code http_error;
+ grpc_error_get_status(due_to_error, s->deadline, NULL, NULL, &http_error);
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error,
&s->stats.outgoing));
grpc_chttp2_initiate_write(exec_ctx, t, false, "rst_stream");
}
-
- const char *msg =
- grpc_error_get_str(due_to_error, GRPC_ERROR_STR_GRPC_MESSAGE);
- bool free_msg = false;
- if (msg == NULL) {
- free_msg = true;
- msg = grpc_error_string(due_to_error);
- }
- grpc_slice msg_slice = grpc_slice_from_copied_string(msg);
- grpc_chttp2_fake_status(exec_ctx, t, s, grpc_status, &msg_slice);
- if (free_msg) grpc_error_free_string(msg);
}
if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
s->seen_error = true;
- grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, due_to_error);
}
void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, grpc_status_code status,
- grpc_slice *slice) {
+ grpc_chttp2_stream *s, grpc_error *error) {
+ grpc_status_code status;
+ const char *msg;
+ grpc_error_get_status(error, s->deadline, &status, &msg, NULL);
+
if (status != GRPC_STATUS_OK) {
s->seen_error = true;
}
@@ -1518,24 +1529,21 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
s->recv_trailing_metadata_finished != NULL) {
char status_string[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(status, status_string);
- grpc_chttp2_incoming_metadata_buffer_add(
- &s->metadata_buffer[1], grpc_mdelem_from_metadata_strings(
- exec_ctx, GRPC_MDSTR_GRPC_STATUS,
- grpc_mdstr_from_string(status_string)));
- if (slice) {
- grpc_chttp2_incoming_metadata_buffer_add(
- &s->metadata_buffer[1],
- grpc_mdelem_from_metadata_strings(
- exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
- grpc_mdstr_from_slice(exec_ctx,
- grpc_slice_ref_internal(*slice))));
+ grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ exec_ctx, &s->metadata_buffer[1],
+ grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_STATUS,
+ grpc_slice_from_copied_string(status_string)));
+ if (msg != NULL) {
+ grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ exec_ctx, &s->metadata_buffer[1],
+ grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
+ grpc_slice_from_copied_string(msg)));
}
s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
- if (slice) {
- grpc_slice_unref_internal(exec_ctx, *slice);
- }
+
+ GRPC_ERROR_UNREF(error);
}
static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) {
@@ -1601,36 +1609,48 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
int close_writes, grpc_error *error) {
if (s->read_closed && s->write_closed) {
/* already closed */
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
GRPC_ERROR_UNREF(error);
return;
}
+ bool closed_read = false;
+ bool became_closed = false;
if (close_reads && !s->read_closed) {
s->read_closed_error = GRPC_ERROR_REF(error);
s->read_closed = true;
- for (int i = 0; i < 2; i++) {
- if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
- s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
- }
- }
- decrement_active_streams_locked(exec_ctx, t, s);
- grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
- grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
- grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
+ closed_read = true;
}
if (close_writes && !s->write_closed) {
s->write_closed_error = GRPC_ERROR_REF(error);
s->write_closed = true;
grpc_chttp2_fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error));
- grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
if (s->read_closed && s->write_closed) {
+ became_closed = true;
+ grpc_error *overall_error =
+ removal_error(GRPC_ERROR_REF(error), s, "Stream removed");
if (s->id != 0) {
- remove_stream(exec_ctx, t, s->id,
- removal_error(GRPC_ERROR_REF(error), s, "Stream removed"));
+ remove_stream(exec_ctx, t, s->id, GRPC_ERROR_REF(overall_error));
} else {
/* Purge streams waiting on concurrency still waiting for id assignment */
grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
}
+ if (overall_error != GRPC_ERROR_NONE) {
+ grpc_chttp2_fake_status(exec_ctx, t, s, overall_error);
+ }
+ }
+ if (closed_read) {
+ for (int i = 0; i < 2; i++) {
+ if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
+ s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
+ }
+ }
+ decrement_active_streams_locked(exec_ctx, t, s);
+ grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
+ grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
+ }
+ if (became_closed) {
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2");
}
GRPC_ERROR_UNREF(error);
@@ -1644,112 +1664,92 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint8_t *p;
uint32_t len = 0;
grpc_status_code grpc_status;
- grpc_chttp2_error_code http_error;
- status_codes_from_error(error, s->deadline, &http_error, &grpc_status);
+ const char *msg;
+ grpc_error_get_status(error, s->deadline, &grpc_status, &msg, NULL);
GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
- if (s->id != 0 && !t->is_client) {
- /* Hand roll a header block.
- This is unnecessarily ugly - at some point we should find a more
- elegant
- solution.
- It's complicated by the fact that our send machinery would be dead by
- the
- time we got around to sending this, so instead we ignore HPACK
- compression
- and just write the uncompressed bytes onto the wire. */
- status_hdr = grpc_slice_malloc(15 + (grpc_status >= 10));
- p = GRPC_SLICE_START_PTR(status_hdr);
- *p++ = 0x40; /* literal header */
- *p++ = 11; /* len(grpc-status) */
+ /* Hand roll a header block.
+ This is unnecessarily ugly - at some point we should find a more
+ elegant solution.
+ It's complicated by the fact that our send machinery would be dead by
+ the time we got around to sending this, so instead we ignore HPACK
+ compression and just write the uncompressed bytes onto the wire. */
+ status_hdr = grpc_slice_malloc(15 + (grpc_status >= 10));
+ p = GRPC_SLICE_START_PTR(status_hdr);
+ *p++ = 0x00; /* literal header, not indexed */
+ *p++ = 11; /* len(grpc-status) */
+ *p++ = 'g';
+ *p++ = 'r';
+ *p++ = 'p';
+ *p++ = 'c';
+ *p++ = '-';
+ *p++ = 's';
+ *p++ = 't';
+ *p++ = 'a';
+ *p++ = 't';
+ *p++ = 'u';
+ *p++ = 's';
+ if (grpc_status < 10) {
+ *p++ = 1;
+ *p++ = (uint8_t)('0' + grpc_status);
+ } else {
+ *p++ = 2;
+ *p++ = (uint8_t)('0' + (grpc_status / 10));
+ *p++ = (uint8_t)('0' + (grpc_status % 10));
+ }
+ GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
+ len += (uint32_t)GRPC_SLICE_LENGTH(status_hdr);
+
+ if (msg != NULL) {
+ size_t msg_len = strlen(msg);
+ GPR_ASSERT(msg_len <= UINT32_MAX);
+ uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 0);
+ message_pfx = grpc_slice_malloc(14 + msg_len_len);
+ p = GRPC_SLICE_START_PTR(message_pfx);
+ *p++ = 0x00; /* literal header, not indexed */
+ *p++ = 12; /* len(grpc-message) */
*p++ = 'g';
*p++ = 'r';
*p++ = 'p';
*p++ = 'c';
*p++ = '-';
+ *p++ = 'm';
+ *p++ = 'e';
*p++ = 's';
- *p++ = 't';
- *p++ = 'a';
- *p++ = 't';
- *p++ = 'u';
*p++ = 's';
- if (grpc_status < 10) {
- *p++ = 1;
- *p++ = (uint8_t)('0' + grpc_status);
- } else {
- *p++ = 2;
- *p++ = (uint8_t)('0' + (grpc_status / 10));
- *p++ = (uint8_t)('0' + (grpc_status % 10));
- }
- GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
- len += (uint32_t)GRPC_SLICE_LENGTH(status_hdr);
-
- const char *optional_message =
- grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
-
- if (optional_message != NULL) {
- size_t msg_len = strlen(optional_message);
- GPR_ASSERT(msg_len <= UINT32_MAX);
- uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 0);
- message_pfx = grpc_slice_malloc(14 + msg_len_len);
- p = GRPC_SLICE_START_PTR(message_pfx);
- *p++ = 0x40;
- *p++ = 12; /* len(grpc-message) */
- *p++ = 'g';
- *p++ = 'r';
- *p++ = 'p';
- *p++ = 'c';
- *p++ = '-';
- *p++ = 'm';
- *p++ = 'e';
- *p++ = 's';
- *p++ = 's';
- *p++ = 'a';
- *p++ = 'g';
- *p++ = 'e';
- GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 0, 0, p,
- (uint32_t)msg_len_len);
- p += msg_len_len;
- GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
- len += (uint32_t)GRPC_SLICE_LENGTH(message_pfx);
- len += (uint32_t)msg_len;
- }
-
- hdr = grpc_slice_malloc(9);
- p = GRPC_SLICE_START_PTR(hdr);
- *p++ = (uint8_t)(len >> 16);
- *p++ = (uint8_t)(len >> 8);
- *p++ = (uint8_t)(len);
- *p++ = GRPC_CHTTP2_FRAME_HEADER;
- *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
- *p++ = (uint8_t)(s->id >> 24);
- *p++ = (uint8_t)(s->id >> 16);
- *p++ = (uint8_t)(s->id >> 8);
- *p++ = (uint8_t)(s->id);
- GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
-
- grpc_slice_buffer_add(&t->qbuf, hdr);
- grpc_slice_buffer_add(&t->qbuf, status_hdr);
- if (optional_message) {
- grpc_slice_buffer_add(&t->qbuf, message_pfx);
- grpc_slice_buffer_add(&t->qbuf,
- grpc_slice_from_copied_string(optional_message));
- }
- grpc_slice_buffer_add(
- &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR,
- &s->stats.outgoing));
- }
-
- const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
- bool free_msg = false;
- if (msg == NULL) {
- free_msg = true;
- msg = grpc_error_string(error);
- }
- grpc_slice msg_slice = grpc_slice_from_copied_string(msg);
- grpc_chttp2_fake_status(exec_ctx, t, s, grpc_status, &msg_slice);
- if (free_msg) grpc_error_free_string(msg);
+ *p++ = 'a';
+ *p++ = 'g';
+ *p++ = 'e';
+ GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 0, 0, p, (uint32_t)msg_len_len);
+ p += msg_len_len;
+ GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
+ len += (uint32_t)GRPC_SLICE_LENGTH(message_pfx);
+ len += (uint32_t)msg_len;
+ }
+
+ hdr = grpc_slice_malloc(9);
+ p = GRPC_SLICE_START_PTR(hdr);
+ *p++ = (uint8_t)(len >> 16);
+ *p++ = (uint8_t)(len >> 8);
+ *p++ = (uint8_t)(len);
+ *p++ = GRPC_CHTTP2_FRAME_HEADER;
+ *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
+ *p++ = (uint8_t)(s->id >> 24);
+ *p++ = (uint8_t)(s->id >> 16);
+ *p++ = (uint8_t)(s->id >> 8);
+ *p++ = (uint8_t)(s->id);
+ GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
+
+ grpc_slice_buffer_add(&t->qbuf, hdr);
+ grpc_slice_buffer_add(&t->qbuf, status_hdr);
+ if (msg != NULL) {
+ grpc_slice_buffer_add(&t->qbuf, message_pfx);
+ grpc_slice_buffer_add(&t->qbuf, grpc_slice_from_copied_string(msg));
+ }
+ grpc_slice_buffer_add(
+ &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR,
+ &s->stats.outgoing));
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error);
grpc_chttp2_initiate_write(exec_ctx, t, false, "close_from_api");
@@ -1775,34 +1775,28 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_UNREF(error);
}
-/** update window from a settings change */
-typedef struct {
- grpc_chttp2_transport *t;
- grpc_exec_ctx *exec_ctx;
-} update_global_window_args;
+/*******************************************************************************
+ * INPUT PROCESSING - PARSING
+ */
-static void update_global_window(void *args, uint32_t id, void *stream) {
- update_global_window_args *a = args;
- grpc_chttp2_transport *t = a->t;
- grpc_chttp2_stream *s = stream;
- int was_zero;
- int is_zero;
- int64_t initial_window_update = t->initial_window_update;
-
- if (initial_window_update > 0) {
- was_zero = s->outgoing_window <= 0;
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("settings", t, s, outgoing_window,
- initial_window_update);
- is_zero = s->outgoing_window <= 0;
-
- if (was_zero && !is_zero) {
- grpc_chttp2_become_writable(a->exec_ctx, t, s, true,
- "update_global_window");
- }
+static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ double bdp_dbl) {
+ uint32_t bdp;
+ if (bdp_dbl <= 0) {
+ bdp = 0;
+ } else if (bdp_dbl > UINT32_MAX) {
+ bdp = UINT32_MAX;
} else {
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("settings", t, s, outgoing_window,
- -initial_window_update);
+ bdp = (uint32_t)(bdp_dbl);
}
+ int64_t delta =
+ (int64_t)bdp -
+ (int64_t)t->settings[GRPC_LOCAL_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) {
+ return;
+ }
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp);
}
/*******************************************************************************
@@ -1827,8 +1821,10 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
if (parse_error == GRPC_ERROR_NONE &&
(parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
error = grpc_error_set_int(
- GRPC_ERROR_CREATE("Trying to connect an http1.x server"),
- GRPC_ERROR_INT_HTTP_STATUS, response.status);
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE("Trying to connect an http1.x server"),
+ GRPC_ERROR_INT_HTTP_STATUS, response.status),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
}
GRPC_ERROR_UNREF(parse_error);
@@ -1842,6 +1838,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN("reading_action_locked", 0);
grpc_chttp2_transport *t = tp;
+ bool need_bdp_ping = false;
GRPC_ERROR_REF(error);
@@ -1859,9 +1856,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
+ if (grpc_bdp_estimator_add_incoming_bytes(
+ &t->bdp_estimator,
+ (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]))) {
+ need_bdp_ping = true;
+ }
errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
- };
+ }
if (errors[1] != GRPC_ERROR_NONE) {
errors[2] = try_http_parsing(exec_ctx, t);
GRPC_ERROR_UNREF(error);
@@ -1875,21 +1877,16 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN("post_parse_locked", 0);
if (t->initial_window_update != 0) {
- update_global_window_args args = {t, exec_ctx};
- grpc_chttp2_stream_map_for_each(&t->stream_map, update_global_window,
- &args);
+ if (t->initial_window_update > 0) {
+ grpc_chttp2_stream *s;
+ while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
+ grpc_chttp2_become_writable(
+ exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
+ "unstalled");
+ }
+ }
t->initial_window_update = 0;
}
- /* handle higher level things */
- if (t->incoming_window < t->connection_window_target * 3 / 4) {
- int64_t announce_bytes = t->connection_window_target - t->incoming_window;
- GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", t, announce_incoming_window,
- announce_bytes);
- GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", t, incoming_window,
- announce_bytes);
- grpc_chttp2_initiate_write(exec_ctx, t, false, "global incoming window");
- }
-
GPR_TIMER_END("post_parse_locked", 0);
}
@@ -1910,6 +1907,35 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
&t->read_action_locked);
+
+ if (need_bdp_ping) {
+ GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
+ grpc_bdp_estimator_schedule_ping(&t->bdp_estimator);
+ send_ping_locked(exec_ctx, t,
+ GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
+ &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
+ }
+
+ int64_t estimate = -1;
+ if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) {
+ double target = 1 + log2((double)estimate);
+ double memory_pressure = grpc_resource_quota_get_memory_pressure(
+ grpc_resource_user_quota(grpc_endpoint_get_resource_user(t->ep)));
+ if (memory_pressure > 0.8) {
+ target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1);
+ }
+ double bdp_error = target - grpc_pid_controller_last(&t->pid_controller);
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update);
+ double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
+ if (dt > 0.1) {
+ dt = 0.1;
+ }
+ double log2_bdp_guess =
+ grpc_pid_controller_update(&t->pid_controller, bdp_error, dt);
+ update_bdp(exec_ctx, t, pow(2, log2_bdp_guess));
+ t->last_pid_update = now;
+ }
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@@ -1922,6 +1948,26 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action_locked", 0);
}
+static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string);
+ }
+ grpc_bdp_estimator_start_ping(&t->bdp_estimator);
+}
+
+static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
+ }
+ grpc_bdp_estimator_complete_ping(&t->bdp_estimator);
+
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
+}
+
/*******************************************************************************
* CALLBACK LOOP
*/
@@ -1972,10 +2018,12 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
size_t max_size_hint,
size_t have_already) {
uint32_t max_recv_bytes;
+ uint32_t initial_window_size =
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
/* clamp max recv hint to an allowable size */
- if (max_size_hint >= UINT32_MAX - t->stream_lookahead) {
- max_recv_bytes = UINT32_MAX - t->stream_lookahead;
+ if (max_size_hint >= UINT32_MAX - initial_window_size) {
+ max_recv_bytes = UINT32_MAX - initial_window_size;
} else {
max_recv_bytes = (uint32_t)max_size_hint;
}
@@ -1988,20 +2036,26 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
}
/* add some small lookahead to keep pipelines flowing */
- GPR_ASSERT(max_recv_bytes <= UINT32_MAX - t->stream_lookahead);
- max_recv_bytes += t->stream_lookahead;
- if (s->max_recv_bytes < max_recv_bytes) {
- uint32_t add_max_recv_bytes = max_recv_bytes - s->max_recv_bytes;
- bool new_window_write_is_covered_by_poller =
- s->max_recv_bytes < have_already;
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, max_recv_bytes,
- add_max_recv_bytes);
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window,
+ GPR_ASSERT(max_recv_bytes <= UINT32_MAX - initial_window_size);
+ if (s->incoming_window_delta < max_recv_bytes && !s->read_closed) {
+ uint32_t add_max_recv_bytes =
+ (uint32_t)(max_recv_bytes - s->incoming_window_delta);
+ grpc_chttp2_stream_write_type write_type =
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED;
+ if (s->incoming_window_delta + initial_window_size <
+ (int64_t)have_already) {
+ write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
+ }
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window_delta,
add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window,
add_max_recv_bytes);
- grpc_chttp2_become_writable(exec_ctx, t, s,
- new_window_write_is_covered_by_poller,
+ if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size -
+ (int64_t)s->announce_window >
+ (int64_t)initial_window_size / 2) {
+ write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK;
+ }
+ grpc_chttp2_become_writable(exec_ctx, t, s, write_type,
"read_incoming_stream");
}
}
@@ -2089,6 +2143,8 @@ static void incoming_byte_stream_publish_error(
grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error));
bs->on_next = NULL;
GRPC_ERROR_UNREF(bs->error);
+ grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream,
+ GRPC_ERROR_REF(error));
bs->error = error;
}
@@ -2197,8 +2253,10 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory",
t->peer_string);
}
- send_goaway(exec_ctx, t, GRPC_CHTTP2_ENHANCE_YOUR_CALM,
- grpc_slice_from_static_string("Buffers full"));
+ send_goaway(exec_ctx, t,
+ grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
+ GRPC_ERROR_INT_HTTP2_ERROR,
+ GRPC_HTTP2_ENHANCE_YOUR_CALM));
} else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace) {
gpr_log(GPR_DEBUG,
"HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
@@ -2227,7 +2285,7 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_chttp2_cancel_stream(
exec_ctx, t, s, grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
GRPC_ERROR_INT_HTTP2_ERROR,
- GRPC_CHTTP2_ENHANCE_YOUR_CALM));
+ GRPC_HTTP2_ENHANCE_YOUR_CALM));
if (n > 1) {
/* Since we cancel one stream per destructive reclamation, if
there are more streams left, we can immediately post a new
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c
index 7de5f6362d..f487533c41 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.c
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.c
@@ -40,7 +40,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) {
+grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint64_t opaque_8bytes) {
grpc_slice slice = grpc_slice_malloc(9 + 8);
uint8_t *p = GRPC_SLICE_START_PTR(slice);
@@ -53,7 +53,14 @@ grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) {
*p++ = 0;
*p++ = 0;
*p++ = 0;
- memcpy(p, opaque_8bytes, 8);
+ *p++ = (uint8_t)(opaque_8bytes >> 56);
+ *p++ = (uint8_t)(opaque_8bytes >> 48);
+ *p++ = (uint8_t)(opaque_8bytes >> 40);
+ *p++ = (uint8_t)(opaque_8bytes >> 32);
+ *p++ = (uint8_t)(opaque_8bytes >> 24);
+ *p++ = (uint8_t)(opaque_8bytes >> 16);
+ *p++ = (uint8_t)(opaque_8bytes >> 8);
+ *p++ = (uint8_t)(opaque_8bytes);
return slice;
}
@@ -70,6 +77,7 @@ grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser,
}
parser->byte = 0;
parser->is_ack = flags;
+ parser->opaque_8bytes = 0;
return GRPC_ERROR_NONE;
}
@@ -83,7 +91,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_ping_parser *p = parser;
while (p->byte != 8 && cur != end) {
- p->opaque_8bytes[p->byte] = *cur;
+ p->opaque_8bytes |= (((uint64_t)*cur) << (8 * p->byte));
cur++;
p->byte++;
}
@@ -93,8 +101,12 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
if (p->is_ack) {
grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes);
} else {
- grpc_slice_buffer_add(&t->qbuf,
- grpc_chttp2_ping_create(1, p->opaque_8bytes));
+ if (t->ping_ack_count == t->ping_ack_capacity) {
+ t->ping_ack_capacity = GPR_MAX(t->ping_ack_capacity * 3 / 2, 3);
+ t->ping_acks = gpr_realloc(
+ t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks));
+ }
+ t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes;
grpc_chttp2_initiate_write(exec_ctx, t, false, "ping response");
}
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.h b/src/core/ext/transport/chttp2/transport/frame_ping.h
index b9889e2d11..ef642465d7 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.h
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.h
@@ -41,10 +41,10 @@
typedef struct {
uint8_t byte;
uint8_t is_ack;
- uint8_t opaque_8bytes[8];
+ uint64_t opaque_8bytes;
} grpc_chttp2_ping_parser;
-grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes);
+grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint64_t opaque_8bytes);
grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser,
uint32_t length, uint8_t flags);
diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
index 20043f5fbf..7d5beed09d 100644
--- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
+++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
@@ -39,8 +39,7 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
-#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
-#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
+#include "src/core/lib/transport/http2_errors.h"
grpc_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code,
grpc_transport_one_way_stats *stats) {
@@ -109,17 +108,9 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(grpc_exec_ctx *exec_ctx,
(((uint32_t)p->reason_bytes[2]) << 8) |
(((uint32_t)p->reason_bytes[3]));
grpc_error *error = GRPC_ERROR_NONE;
- if (reason != GRPC_CHTTP2_NO_ERROR || s->header_frames_received < 2) {
+ if (reason != GRPC_HTTP2_NO_ERROR || s->header_frames_received < 2) {
error = grpc_error_set_int(GRPC_ERROR_CREATE("RST_STREAM"),
GRPC_ERROR_INT_HTTP2_ERROR, (intptr_t)reason);
- grpc_status_code status_code = grpc_chttp2_http2_error_to_grpc_status(
- (grpc_chttp2_error_code)reason, s->deadline);
- char *status_details;
- gpr_asprintf(&status_details, "Received RST_STREAM with error code %d",
- reason);
- grpc_slice slice_details = grpc_slice_from_copied_string(status_details);
- gpr_free(status_details);
- grpc_chttp2_fake_status(exec_ctx, t, s, status_code, &slice_details);
}
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, error);
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c
index 98facae87f..82290e34cd 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.c
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.c
@@ -43,8 +43,8 @@
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/ext/transport/chttp2/transport/frame.h"
-#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/transport/http2_errors.h"
#define MAX_MAX_HEADER_LIST_SIZE (1024 * 1024 * 1024)
@@ -52,21 +52,21 @@
const grpc_chttp2_setting_parameters
grpc_chttp2_settings_parameters[GRPC_CHTTP2_NUM_SETTINGS] = {
{NULL, 0, 0, 0, GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE,
- GRPC_CHTTP2_PROTOCOL_ERROR},
+ GRPC_HTTP2_PROTOCOL_ERROR},
{"HEADER_TABLE_SIZE", 4096, 0, 0xffffffff,
- GRPC_CHTTP2_CLAMP_INVALID_VALUE, GRPC_CHTTP2_PROTOCOL_ERROR},
+ GRPC_CHTTP2_CLAMP_INVALID_VALUE, GRPC_HTTP2_PROTOCOL_ERROR},
{"ENABLE_PUSH", 1, 0, 1, GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE,
- GRPC_CHTTP2_PROTOCOL_ERROR},
+ GRPC_HTTP2_PROTOCOL_ERROR},
{"MAX_CONCURRENT_STREAMS", 0xffffffffu, 0, 0xffffffffu,
- GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE, GRPC_CHTTP2_PROTOCOL_ERROR},
+ GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE, GRPC_HTTP2_PROTOCOL_ERROR},
{"INITIAL_WINDOW_SIZE", 65535, 0, 0x7fffffffu,
GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE,
- GRPC_CHTTP2_FLOW_CONTROL_ERROR},
+ GRPC_HTTP2_FLOW_CONTROL_ERROR},
{"MAX_FRAME_SIZE", 16384, 16384, 16777215,
- GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE, GRPC_CHTTP2_PROTOCOL_ERROR},
+ GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE, GRPC_HTTP2_PROTOCOL_ERROR},
{"MAX_HEADER_LIST_SIZE", MAX_MAX_HEADER_LIST_SIZE, 0,
MAX_MAX_HEADER_LIST_SIZE, GRPC_CHTTP2_CLAMP_INVALID_VALUE,
- GRPC_CHTTP2_PROTOCOL_ERROR},
+ GRPC_HTTP2_PROTOCOL_ERROR},
};
static uint8_t *fill_header(uint8_t *out, uint32_t length, uint8_t flags) {
@@ -236,7 +236,7 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p,
}
if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
parser->incoming_settings[parser->id] != parser->value) {
- t->initial_window_update =
+ t->initial_window_update +=
(int64_t)parser->value - parser->incoming_settings[parser->id];
if (grpc_http_trace) {
gpr_log(GPR_DEBUG, "adding %d for initial_window change",
@@ -245,8 +245,9 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p,
}
parser->incoming_settings[parser->id] = parser->value;
if (grpc_http_trace) {
- gpr_log(GPR_DEBUG, "CHTTP2:%s: got setting %d = %d",
- t->is_client ? "CLI" : "SVR", parser->id, parser->value);
+ gpr_log(GPR_DEBUG, "CHTTP2:%s:%s: got setting %d = %d",
+ t->is_client ? "CLI" : "SVR", t->peer_string, parser->id,
+ parser->value);
}
} else if (grpc_http_trace) {
gpr_log(GPR_ERROR, "CHTTP2: Ignoring unknown setting %d (value %d)",
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c
index 31a31c2871..8fa0bb471a 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.c
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c
@@ -110,13 +110,12 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
if (t->incoming_stream_id != 0) {
if (s != NULL) {
- bool was_zero = s->outgoing_window <= 0;
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window,
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window_delta,
received_update);
- bool is_zero = s->outgoing_window <= 0;
- if (was_zero && !is_zero) {
- grpc_chttp2_become_writable(exec_ctx, t, s, false,
- "stream.read_flow_control");
+ if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
+ grpc_chttp2_become_writable(
+ exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
+ "stream.read_flow_control");
}
}
} else {
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
index 49a8326f62..63df8e135f 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
@@ -49,6 +49,7 @@
#include "src/core/ext/transport/chttp2/transport/hpack_table.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/timeout_encoding.h"
@@ -64,6 +65,10 @@
/* don't consider adding anything bigger than this to the hpack table */
#define MAX_DECODER_SPACE_USAGE 512
+static grpc_slice_refcount terminal_slice_refcount = {NULL, NULL};
+static const grpc_slice terminal_slice = {&terminal_slice_refcount,
+ .data.refcounted = {0, 0}};
+
extern int grpc_http_trace;
typedef struct {
@@ -185,9 +190,12 @@ static void evict_entry(grpc_chttp2_hpack_compressor *c) {
/* add an element to the decoder table */
static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
- grpc_mdelem *elem) {
- uint32_t key_hash = elem->key->hash;
- uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
+ grpc_mdelem elem) {
+ GPR_ASSERT(GRPC_MDELEM_IS_INTERNED(elem));
+
+ uint32_t key_hash = grpc_slice_hash(GRPC_MDKEY(elem));
+ uint32_t value_hash = grpc_slice_hash(GRPC_MDVALUE(elem));
+ uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, value_hash);
uint32_t new_index = c->tail_remote_index + c->table_elems + 1;
size_t elem_size = grpc_mdelem_get_size_in_hpack_table(elem);
@@ -212,17 +220,18 @@ static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
c->table_elems++;
/* Store this element into {entries,indices}_elem */
- if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == elem) {
+ if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_2(elem_hash)], elem)) {
/* already there: update with new index */
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
- } else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem) {
+ } else if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_3(elem_hash)],
+ elem)) {
/* already there (cuckoo): update with new index */
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
- } else if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == NULL) {
+ } else if (GRPC_MDISNULL(c->entries_elems[HASH_FRAGMENT_2(elem_hash)])) {
/* not there, but a free element: add */
c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
- } else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == NULL) {
+ } else if (GRPC_MDISNULL(c->entries_elems[HASH_FRAGMENT_3(elem_hash)])) {
/* not there (cuckoo), but a free element: add */
c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
@@ -241,24 +250,34 @@ static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
/* do exactly the same for the key (so we can find by that again too) */
- if (c->entries_keys[HASH_FRAGMENT_2(key_hash)] == elem->key) {
+ if (grpc_slice_eq(c->entries_keys[HASH_FRAGMENT_2(key_hash)],
+ GRPC_MDKEY(elem))) {
c->indices_keys[HASH_FRAGMENT_2(key_hash)] = new_index;
- } else if (c->entries_keys[HASH_FRAGMENT_3(key_hash)] == elem->key) {
+ } else if (grpc_slice_eq(c->entries_keys[HASH_FRAGMENT_3(key_hash)],
+ GRPC_MDKEY(elem))) {
c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index;
- } else if (c->entries_keys[HASH_FRAGMENT_2(key_hash)] == NULL) {
- c->entries_keys[HASH_FRAGMENT_2(key_hash)] = GRPC_MDSTR_REF(elem->key);
+ } else if (c->entries_keys[HASH_FRAGMENT_2(key_hash)].refcount ==
+ &terminal_slice_refcount) {
+ c->entries_keys[HASH_FRAGMENT_2(key_hash)] =
+ grpc_slice_ref_internal(GRPC_MDKEY(elem));
c->indices_keys[HASH_FRAGMENT_2(key_hash)] = new_index;
- } else if (c->entries_keys[HASH_FRAGMENT_3(key_hash)] == NULL) {
- c->entries_keys[HASH_FRAGMENT_3(key_hash)] = GRPC_MDSTR_REF(elem->key);
+ } else if (c->entries_keys[HASH_FRAGMENT_3(key_hash)].refcount ==
+ &terminal_slice_refcount) {
+ c->entries_keys[HASH_FRAGMENT_3(key_hash)] =
+ grpc_slice_ref_internal(GRPC_MDKEY(elem));
c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index;
} else if (c->indices_keys[HASH_FRAGMENT_2(key_hash)] <
c->indices_keys[HASH_FRAGMENT_3(key_hash)]) {
- GRPC_MDSTR_UNREF(exec_ctx, c->entries_keys[HASH_FRAGMENT_2(key_hash)]);
- c->entries_keys[HASH_FRAGMENT_2(key_hash)] = GRPC_MDSTR_REF(elem->key);
+ grpc_slice_unref_internal(exec_ctx,
+ c->entries_keys[HASH_FRAGMENT_2(key_hash)]);
+ c->entries_keys[HASH_FRAGMENT_2(key_hash)] =
+ grpc_slice_ref_internal(GRPC_MDKEY(elem));
c->indices_keys[HASH_FRAGMENT_2(key_hash)] = new_index;
} else {
- GRPC_MDSTR_UNREF(exec_ctx, c->entries_keys[HASH_FRAGMENT_3(key_hash)]);
- c->entries_keys[HASH_FRAGMENT_3(key_hash)] = GRPC_MDSTR_REF(elem->key);
+ grpc_slice_unref_internal(exec_ctx,
+ c->entries_keys[HASH_FRAGMENT_3(key_hash)]);
+ c->entries_keys[HASH_FRAGMENT_3(key_hash)] =
+ grpc_slice_ref_internal(GRPC_MDKEY(elem));
c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index;
}
}
@@ -270,20 +289,18 @@ static void emit_indexed(grpc_chttp2_hpack_compressor *c, uint32_t elem_index,
len);
}
-static grpc_slice get_wire_value(grpc_mdelem *elem, uint8_t *huffman_prefix) {
- if (grpc_is_binary_header(
- (const char *)GRPC_SLICE_START_PTR(elem->key->slice),
- GRPC_SLICE_LENGTH(elem->key->slice))) {
+static grpc_slice get_wire_value(grpc_mdelem elem, uint8_t *huffman_prefix) {
+ if (grpc_is_binary_header(GRPC_MDKEY(elem))) {
*huffman_prefix = 0x80;
- return grpc_mdstr_as_base64_encoded_and_huffman_compressed(elem->value);
+ return grpc_chttp2_base64_encode_and_huffman_compress(GRPC_MDVALUE(elem));
}
/* TODO(ctiller): opportunistically compress non-binary headers */
*huffman_prefix = 0x00;
- return elem->value->slice;
+ return grpc_slice_ref_internal(GRPC_MDVALUE(elem));
}
static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c,
- uint32_t key_index, grpc_mdelem *elem,
+ uint32_t key_index, grpc_mdelem elem,
framer_state *st) {
uint32_t len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 2);
uint8_t huffman_prefix;
@@ -296,11 +313,11 @@ static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c,
add_tiny_header_data(st, len_pfx), len_pfx);
GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
- add_header_data(st, grpc_slice_ref_internal(value_slice));
+ add_header_data(st, value_slice);
}
static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c,
- uint32_t key_index, grpc_mdelem *elem,
+ uint32_t key_index, grpc_mdelem elem,
framer_state *st) {
uint32_t len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 4);
uint8_t huffman_prefix;
@@ -313,12 +330,12 @@ static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c,
add_tiny_header_data(st, len_pfx), len_pfx);
GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
- add_header_data(st, grpc_slice_ref_internal(value_slice));
+ add_header_data(st, value_slice);
}
static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c,
- grpc_mdelem *elem, framer_state *st) {
- uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(elem->key->slice);
+ grpc_mdelem elem, framer_state *st) {
+ uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(GRPC_MDKEY(elem));
uint8_t huffman_prefix;
grpc_slice value_slice = get_wire_value(elem, &huffman_prefix);
uint32_t len_val = (uint32_t)GRPC_SLICE_LENGTH(value_slice);
@@ -329,15 +346,15 @@ static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c,
*add_tiny_header_data(st, 1) = 0x40;
GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00,
add_tiny_header_data(st, len_key_len), len_key_len);
- add_header_data(st, grpc_slice_ref_internal(elem->key->slice));
+ add_header_data(st, grpc_slice_ref_internal(GRPC_MDKEY(elem)));
GRPC_CHTTP2_WRITE_VARINT(len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
- add_header_data(st, grpc_slice_ref_internal(value_slice));
+ add_header_data(st, value_slice);
}
static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c,
- grpc_mdelem *elem, framer_state *st) {
- uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(elem->key->slice);
+ grpc_mdelem elem, framer_state *st) {
+ uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(GRPC_MDKEY(elem));
uint8_t huffman_prefix;
grpc_slice value_slice = get_wire_value(elem, &huffman_prefix);
uint32_t len_val = (uint32_t)GRPC_SLICE_LENGTH(value_slice);
@@ -348,10 +365,10 @@ static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c,
*add_tiny_header_data(st, 1) = 0x00;
GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00,
add_tiny_header_data(st, len_key_len), len_key_len);
- add_header_data(st, grpc_slice_ref_internal(elem->key->slice));
+ add_header_data(st, grpc_slice_ref_internal(GRPC_MDKEY(elem)));
GRPC_CHTTP2_WRITE_VARINT(len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
- add_header_data(st, grpc_slice_ref_internal(value_slice));
+ add_header_data(st, value_slice);
}
static void emit_advertise_table_size_change(grpc_chttp2_hpack_compressor *c,
@@ -369,15 +386,9 @@ static uint32_t dynidx(grpc_chttp2_hpack_compressor *c, uint32_t elem_index) {
/* encode an mdelem */
static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
- grpc_mdelem *elem, framer_state *st) {
- uint32_t key_hash = elem->key->hash;
- uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
- size_t decoder_space_usage;
- uint32_t indices_key;
- int should_add_elem;
-
- GPR_ASSERT(GRPC_SLICE_LENGTH(elem->key->slice) > 0);
- if (GRPC_SLICE_START_PTR(elem->key->slice)[0] != ':') { /* regular header */
+ grpc_mdelem elem, framer_state *st) {
+ GPR_ASSERT(GRPC_SLICE_LENGTH(GRPC_MDKEY(elem)) > 0);
+ if (GRPC_SLICE_START_PTR(GRPC_MDKEY(elem))[0] != ':') { /* regular header */
st->seen_regular_header = 1;
} else {
GPR_ASSERT(
@@ -385,11 +396,39 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
"Reserved header (colon-prefixed) happening after regular ones.");
}
+ if (grpc_http_trace && !GRPC_MDELEM_IS_INTERNED(elem)) {
+ char *k = grpc_slice_to_c_string(GRPC_MDKEY(elem));
+ char *v = grpc_slice_to_c_string(GRPC_MDVALUE(elem));
+ gpr_log(
+ GPR_DEBUG,
+ "Encode: '%s: %s', elem_interned=%d [%d], k_interned=%d, v_interned=%d",
+ k, v, GRPC_MDELEM_IS_INTERNED(elem), GRPC_MDELEM_STORAGE(elem),
+ grpc_slice_is_interned(GRPC_MDKEY(elem)),
+ grpc_slice_is_interned(GRPC_MDVALUE(elem)));
+ gpr_free(k);
+ gpr_free(v);
+ }
+ if (!GRPC_MDELEM_IS_INTERNED(elem)) {
+ emit_lithdr_noidx_v(c, elem, st);
+ return;
+ }
+
+ uint32_t key_hash;
+ uint32_t value_hash;
+ uint32_t elem_hash;
+ size_t decoder_space_usage;
+ uint32_t indices_key;
+ int should_add_elem;
+
+ key_hash = grpc_slice_hash(GRPC_MDKEY(elem));
+ value_hash = grpc_slice_hash(GRPC_MDVALUE(elem));
+ elem_hash = GRPC_MDSTR_KV_HASH(key_hash, value_hash);
+
inc_filter(HASH_FRAGMENT_1(elem_hash), &c->filter_elems_sum, c->filter_elems);
/* is this elem currently in the decoders table? */
- if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == elem &&
+ if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_2(elem_hash)], elem) &&
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] > c->tail_remote_index) {
/* HIT: complete element (first cuckoo hash) */
emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]),
@@ -397,7 +436,7 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
return;
}
- if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem &&
+ if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_3(elem_hash)], elem) &&
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] > c->tail_remote_index) {
/* HIT: complete element (second cuckoo hash) */
emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]),
@@ -414,7 +453,8 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
/* no hits for the elem... maybe there's a key? */
indices_key = c->indices_keys[HASH_FRAGMENT_2(key_hash)];
- if (c->entries_keys[HASH_FRAGMENT_2(key_hash)] == elem->key &&
+ if (grpc_slice_eq(c->entries_keys[HASH_FRAGMENT_2(key_hash)],
+ GRPC_MDKEY(elem)) &&
indices_key > c->tail_remote_index) {
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
@@ -429,7 +469,8 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
}
indices_key = c->indices_keys[HASH_FRAGMENT_3(key_hash)];
- if (c->entries_keys[HASH_FRAGMENT_3(key_hash)] == elem->key &&
+ if (grpc_slice_eq(c->entries_keys[HASH_FRAGMENT_3(key_hash)],
+ GRPC_MDKEY(elem)) &&
indices_key > c->tail_remote_index) {
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
@@ -463,11 +504,11 @@ static void deadline_enc(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
framer_state *st) {
char timeout_str[GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
- grpc_mdelem *mdelem;
+ grpc_mdelem mdelem;
grpc_http2_encode_timeout(
gpr_time_sub(deadline, gpr_now(deadline.clock_type)), timeout_str);
- mdelem = grpc_mdelem_from_metadata_strings(
- exec_ctx, GRPC_MDSTR_GRPC_TIMEOUT, grpc_mdstr_from_string(timeout_str));
+ mdelem = grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_TIMEOUT,
+ grpc_slice_from_copied_string(timeout_str));
hpack_enc(exec_ctx, c, mdelem, st);
GRPC_MDELEM_UNREF(exec_ctx, mdelem);
}
@@ -484,14 +525,19 @@ void grpc_chttp2_hpack_compressor_init(grpc_chttp2_hpack_compressor *c) {
gpr_malloc(sizeof(*c->table_elem_size) * c->cap_table_elems);
memset(c->table_elem_size, 0,
sizeof(*c->table_elem_size) * c->cap_table_elems);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(c->entries_keys); i++) {
+ c->entries_keys[i] = terminal_slice;
+ }
}
void grpc_chttp2_hpack_compressor_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_compressor *c) {
int i;
for (i = 0; i < GRPC_CHTTP2_HPACKC_NUM_VALUES; i++) {
- if (c->entries_keys[i]) GRPC_MDSTR_UNREF(exec_ctx, c->entries_keys[i]);
- if (c->entries_elems[i]) GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[i]);
+ if (c->entries_keys[i].refcount != &terminal_slice_refcount) {
+ grpc_slice_unref_internal(exec_ctx, c->entries_keys[i]);
+ }
+ GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[i]);
}
gpr_free(c->table_elem_size);
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.h b/src/core/ext/transport/chttp2/transport/hpack_encoder.h
index 3a35496ec8..83ba5b1b3e 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.h
@@ -74,8 +74,8 @@ typedef struct {
/* entry tables for keys & elems: these tables track values that have been
seen and *may* be in the decompressor table */
- grpc_mdstr *entries_keys[GRPC_CHTTP2_HPACKC_NUM_VALUES];
- grpc_mdelem *entries_elems[GRPC_CHTTP2_HPACKC_NUM_VALUES];
+ grpc_slice entries_keys[GRPC_CHTTP2_HPACKC_NUM_VALUES];
+ grpc_mdelem entries_elems[GRPC_CHTTP2_HPACKC_NUM_VALUES];
uint32_t indices_keys[GRPC_CHTTP2_HPACKC_NUM_VALUES];
uint32_t indices_elems[GRPC_CHTTP2_HPACKC_NUM_VALUES];
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index 8b91cc760b..40f5120308 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -50,9 +50,13 @@
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
-#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/string.h"
+#include "src/core/lib/transport/http2_errors.h"
+
+/* TODO(ctiller): remove before submission */
+#include "src/core/lib/slice/slice_string_helpers.h"
extern int grpc_http_trace;
@@ -668,8 +672,22 @@ static const uint8_t inverse_base64[256] = {
/* emission helpers */
static grpc_error *on_hdr(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p,
- grpc_mdelem *md, int add_to_table) {
+ grpc_mdelem md, int add_to_table) {
+ if (grpc_http_trace && !GRPC_MDELEM_IS_INTERNED(md)) {
+ char *k = grpc_slice_to_c_string(GRPC_MDKEY(md));
+ char *v = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ gpr_log(
+ GPR_DEBUG,
+ "Decode: '%s: %s', elem_interned=%d [%d], k_interned=%d, v_interned=%d",
+ k, v, GRPC_MDELEM_IS_INTERNED(md), GRPC_MDELEM_STORAGE(md),
+ grpc_slice_is_interned(GRPC_MDKEY(md)),
+ grpc_slice_is_interned(GRPC_MDVALUE(md)));
+ gpr_free(k);
+ gpr_free(v);
+ }
if (add_to_table) {
+ GPR_ASSERT(GRPC_MDELEM_STORAGE(md) == GRPC_MDELEM_STORAGE_INTERNED ||
+ GRPC_MDELEM_STORAGE(md) == GRPC_MDELEM_STORAGE_STATIC);
grpc_error *err = grpc_chttp2_hptbl_add(exec_ctx, &p->table, md);
if (err != GRPC_ERROR_NONE) return err;
}
@@ -681,10 +699,28 @@ static grpc_error *on_hdr(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p,
return GRPC_ERROR_NONE;
}
-static grpc_mdstr *take_string(grpc_chttp2_hpack_parser *p,
- grpc_chttp2_hpack_parser_string *str) {
- grpc_mdstr *s = grpc_mdstr_from_buffer((uint8_t *)str->str, str->length);
- str->length = 0;
+static grpc_slice take_string(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_parser *p,
+ grpc_chttp2_hpack_parser_string *str,
+ bool intern) {
+ grpc_slice s;
+ if (!str->copied) {
+ if (intern) {
+ s = grpc_slice_intern(str->data.referenced);
+ grpc_slice_unref_internal(exec_ctx, str->data.referenced);
+ } else {
+ s = str->data.referenced;
+ }
+ str->copied = true;
+ str->data.referenced = grpc_empty_slice();
+ } else if (intern) {
+ s = grpc_slice_intern(grpc_slice_from_static_buffer(
+ str->data.copied.str, str->data.copied.length));
+ } else {
+ s = grpc_slice_from_copied_buffer(str->data.copied.str,
+ str->data.copied.length);
+ }
+ str->data.copied.length = 0;
return s;
}
@@ -771,8 +807,8 @@ static grpc_error *finish_indexed_field(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
const uint8_t *cur,
const uint8_t *end) {
- grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
- if (md == NULL) {
+ grpc_mdelem md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
+ if (GRPC_MDISNULL(md)) {
return grpc_error_set_int(
grpc_error_set_int(GRPC_ERROR_CREATE("Invalid HPACK index received"),
GRPC_ERROR_INT_INDEX, (intptr_t)p->index),
@@ -813,12 +849,13 @@ static grpc_error *finish_lithdr_incidx(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
const uint8_t *cur,
const uint8_t *end) {
- grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
- GPR_ASSERT(md != NULL); /* handled in string parsing */
- grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
- exec_ctx, GRPC_MDSTR_REF(md->key),
- take_string(p, &p->value)),
- 1);
+ grpc_mdelem md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
+ GPR_ASSERT(!GRPC_MDISNULL(md)); /* handled in string parsing */
+ grpc_error *err = on_hdr(
+ exec_ctx, p,
+ grpc_mdelem_from_slices(exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(md)),
+ take_string(exec_ctx, p, &p->value, true)),
+ 1);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -828,10 +865,11 @@ static grpc_error *finish_lithdr_incidx_v(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
const uint8_t *cur,
const uint8_t *end) {
- grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
- exec_ctx, take_string(p, &p->key),
- take_string(p, &p->value)),
- 1);
+ grpc_error *err = on_hdr(
+ exec_ctx, p,
+ grpc_mdelem_from_slices(exec_ctx, take_string(exec_ctx, p, &p->key, true),
+ take_string(exec_ctx, p, &p->value, true)),
+ 1);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -881,12 +919,13 @@ static grpc_error *finish_lithdr_notidx(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
const uint8_t *cur,
const uint8_t *end) {
- grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
- GPR_ASSERT(md != NULL); /* handled in string parsing */
- grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
- exec_ctx, GRPC_MDSTR_REF(md->key),
- take_string(p, &p->value)),
- 0);
+ grpc_mdelem md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
+ GPR_ASSERT(!GRPC_MDISNULL(md)); /* handled in string parsing */
+ grpc_error *err = on_hdr(
+ exec_ctx, p,
+ grpc_mdelem_from_slices(exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(md)),
+ take_string(exec_ctx, p, &p->value, false)),
+ 0);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -896,10 +935,11 @@ static grpc_error *finish_lithdr_notidx_v(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
const uint8_t *cur,
const uint8_t *end) {
- grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
- exec_ctx, take_string(p, &p->key),
- take_string(p, &p->value)),
- 0);
+ grpc_error *err = on_hdr(
+ exec_ctx, p,
+ grpc_mdelem_from_slices(exec_ctx, take_string(exec_ctx, p, &p->key, true),
+ take_string(exec_ctx, p, &p->value, false)),
+ 0);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -949,12 +989,13 @@ static grpc_error *finish_lithdr_nvridx(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
const uint8_t *cur,
const uint8_t *end) {
- grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
- GPR_ASSERT(md != NULL); /* handled in string parsing */
- grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
- exec_ctx, GRPC_MDSTR_REF(md->key),
- take_string(p, &p->value)),
- 0);
+ grpc_mdelem md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
+ GPR_ASSERT(!GRPC_MDISNULL(md)); /* handled in string parsing */
+ grpc_error *err = on_hdr(
+ exec_ctx, p,
+ grpc_mdelem_from_slices(exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(md)),
+ take_string(exec_ctx, p, &p->value, false)),
+ 0);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -964,10 +1005,11 @@ static grpc_error *finish_lithdr_nvridx_v(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
const uint8_t *cur,
const uint8_t *end) {
- grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
- exec_ctx, take_string(p, &p->key),
- take_string(p, &p->value)),
- 0);
+ grpc_error *err = on_hdr(
+ exec_ctx, p,
+ grpc_mdelem_from_slices(exec_ctx, take_string(exec_ctx, p, &p->key, true),
+ take_string(exec_ctx, p, &p->value, false)),
+ 0);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -1261,14 +1303,15 @@ static grpc_error *parse_string_prefix(grpc_exec_ctx *exec_ctx,
static void append_bytes(grpc_chttp2_hpack_parser_string *str,
const uint8_t *data, size_t length) {
if (length == 0) return;
- if (length + str->length > str->capacity) {
- GPR_ASSERT(str->length + length <= UINT32_MAX);
- str->capacity = (uint32_t)(str->length + length);
- str->str = gpr_realloc(str->str, str->capacity);
+ if (length + str->data.copied.length > str->data.copied.capacity) {
+ GPR_ASSERT(str->data.copied.length + length <= UINT32_MAX);
+ str->data.copied.capacity = (uint32_t)(str->data.copied.length + length);
+ str->data.copied.str =
+ gpr_realloc(str->data.copied.str, str->data.copied.capacity);
}
- memcpy(str->str + str->length, data, length);
- GPR_ASSERT(length <= UINT32_MAX - str->length);
- str->length += (uint32_t)length;
+ memcpy(str->data.copied.str + str->data.copied.length, data, length);
+ GPR_ASSERT(length <= UINT32_MAX - str->data.copied.length);
+ str->data.copied.length += (uint32_t)length;
}
static grpc_error *append_string(grpc_exec_ctx *exec_ctx,
@@ -1351,11 +1394,9 @@ static grpc_error *append_string(grpc_exec_ctx *exec_ctx,
exec_ctx, p, cur, end, GRPC_ERROR_CREATE("Should never reach here")));
}
-/* append a null terminator to a string */
static grpc_error *finish_str(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p, const uint8_t *cur,
const uint8_t *end) {
- uint8_t terminator = 0;
uint8_t decoded[2];
uint32_t bits;
grpc_chttp2_hpack_parser_string *str = p->parsing.str;
@@ -1396,8 +1437,6 @@ static grpc_error *finish_str(grpc_exec_ctx *exec_ctx,
append_bytes(str, decoded, 2);
break;
}
- append_bytes(str, &terminator, 1);
- p->parsing.str->length--; /* don't actually count the null terminator */
return GRPC_ERROR_NONE;
}
@@ -1472,8 +1511,18 @@ static grpc_error *begin_parse_string(grpc_exec_ctx *exec_ctx,
const uint8_t *cur, const uint8_t *end,
uint8_t binary,
grpc_chttp2_hpack_parser_string *str) {
+ if (!p->huff && binary == NOT_BINARY && (end - cur) >= (intptr_t)p->strlen &&
+ p->current_slice_refcount != NULL) {
+ str->copied = false;
+ str->data.referenced.refcount = p->current_slice_refcount;
+ str->data.referenced.data.refcounted.bytes = (uint8_t *)cur;
+ str->data.referenced.data.refcounted.length = p->strlen;
+ grpc_slice_ref_internal(str->data.referenced);
+ return parse_next(exec_ctx, p, cur + p->strlen, end);
+ }
p->strgot = 0;
- str->length = 0;
+ str->copied = true;
+ str->data.copied.length = 0;
p->parsing.str = str;
p->huff_state = 0;
p->binary = binary;
@@ -1490,21 +1539,22 @@ static grpc_error *parse_key_string(grpc_exec_ctx *exec_ctx,
/* check if a key represents a binary header or not */
static bool is_binary_literal_header(grpc_chttp2_hpack_parser *p) {
- return grpc_is_binary_header(p->key.str, p->key.length);
+ return grpc_is_binary_header(
+ p->key.copied ? grpc_slice_from_static_buffer(p->key.data.copied.str,
+ p->key.data.copied.length)
+ : p->key.data.referenced);
}
static grpc_error *is_binary_indexed_header(grpc_chttp2_hpack_parser *p,
bool *is) {
- grpc_mdelem *elem = grpc_chttp2_hptbl_lookup(&p->table, p->index);
- if (!elem) {
+ grpc_mdelem elem = grpc_chttp2_hptbl_lookup(&p->table, p->index);
+ if (GRPC_MDISNULL(elem)) {
return grpc_error_set_int(
grpc_error_set_int(GRPC_ERROR_CREATE("Invalid HPACK index received"),
GRPC_ERROR_INT_INDEX, (intptr_t)p->index),
GRPC_ERROR_INT_SIZE, (intptr_t)p->table.num_ents);
}
- *is = grpc_is_binary_header(
- (const char *)GRPC_SLICE_START_PTR(elem->key->slice),
- GRPC_SLICE_LENGTH(elem->key->slice));
+ *is = grpc_is_binary_header(GRPC_MDKEY(elem));
return GRPC_ERROR_NONE;
}
@@ -1539,12 +1589,14 @@ void grpc_chttp2_hpack_parser_init(grpc_exec_ctx *exec_ctx,
p->on_header = NULL;
p->on_header_user_data = NULL;
p->state = parse_begin;
- p->key.str = NULL;
- p->key.capacity = 0;
- p->key.length = 0;
- p->value.str = NULL;
- p->value.capacity = 0;
- p->value.length = 0;
+ p->key.data.referenced = grpc_empty_slice();
+ p->key.data.copied.str = NULL;
+ p->key.data.copied.capacity = 0;
+ p->key.data.copied.length = 0;
+ p->value.data.referenced = grpc_empty_slice();
+ p->value.data.copied.str = NULL;
+ p->value.data.copied.capacity = 0;
+ p->value.data.copied.length = 0;
p->dynamic_table_update_allowed = 2;
p->last_error = GRPC_ERROR_NONE;
grpc_chttp2_hptbl_init(exec_ctx, &p->table);
@@ -1559,19 +1611,24 @@ void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p) {
grpc_chttp2_hptbl_destroy(exec_ctx, &p->table);
GRPC_ERROR_UNREF(p->last_error);
- gpr_free(p->key.str);
- gpr_free(p->value.str);
+ grpc_slice_unref_internal(exec_ctx, p->key.data.referenced);
+ grpc_slice_unref_internal(exec_ctx, p->value.data.referenced);
+ gpr_free(p->key.data.copied.str);
+ gpr_free(p->value.data.copied.str);
}
grpc_error *grpc_chttp2_hpack_parser_parse(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
- const uint8_t *beg,
- const uint8_t *end) {
+ grpc_slice slice) {
/* TODO(ctiller): limit the distance of end from beg, and perform multiple
steps in the event of a large chunk of data to limit
stack space usage when no tail call optimization is
available */
- return p->state(exec_ctx, p, beg, end);
+ p->current_slice_refcount = slice.refcount;
+ grpc_error *error = p->state(exec_ctx, p, GRPC_SLICE_START_PTR(slice),
+ GRPC_SLICE_END_PTR(slice));
+ p->current_slice_refcount = NULL;
+ return error;
}
typedef void (*maybe_complete_func_type)(grpc_exec_ctx *exec_ctx,
@@ -1587,7 +1644,7 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp,
grpc_chttp2_transport *t = s->t;
if (!s->write_closed) {
grpc_slice_buffer_add(
- &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR,
+ &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing));
grpc_chttp2_initiate_write(exec_ctx, t, false, "force_rst_stream");
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, GRPC_ERROR_NONE);
@@ -1605,8 +1662,7 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx,
if (s != NULL) {
s->stats.incoming.header_bytes += GRPC_SLICE_LENGTH(slice);
}
- grpc_error *error = grpc_chttp2_hpack_parser_parse(
- exec_ctx, parser, GRPC_SLICE_START_PTR(slice), GRPC_SLICE_END_PTR(slice));
+ grpc_error *error = grpc_chttp2_hpack_parser_parse(exec_ctx, parser, slice);
if (error != GRPC_ERROR_NONE) {
GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0);
return error;
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.h b/src/core/ext/transport/chttp2/transport/hpack_parser.h
index 52ccf1e7a7..a817183eb5 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.h
@@ -49,14 +49,20 @@ typedef grpc_error *(*grpc_chttp2_hpack_parser_state)(
const uint8_t *end);
typedef struct {
- char *str;
- uint32_t length;
- uint32_t capacity;
+ bool copied;
+ struct {
+ grpc_slice referenced;
+ struct {
+ char *str;
+ uint32_t length;
+ uint32_t capacity;
+ } copied;
+ } data;
} grpc_chttp2_hpack_parser_string;
struct grpc_chttp2_hpack_parser {
/* user specified callback for each header output */
- void (*on_header)(grpc_exec_ctx *exec_ctx, void *user_data, grpc_mdelem *md);
+ void (*on_header)(grpc_exec_ctx *exec_ctx, void *user_data, grpc_mdelem md);
void *on_header_user_data;
grpc_error *last_error;
@@ -67,6 +73,8 @@ struct grpc_chttp2_hpack_parser {
const grpc_chttp2_hpack_parser_state *next_state;
/* what to do after skipping prioritization data */
grpc_chttp2_hpack_parser_state after_prioritization;
+ /* the refcount of the slice that we're currently parsing */
+ grpc_slice_refcount *current_slice_refcount;
/* the value we're currently parsing */
union {
uint32_t *value;
@@ -106,11 +114,9 @@ void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p);
-/* returns 1 on success, 0 on error */
grpc_error *grpc_chttp2_hpack_parser_parse(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
- const uint8_t *beg,
- const uint8_t *end);
+ grpc_slice slice);
/* wraps grpc_chttp2_hpack_parser_parse to provide a frame level parser for
the transport */
diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.c b/src/core/ext/transport/chttp2/transport/hpack_table.c
index 26d4036d49..62dd1b8cab 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_table.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_table.c
@@ -190,8 +190,11 @@ void grpc_chttp2_hptbl_init(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl) {
tbl->ents = gpr_malloc(sizeof(*tbl->ents) * tbl->cap_entries);
memset(tbl->ents, 0, sizeof(*tbl->ents) * tbl->cap_entries);
for (i = 1; i <= GRPC_CHTTP2_LAST_STATIC_ENTRY; i++) {
- tbl->static_ents[i - 1] = grpc_mdelem_from_strings(
- exec_ctx, static_table[i].key, static_table[i].value);
+ tbl->static_ents[i - 1] = grpc_mdelem_from_slices(
+ exec_ctx,
+ grpc_slice_intern(grpc_slice_from_static_string(static_table[i].key)),
+ grpc_slice_intern(
+ grpc_slice_from_static_string(static_table[i].value)));
}
}
@@ -208,8 +211,8 @@ void grpc_chttp2_hptbl_destroy(grpc_exec_ctx *exec_ctx,
gpr_free(tbl->ents);
}
-grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl,
- uint32_t tbl_index) {
+grpc_mdelem grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl,
+ uint32_t tbl_index) {
/* Static table comes first, just return an entry from it */
if (tbl_index <= GRPC_CHTTP2_LAST_STATIC_ENTRY) {
return tbl->static_ents[tbl_index - 1];
@@ -222,14 +225,14 @@ grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl,
return tbl->ents[offset];
}
/* Invalid entry: return error */
- return NULL;
+ return GRPC_MDNULL;
}
/* Evict one element from the table */
static void evict1(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl) {
- grpc_mdelem *first_ent = tbl->ents[tbl->first_ent];
- size_t elem_bytes = GRPC_SLICE_LENGTH(first_ent->key->slice) +
- GRPC_SLICE_LENGTH(first_ent->value->slice) +
+ grpc_mdelem first_ent = tbl->ents[tbl->first_ent];
+ size_t elem_bytes = GRPC_SLICE_LENGTH(GRPC_MDKEY(first_ent)) +
+ GRPC_SLICE_LENGTH(GRPC_MDVALUE(first_ent)) +
GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD;
GPR_ASSERT(elem_bytes <= tbl->mem_used);
tbl->mem_used -= (uint32_t)elem_bytes;
@@ -239,7 +242,7 @@ static void evict1(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl) {
}
static void rebuild_ents(grpc_chttp2_hptbl *tbl, uint32_t new_cap) {
- grpc_mdelem **ents = gpr_malloc(sizeof(*ents) * new_cap);
+ grpc_mdelem *ents = gpr_malloc(sizeof(*ents) * new_cap);
uint32_t i;
for (i = 0; i < tbl->num_ents; i++) {
@@ -301,10 +304,10 @@ grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_exec_ctx *exec_ctx,
}
grpc_error *grpc_chttp2_hptbl_add(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_hptbl *tbl, grpc_mdelem *md) {
+ grpc_chttp2_hptbl *tbl, grpc_mdelem md) {
/* determine how many bytes of buffer this entry represents */
- size_t elem_bytes = GRPC_SLICE_LENGTH(md->key->slice) +
- GRPC_SLICE_LENGTH(md->value->slice) +
+ size_t elem_bytes = GRPC_SLICE_LENGTH(GRPC_MDKEY(md)) +
+ GRPC_SLICE_LENGTH(GRPC_MDVALUE(md)) +
GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD;
if (tbl->current_table_bytes > tbl->max_bytes) {
@@ -352,16 +355,16 @@ grpc_error *grpc_chttp2_hptbl_add(grpc_exec_ctx *exec_ctx,
}
grpc_chttp2_hptbl_find_result grpc_chttp2_hptbl_find(
- const grpc_chttp2_hptbl *tbl, grpc_mdelem *md) {
+ const grpc_chttp2_hptbl *tbl, grpc_mdelem md) {
grpc_chttp2_hptbl_find_result r = {0, 0};
uint32_t i;
/* See if the string is in the static table */
for (i = 0; i < GRPC_CHTTP2_LAST_STATIC_ENTRY; i++) {
- grpc_mdelem *ent = tbl->static_ents[i];
- if (md->key != ent->key) continue;
+ grpc_mdelem ent = tbl->static_ents[i];
+ if (!grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDKEY(ent))) continue;
r.index = i + 1u;
- r.has_value = md->value == ent->value;
+ r.has_value = grpc_slice_eq(GRPC_MDVALUE(md), GRPC_MDVALUE(ent));
if (r.has_value) return r;
}
@@ -369,10 +372,10 @@ grpc_chttp2_hptbl_find_result grpc_chttp2_hptbl_find(
for (i = 0; i < tbl->num_ents; i++) {
uint32_t idx =
(uint32_t)(tbl->num_ents - i + GRPC_CHTTP2_LAST_STATIC_ENTRY);
- grpc_mdelem *ent = tbl->ents[(tbl->first_ent + i) % tbl->cap_entries];
- if (md->key != ent->key) continue;
+ grpc_mdelem ent = tbl->ents[(tbl->first_ent + i) % tbl->cap_entries];
+ if (!grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDKEY(ent))) continue;
r.index = idx;
- r.has_value = md->value == ent->value;
+ r.has_value = grpc_slice_eq(GRPC_MDVALUE(md), GRPC_MDVALUE(ent));
if (r.has_value) return r;
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.h b/src/core/ext/transport/chttp2/transport/hpack_table.h
index 144574ef06..32a0380e00 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_table.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_table.h
@@ -79,8 +79,8 @@ typedef struct {
/* a circular buffer of headers - this is stored in the opposite order to
what hpack specifies, in order to simplify table management a little...
meaning lookups need to SUBTRACT from the end position */
- grpc_mdelem **ents;
- grpc_mdelem *static_ents[GRPC_CHTTP2_LAST_STATIC_ENTRY];
+ grpc_mdelem *ents;
+ grpc_mdelem static_ents[GRPC_CHTTP2_LAST_STATIC_ENTRY];
} grpc_chttp2_hptbl;
/* initialize a hpack table */
@@ -94,12 +94,12 @@ grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_exec_ctx *exec_ctx,
uint32_t bytes);
/* lookup a table entry based on its hpack index */
-grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl,
- uint32_t index);
+grpc_mdelem grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl,
+ uint32_t index);
/* add a table entry to the index */
grpc_error *grpc_chttp2_hptbl_add(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hptbl *tbl,
- grpc_mdelem *md) GRPC_MUST_USE_RESULT;
+ grpc_mdelem md) GRPC_MUST_USE_RESULT;
/* Find a key/value pair in the table... returns the index in the table of the
most similar entry, or 0 if the value was not found */
typedef struct {
@@ -107,6 +107,6 @@ typedef struct {
int has_value;
} grpc_chttp2_hptbl_find_result;
grpc_chttp2_hptbl_find_result grpc_chttp2_hptbl_find(
- const grpc_chttp2_hptbl *tbl, grpc_mdelem *md);
+ const grpc_chttp2_hptbl *tbl, grpc_mdelem md);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HPACK_TABLE_H */
diff --git a/src/core/ext/transport/chttp2/transport/http2_errors.h b/src/core/ext/transport/chttp2/transport/http2_errors.h
deleted file mode 100644
index deab2b7e3e..0000000000
--- a/src/core/ext/transport/chttp2/transport/http2_errors.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HTTP2_ERRORS_H
-#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HTTP2_ERRORS_H
-
-/* error codes for RST_STREAM from http2 draft 14 section 7 */
-typedef enum {
- GRPC_CHTTP2_NO_ERROR = 0x0,
- GRPC_CHTTP2_PROTOCOL_ERROR = 0x1,
- GRPC_CHTTP2_INTERNAL_ERROR = 0x2,
- GRPC_CHTTP2_FLOW_CONTROL_ERROR = 0x3,
- GRPC_CHTTP2_SETTINGS_TIMEOUT = 0x4,
- GRPC_CHTTP2_STREAM_CLOSED = 0x5,
- GRPC_CHTTP2_FRAME_SIZE_ERROR = 0x6,
- GRPC_CHTTP2_REFUSED_STREAM = 0x7,
- GRPC_CHTTP2_CANCEL = 0x8,
- GRPC_CHTTP2_COMPRESSION_ERROR = 0x9,
- GRPC_CHTTP2_CONNECT_ERROR = 0xa,
- GRPC_CHTTP2_ENHANCE_YOUR_CALM = 0xb,
- GRPC_CHTTP2_INADEQUATE_SECURITY = 0xc,
- /* force use of a default clause */
- GRPC_CHTTP2__ERROR_DO_NOT_USE = -1
-} grpc_chttp2_error_code;
-
-#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_HTTP2_ERRORS_H */
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
index 5d1094999c..c91b019aa0 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
@@ -57,7 +57,7 @@ void grpc_chttp2_incoming_metadata_buffer_destroy(
}
void grpc_chttp2_incoming_metadata_buffer_add(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem) {
+ grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) {
GPR_ASSERT(!buffer->published);
if (buffer->capacity == buffer->count) {
buffer->capacity = GPR_MAX(8, 2 * buffer->capacity);
@@ -68,6 +68,19 @@ void grpc_chttp2_incoming_metadata_buffer_add(
buffer->size += GRPC_MDELEM_LENGTH(elem);
}
+void grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
+ grpc_mdelem elem) {
+ for (size_t i = 0; i < buffer->count; i++) {
+ if (grpc_slice_eq(GRPC_MDKEY(buffer->elems[i].md), GRPC_MDKEY(elem))) {
+ GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
+ buffer->elems[i].md = elem;
+ return;
+ }
+ }
+ grpc_chttp2_incoming_metadata_buffer_add(buffer, elem);
+}
+
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) {
GPR_ASSERT(!buffer->published);
@@ -75,21 +88,20 @@ void grpc_chttp2_incoming_metadata_buffer_set_deadline(
}
void grpc_chttp2_incoming_metadata_buffer_publish(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
+ grpc_metadata_batch *batch) {
GPR_ASSERT(!buffer->published);
buffer->published = 1;
if (buffer->count > 0) {
size_t i;
- for (i = 1; i < buffer->count; i++) {
- buffer->elems[i].prev = &buffer->elems[i - 1];
- }
- for (i = 0; i < buffer->count - 1; i++) {
- buffer->elems[i].next = &buffer->elems[i + 1];
+ for (i = 0; i < buffer->count; i++) {
+ /* TODO(ctiller): do something better here */
+ if (!GRPC_LOG_IF_ERROR("grpc_chttp2_incoming_metadata_buffer_publish",
+ grpc_metadata_batch_link_tail(
+ exec_ctx, batch, &buffer->elems[i]))) {
+ GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
+ }
}
- buffer->elems[0].prev = NULL;
- buffer->elems[buffer->count - 1].next = NULL;
- batch->list.head = &buffer->elems[0];
- batch->list.tail = &buffer->elems[buffer->count - 1];
} else {
batch->list.head = batch->list.tail = NULL;
}
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
index 7a0c4da15f..1eac6fc150 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
@@ -51,10 +51,14 @@ void grpc_chttp2_incoming_metadata_buffer_init(
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_publish(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
+ grpc_metadata_batch *batch);
void grpc_chttp2_incoming_metadata_buffer_add(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem);
+ grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem);
+void grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
+ grpc_mdelem elem);
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index ea7beb4c2b..1dabf9edba 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -50,7 +50,9 @@
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/pid_controller.h"
#include "src/core/lib/transport/transport_impl.h"
/* streams are kept in various linked lists depending on what things need to
@@ -59,6 +61,7 @@ typedef enum {
GRPC_CHTTP2_LIST_WRITABLE,
GRPC_CHTTP2_LIST_WRITING,
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
+ GRPC_CHTTP2_LIST_STALLED_BY_STREAM,
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
@@ -72,6 +75,34 @@ typedef enum {
GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER,
} grpc_chttp2_write_state;
+typedef enum {
+ GRPC_CHTTP2_PING_ON_NEXT_WRITE = 0,
+ GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
+ GRPC_CHTTP2_PING_TYPE_COUNT /* must be last */
+} grpc_chttp2_ping_type;
+
+typedef enum {
+ GRPC_CHTTP2_PCL_INITIATE = 0,
+ GRPC_CHTTP2_PCL_NEXT,
+ GRPC_CHTTP2_PCL_INFLIGHT,
+ GRPC_CHTTP2_PCL_COUNT /* must be last */
+} grpc_chttp2_ping_closure_list;
+
+typedef struct {
+ grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT];
+ uint64_t inflight_id;
+} grpc_chttp2_ping_queue;
+
+typedef struct {
+ gpr_timespec min_time_between_pings;
+ int max_pings_without_data;
+} grpc_chttp2_repeated_ping_policy;
+
+typedef struct {
+ gpr_timespec last_ping_sent_time;
+ int pings_before_data_required;
+} grpc_chttp2_repeated_ping_state;
+
/* deframer state for the overall http2 stream of bytes */
typedef enum {
/* prefix: one entry per http2 connection prefix byte */
@@ -144,14 +175,6 @@ typedef enum {
GRPC_CHTTP2_GOAWAY_SENT,
} grpc_chttp2_sent_goaway_state;
-/* Outstanding ping request data */
-typedef struct grpc_chttp2_outstanding_ping {
- uint8_t id[8];
- grpc_closure *on_recv;
- struct grpc_chttp2_outstanding_ping *next;
- struct grpc_chttp2_outstanding_ping *prev;
-} grpc_chttp2_outstanding_ping;
-
typedef struct grpc_chttp2_write_cb {
int64_t call_at_byte;
grpc_closure *closure;
@@ -271,16 +294,19 @@ struct grpc_chttp2_transport {
copied to next_stream_id in parsing when parsing commences */
uint32_t next_stream_id;
- /** how far to lookahead in a stream? */
- uint32_t stream_lookahead;
-
/** last new stream id */
uint32_t last_new_stream_id;
- /** pings awaiting responses */
- grpc_chttp2_outstanding_ping pings;
- /** next payload for an outgoing ping */
- uint64_t ping_counter;
+ /** ping queues for various ping insertion points */
+ grpc_chttp2_ping_queue ping_queues[GRPC_CHTTP2_PING_TYPE_COUNT];
+ grpc_chttp2_repeated_ping_policy ping_policy;
+ grpc_chttp2_repeated_ping_state ping_state;
+ uint64_t ping_ctr; /* unique id for pings */
+
+ /** ping acks */
+ size_t ping_ack_count;
+ size_t ping_ack_capacity;
+ uint64_t *ping_acks;
/** parser for headers */
grpc_chttp2_hpack_parser hpack_parser;
@@ -324,6 +350,13 @@ struct grpc_chttp2_transport {
grpc_chttp2_write_cb *write_cb_pool;
+ /* bdp estimator */
+ grpc_bdp_estimator bdp_estimator;
+ grpc_pid_controller pid_controller;
+ grpc_closure start_bdp_ping_locked;
+ grpc_closure finish_bdp_ping_locked;
+ gpr_timespec last_pid_update;
+
/* if non-NULL, close the transport with this error when writes are finished
*/
grpc_error *close_transport_on_writes_finished;
@@ -362,12 +395,10 @@ struct grpc_chttp2_stream {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint32_t id;
- /** window available for us to send to peer */
- int64_t outgoing_window;
- /** The number of bytes the upper layers have offered to receive.
- As the upper layer offers more bytes, this value increases.
- As bytes are read, this value decreases. */
- uint32_t max_recv_bytes;
+ /** window available for us to send to peer, over or under the initial window
+ * size of the transport... ie:
+ * outgoing_window = outgoing_window_delta + transport.initial_window_size */
+ int64_t outgoing_window_delta;
/** things the upper layers would like to send */
grpc_metadata_batch *send_initial_metadata;
grpc_closure *send_initial_metadata_finished;
@@ -428,8 +459,10 @@ struct grpc_chttp2_stream {
grpc_error *forced_close_error;
/** how many header frames have we received? */
uint8_t header_frames_received;
- /** window available for peer to send to us */
- int64_t incoming_window;
+ /** window available for peer to send to us (as a delta on
+ * transport.initial_window_size)
+ * incoming_window = incoming_window_delta + transport.initial_window_size */
+ int64_t incoming_window_delta;
/** parsing state for data frames */
grpc_chttp2_data_parser data_parser;
/** number of bytes received - reset at end of parse thread execution */
@@ -478,36 +511,43 @@ bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
/** Get a writable stream
returns non-zero if there was a stream available */
-int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
bool grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport *t, grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT;
bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t);
-int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t);
+bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-int grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
+void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s);
+bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
+bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s);
+
grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t,
uint32_t id);
grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx,
@@ -618,8 +658,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
uint32_t stream_id, int64_t val1, int64_t val2);
void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_stream *stream,
- grpc_status_code status, grpc_slice *details);
+ grpc_chttp2_stream *stream, grpc_error *error);
void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, int close_reads,
@@ -673,13 +712,23 @@ void grpc_chttp2_incoming_byte_stream_finished(
grpc_error *error);
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- const uint8_t *opaque_8bytes);
+ uint64_t id);
+
+typedef enum {
+ /* don't initiate a transport write, but piggyback on the next one */
+ GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK,
+ /* initiate a covered write */
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ /* initiate an uncovered write */
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED
+} grpc_chttp2_stream_write_type;
/** add a ref to the stream and add it to the writable list;
ref will be dropped in writing.c */
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, bool covered_by_poller,
+ grpc_chttp2_stream *s,
+ grpc_chttp2_stream_write_type type,
const char *reason);
void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 4fb5dc7bd2..24bd93067b 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -39,10 +39,11 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
-#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/transport/http2_errors.h"
#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/transport/status_conversion.h"
#include "src/core/lib/transport/timeout_encoding.h"
static grpc_error *init_frame_parser(grpc_exec_ctx *exec_ctx,
@@ -200,7 +201,7 @@ grpc_error *grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
return err;
}
if (t->incoming_frame_size == 0) {
- err = parse_frame_slice(exec_ctx, t, gpr_empty_slice(), 1);
+ err = parse_frame_slice(exec_ctx, t, grpc_empty_slice(), 1);
if (err != GRPC_ERROR_NONE) {
return err;
}
@@ -335,7 +336,7 @@ static grpc_error *skip_parser(grpc_exec_ctx *exec_ctx, void *parser,
return GRPC_ERROR_NONE;
}
-static void skip_header(grpc_exec_ctx *exec_ctx, void *tp, grpc_mdelem *md) {
+static void skip_header(grpc_exec_ctx *exec_ctx, void *tp, grpc_mdelem md) {
GRPC_MDELEM_UNREF(exec_ctx, md);
}
@@ -375,25 +376,45 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
return err;
}
+ uint32_t target_incoming_window = GPR_MAX(
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ 1024);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
incoming_frame_size);
+ if (t->incoming_window <= target_incoming_window / 2) {
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control");
+ }
if (s != NULL) {
- if (incoming_frame_size > s->incoming_window) {
+ if (incoming_frame_size >
+ s->incoming_window_delta +
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) {
char *msg;
gpr_asprintf(&msg,
"frame of size %d overflows incoming window of %" PRId64,
- t->incoming_frame_size, s->incoming_window);
+ t->incoming_frame_size,
+ s->incoming_window_delta +
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
grpc_error *err = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
return err;
}
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window,
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window_delta,
incoming_frame_size);
+ if ((int64_t)t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] +
+ (int64_t)s->incoming_window_delta - (int64_t)s->announce_window <=
+ (int64_t)t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] /
+ 2) {
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
+ "window-update-required");
+ }
s->received_bytes += incoming_frame_size;
- s->max_recv_bytes -=
- (uint32_t)GPR_MIN(s->max_recv_bytes, incoming_frame_size);
}
return GRPC_ERROR_NONE;
@@ -432,7 +453,7 @@ error_handler:
}
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(t->incoming_stream_id,
- GRPC_CHTTP2_PROTOCOL_ERROR,
+ GRPC_HTTP2_PROTOCOL_ERROR,
&s->stats.outgoing));
return init_skip_frame_parser(exec_ctx, t, 0);
} else {
@@ -443,7 +464,7 @@ error_handler:
static void free_timeout(void *p) { gpr_free(p); }
static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_mdelem *md) {
+ grpc_mdelem md) {
grpc_chttp2_transport *t = tp;
grpc_chttp2_stream *s = t->incoming_stream;
@@ -451,32 +472,42 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
GPR_ASSERT(s != NULL);
- GRPC_CHTTP2_IF_TRACING(gpr_log(
- GPR_INFO, "HTTP:%d:HDR:%s: %s: %s", s->id, t->is_client ? "CLI" : "SVR",
- grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
+ if (grpc_http_trace) {
+ char *key = grpc_slice_to_c_string(GRPC_MDKEY(md));
+ char *value =
+ grpc_dump_slice(GRPC_MDVALUE(md), GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_INFO, "HTTP:%d:HDR:%s: %s: %s", s->id,
+ t->is_client ? "CLI" : "SVR", key, value);
+ gpr_free(key);
+ gpr_free(value);
+ }
- if (md->key == GRPC_MDSTR_GRPC_STATUS && md != GRPC_MDELEM_GRPC_STATUS_0) {
+ if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_STATUS) &&
+ !grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_0)) {
/* TODO(ctiller): check for a status like " 0" */
s->seen_error = true;
}
- if (md->key == GRPC_MDSTR_GRPC_TIMEOUT) {
+ if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_TIMEOUT)) {
gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
- if (!cached_timeout) {
+ gpr_timespec timeout;
+ if (cached_timeout == NULL) {
/* not already parsed: parse it now, and store the result away */
cached_timeout = gpr_malloc(sizeof(gpr_timespec));
- if (!grpc_http2_decode_timeout(grpc_mdstr_as_c_string(md->value),
- cached_timeout)) {
- gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'",
- grpc_mdstr_as_c_string(md->value));
+ if (!grpc_http2_decode_timeout(GRPC_MDVALUE(md), cached_timeout)) {
+ char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", val);
+ gpr_free(val);
*cached_timeout = gpr_inf_future(GPR_TIMESPAN);
}
- cached_timeout =
- grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
+ timeout = *cached_timeout;
+ grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
+ } else {
+ timeout = *cached_timeout;
}
grpc_chttp2_incoming_metadata_buffer_set_deadline(
&s->metadata_buffer[0],
- gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), *cached_timeout));
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), timeout));
GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
const size_t new_size = s->metadata_buffer[0].size + GRPC_MDELEM_LENGTH(md);
@@ -505,7 +536,7 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
}
static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_mdelem *md) {
+ grpc_mdelem md) {
grpc_chttp2_transport *t = tp;
grpc_chttp2_stream *s = t->incoming_stream;
@@ -513,11 +544,18 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp,
GPR_ASSERT(s != NULL);
- GRPC_CHTTP2_IF_TRACING(gpr_log(
- GPR_INFO, "HTTP:%d:TRL:%s: %s: %s", s->id, t->is_client ? "CLI" : "SVR",
- grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
+ if (grpc_http_trace) {
+ char *key = grpc_slice_to_c_string(GRPC_MDKEY(md));
+ char *value =
+ grpc_dump_slice(GRPC_MDVALUE(md), GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_INFO, "HTTP:%d:TRL:%s: %s: %s", s->id,
+ t->is_client ? "CLI" : "SVR", key, value);
+ gpr_free(key);
+ gpr_free(value);
+ }
- if (md->key == GRPC_MDSTR_GRPC_STATUS && md != GRPC_MDELEM_GRPC_STATUS_0) {
+ if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_STATUS) &&
+ !grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_0)) {
/* TODO(ctiller): check for a status like " 0" */
s->seen_error = true;
}
@@ -733,14 +771,13 @@ static grpc_error *parse_frame_slice(grpc_exec_ctx *exec_ctx,
if (grpc_http_trace) {
const char *msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "%s", msg);
- grpc_error_free_string(msg);
}
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
if (s) {
s->forced_close_error = err;
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(t->incoming_stream_id,
- GRPC_CHTTP2_PROTOCOL_ERROR,
+ GRPC_HTTP2_PROTOCOL_ERROR,
&s->stats.outgoing));
} else {
GRPC_ERROR_UNREF(err);
diff --git a/src/core/ext/transport/chttp2/transport/status_conversion.c b/src/core/ext/transport/chttp2/transport/status_conversion.c
deleted file mode 100644
index 5dce2f2d0c..0000000000
--- a/src/core/ext/transport/chttp2/transport/status_conversion.c
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
-
-int grpc_chttp2_grpc_status_to_http2_error(grpc_status_code status) {
- switch (status) {
- case GRPC_STATUS_OK:
- return GRPC_CHTTP2_NO_ERROR;
- case GRPC_STATUS_CANCELLED:
- return GRPC_CHTTP2_CANCEL;
- case GRPC_STATUS_DEADLINE_EXCEEDED:
- return GRPC_CHTTP2_CANCEL;
- case GRPC_STATUS_RESOURCE_EXHAUSTED:
- return GRPC_CHTTP2_ENHANCE_YOUR_CALM;
- case GRPC_STATUS_PERMISSION_DENIED:
- return GRPC_CHTTP2_INADEQUATE_SECURITY;
- case GRPC_STATUS_UNAVAILABLE:
- return GRPC_CHTTP2_REFUSED_STREAM;
- default:
- return GRPC_CHTTP2_INTERNAL_ERROR;
- }
-}
-
-grpc_status_code grpc_chttp2_http2_error_to_grpc_status(
- grpc_chttp2_error_code error, gpr_timespec deadline) {
- switch (error) {
- case GRPC_CHTTP2_NO_ERROR:
- /* should never be received */
- return GRPC_STATUS_INTERNAL;
- case GRPC_CHTTP2_CANCEL:
- /* http2 cancel translates to STATUS_CANCELLED iff deadline hasn't been
- * exceeded */
- return gpr_time_cmp(gpr_now(deadline.clock_type), deadline) >= 0
- ? GRPC_STATUS_DEADLINE_EXCEEDED
- : GRPC_STATUS_CANCELLED;
- case GRPC_CHTTP2_ENHANCE_YOUR_CALM:
- return GRPC_STATUS_RESOURCE_EXHAUSTED;
- case GRPC_CHTTP2_INADEQUATE_SECURITY:
- return GRPC_STATUS_PERMISSION_DENIED;
- case GRPC_CHTTP2_REFUSED_STREAM:
- return GRPC_STATUS_UNAVAILABLE;
- default:
- return GRPC_STATUS_INTERNAL;
- }
-}
-
-grpc_status_code grpc_chttp2_http2_status_to_grpc_status(int status) {
- switch (status) {
- /* these HTTP2 status codes are called out explicitly in status.proto */
- case 200:
- return GRPC_STATUS_OK;
- case 400:
- return GRPC_STATUS_INVALID_ARGUMENT;
- case 401:
- return GRPC_STATUS_UNAUTHENTICATED;
- case 403:
- return GRPC_STATUS_PERMISSION_DENIED;
- case 404:
- return GRPC_STATUS_NOT_FOUND;
- case 409:
- return GRPC_STATUS_ABORTED;
- case 412:
- return GRPC_STATUS_FAILED_PRECONDITION;
- case 429:
- return GRPC_STATUS_RESOURCE_EXHAUSTED;
- case 499:
- return GRPC_STATUS_CANCELLED;
- case 500:
- return GRPC_STATUS_UNKNOWN;
- case 501:
- return GRPC_STATUS_UNIMPLEMENTED;
- case 503:
- return GRPC_STATUS_UNAVAILABLE;
- case 504:
- return GRPC_STATUS_DEADLINE_EXCEEDED;
- /* everything else is unknown */
- default:
- return GRPC_STATUS_UNKNOWN;
- }
-}
-
-int grpc_chttp2_grpc_status_to_http2_status(grpc_status_code status) {
- return 200;
-}
diff --git a/src/core/ext/transport/chttp2/transport/status_conversion.h b/src/core/ext/transport/chttp2/transport/status_conversion.h
deleted file mode 100644
index 953bc9f1e1..0000000000
--- a/src/core/ext/transport/chttp2/transport/status_conversion.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_STATUS_CONVERSION_H
-#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_STATUS_CONVERSION_H
-
-#include <grpc/grpc.h>
-#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
-
-/* Conversion of grpc status codes to http2 error codes (for RST_STREAM) */
-grpc_chttp2_error_code grpc_chttp2_grpc_status_to_http2_error(
- grpc_status_code status);
-grpc_status_code grpc_chttp2_http2_error_to_grpc_status(
- grpc_chttp2_error_code error, gpr_timespec deadline);
-
-/* Conversion of HTTP status codes (:status) to grpc status codes */
-grpc_status_code grpc_chttp2_http2_status_to_grpc_status(int status);
-int grpc_chttp2_grpc_status_to_http2_status(grpc_status_code status);
-
-#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_STATUS_CONVERSION_H */
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index a60264cc51..078818fb18 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -37,14 +37,14 @@
/* core list management */
-static int stream_list_empty(grpc_chttp2_transport *t,
- grpc_chttp2_stream_list_id id) {
+static bool stream_list_empty(grpc_chttp2_transport *t,
+ grpc_chttp2_stream_list_id id) {
return t->lists[id].head == NULL;
}
-static int stream_list_pop(grpc_chttp2_transport *t,
- grpc_chttp2_stream **stream,
- grpc_chttp2_stream_list_id id) {
+static bool stream_list_pop(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **stream,
+ grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *s = t->lists[id].head;
if (s) {
grpc_chttp2_stream *new_head = s->links[id].next;
@@ -124,8 +124,8 @@ bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport *t,
return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITABLE);
}
-int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITABLE);
}
@@ -139,12 +139,12 @@ bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport *t,
return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITING);
}
-int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t) {
+bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t) {
return !stream_list_empty(t, GRPC_CHTTP2_LIST_WRITING);
}
-int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITING);
}
@@ -153,8 +153,8 @@ void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t,
stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
}
-int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
}
@@ -168,8 +168,8 @@ void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
-int grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
@@ -177,3 +177,18 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
+
+void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+}
+
+bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
+ return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+}
+
+bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+}
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 84554d327d..05e6f59947 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -37,9 +37,9 @@
#include <grpc/support/log.h>
-#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/transport/http2_errors.h"
static void add_to_write_list(grpc_chttp2_write_cb **list,
grpc_chttp2_write_cb *cb) {
@@ -56,6 +56,75 @@ static void finish_write_cb(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->write_cb_pool = cb;
}
+static void collapse_pings_from_into(grpc_chttp2_transport *t,
+ grpc_chttp2_ping_type ping_type,
+ grpc_chttp2_ping_queue *pq) {
+ for (size_t i = 0; i < GRPC_CHTTP2_PCL_COUNT; i++) {
+ grpc_closure_list_move(&t->ping_queues[ping_type].lists[i], &pq->lists[i]);
+ }
+}
+
+static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_ping_type ping_type) {
+ grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type];
+ if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
+ /* no ping needed: wait */
+ return;
+ }
+ if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
+ /* ping already in-flight: wait */
+ if (grpc_http_trace || grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "Ping delayed [%p]: already pinging", t->peer_string);
+ }
+ return;
+ }
+ if (t->ping_state.pings_before_data_required == 0 &&
+ t->ping_policy.max_pings_without_data != 0) {
+ /* need to send something of substance before sending a ping again */
+ if (grpc_http_trace || grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "Ping delayed [%p]: too many recent pings: %d/%d",
+ t->peer_string, t->ping_state.pings_before_data_required,
+ t->ping_policy.max_pings_without_data);
+ }
+ return;
+ }
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec elapsed = gpr_time_sub(now, t->ping_state.last_ping_sent_time);
+ /*gpr_log(GPR_DEBUG, "elapsed:%d.%09d min:%d.%09d", (int)elapsed.tv_sec,
+ elapsed.tv_nsec, (int)t->ping_policy.min_time_between_pings.tv_sec,
+ (int)t->ping_policy.min_time_between_pings.tv_nsec);*/
+ if (gpr_time_cmp(elapsed, t->ping_policy.min_time_between_pings) < 0) {
+ /* not enough elapsed time between successive pings */
+ if (grpc_http_trace || grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG,
+ "Ping delayed [%p]: not enough time elapsed since last ping",
+ t->peer_string);
+ }
+ return;
+ }
+ /* coalesce equivalent pings into this one */
+ switch (ping_type) {
+ case GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE:
+ collapse_pings_from_into(t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, pq);
+ break;
+ case GRPC_CHTTP2_PING_ON_NEXT_WRITE:
+ break;
+ case GRPC_CHTTP2_PING_TYPE_COUNT:
+ GPR_UNREACHABLE_CODE(break);
+ }
+ pq->inflight_id = t->ping_ctr * GRPC_CHTTP2_PING_TYPE_COUNT + ping_type;
+ t->ping_ctr++;
+ grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
+ grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
+ &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
+ grpc_slice_buffer_add(&t->outbuf,
+ grpc_chttp2_ping_create(false, pq->inflight_id));
+ t->ping_state.last_ping_sent_time = now;
+ t->ping_state.pings_before_data_required -=
+ (t->ping_state.pings_before_data_required != 0);
+}
+
static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, int64_t send_bytes,
grpc_chttp2_write_cb **list, grpc_error *error) {
@@ -139,6 +208,8 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
s->sent_initial_metadata = true;
sent_initial_metadata = true;
now_writing = true;
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
}
/* send any window updates */
if (s->announce_window > 0) {
@@ -146,15 +217,22 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_add(&t->outbuf,
grpc_chttp2_window_update_create(
s->id, s->announce_window, &s->stats.outgoing));
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce);
}
if (sent_initial_metadata) {
/* send any body bytes, if allowed by flow control */
if (s->flow_controlled_buffer.length > 0) {
- uint32_t max_outgoing =
- (uint32_t)GPR_MIN(t->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
- GPR_MIN(s->outgoing_window, t->outgoing_window));
+ uint32_t stream_outgoing_window = (uint32_t)GPR_MAX(
+ 0,
+ s->outgoing_window_delta +
+ (int64_t)t->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
+ uint32_t max_outgoing = (uint32_t)GPR_MIN(
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
+ GPR_MIN(stream_outgoing_window, t->outgoing_window));
if (max_outgoing > 0) {
uint32_t send_bytes =
(uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
@@ -167,16 +245,18 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
is_last_frame, &s->stats.outgoing,
&t->outbuf);
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window,
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
send_bytes);
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
if (is_last_frame) {
s->send_trailing_metadata = NULL;
s->sent_trailing_metadata = true;
if (!t->is_client && !s->read_closed) {
grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create(
- s->id, GRPC_CHTTP2_NO_ERROR,
+ s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing));
}
}
@@ -189,6 +269,9 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
} else if (t->outgoing_window == 0) {
grpc_chttp2_list_add_stalled_by_transport(t, s);
now_writing = true;
+ } else if (stream_outgoing_window == 0) {
+ grpc_chttp2_list_add_stalled_by_stream(t, s);
+ now_writing = true;
}
}
if (s->send_trailing_metadata != NULL &&
@@ -209,7 +292,7 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
if (!t->is_client && !s->read_closed) {
grpc_slice_buffer_add(
&t->outbuf, grpc_chttp2_rst_stream_create(
- s->id, GRPC_CHTTP2_NO_ERROR, &s->stats.outgoing));
+ s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing));
}
now_writing = true;
}
@@ -227,15 +310,32 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
- if (t->announce_incoming_window > 0) {
- uint32_t announced =
- (uint32_t)GPR_MIN(t->announce_incoming_window, UINT32_MAX);
- GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, announce_incoming_window,
- announced);
+ uint32_t target_incoming_window = GPR_MAX(
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ 1024);
+ uint32_t threshold_to_send_transport_window_update =
+ t->outbuf.count > 0 ? 3 * target_incoming_window / 4
+ : target_incoming_window / 2;
+ if (t->incoming_window <= threshold_to_send_transport_window_update) {
+ maybe_initiate_ping(exec_ctx, t,
+ GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE);
+ uint32_t announced = (uint32_t)GPR_CLAMP(
+ target_incoming_window - t->incoming_window, 0, UINT32_MAX);
+ GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("write", t, incoming_window, announced);
grpc_transport_one_way_stats throwaway_stats;
grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_window_update_create(
0, announced, &throwaway_stats));
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
+ }
+
+ for (size_t i = 0; i < t->ping_ack_count; i++) {
+ grpc_slice_buffer_add(&t->outbuf,
+ grpc_chttp2_ping_create(1, t->ping_acks[i]));
}
+ t->ping_ack_count = 0;
+
+ maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE);
GPR_TIMER_END("grpc_chttp2_begin_write", 0);
diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
index 74327a4214..da6c0b4fbc 100644
--- a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
+++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
@@ -38,7 +38,7 @@ library, so we can build it in all environments */
#include <grpc/support/log.h>
-#include "third_party/Cronet/bidirectional_stream_c.h"
+#include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
#ifdef GRPC_COMPILE_WITH_CRONET
/* link with the real CRONET library in the build system */
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 6f5816390a..d755b1f147 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -44,12 +44,14 @@
#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/transport_impl.h"
-#include "third_party/Cronet/bidirectional_stream_c.h"
+#include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
#define GRPC_HEADER_SIZE_IN_BYTES 5
@@ -437,9 +439,11 @@ static void on_response_headers_received(
for (size_t i = 0; i < headers->count; i++) {
grpc_chttp2_incoming_metadata_buffer_add(
&s->state.rs.initial_metadata,
- grpc_mdelem_from_metadata_strings(
- &exec_ctx, grpc_mdstr_from_string(headers->headers[i].key),
- grpc_mdstr_from_string(headers->headers[i].value)));
+ grpc_mdelem_from_slices(
+ &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
+ headers->headers[i].key)),
+ grpc_slice_intern(
+ grpc_slice_from_static_string(headers->headers[i].value))));
}
s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
@@ -534,9 +538,11 @@ static void on_response_trailers_received(
trailers->headers[i].value);
grpc_chttp2_incoming_metadata_buffer_add(
&s->state.rs.trailing_metadata,
- grpc_mdelem_from_metadata_strings(
- &exec_ctx, grpc_mdstr_from_string(trailers->headers[i].key),
- grpc_mdstr_from_string(trailers->headers[i].value)));
+ grpc_mdelem_from_slices(
+ &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
+ trailers->headers[i].key)),
+ grpc_slice_intern(
+ grpc_slice_from_static_string(trailers->headers[i].value))));
s->state.rs.trailing_metadata_valid = true;
if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
0 != strcmp(trailers->headers[i].value, "0")) {
@@ -616,27 +622,33 @@ static void convert_metadata_to_cronet_headers(
curr = head;
size_t num_headers = 0;
while (num_headers < num_headers_available) {
- grpc_mdelem *mdelem = curr->md;
+ grpc_mdelem mdelem = curr->md;
curr = curr->next;
- const char *key = grpc_mdstr_as_c_string(mdelem->key);
- const char *value = grpc_mdstr_as_c_string(mdelem->value);
- if (mdelem->key == GRPC_MDSTR_SCHEME ||
- mdelem->key == GRPC_MDSTR_AUTHORITY) {
+ char *key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
+ char *value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
+ if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
+ grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_AUTHORITY)) {
/* Cronet populates these fields on its own */
+ gpr_free(key);
+ gpr_free(value);
continue;
}
- if (mdelem->key == GRPC_MDSTR_METHOD) {
- if (mdelem->value == GRPC_MDSTR_PUT) {
+ if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
+ if (grpc_slice_eq(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
*method = "PUT";
} else {
/* POST method in default*/
*method = "POST";
}
+ gpr_free(key);
+ gpr_free(value);
continue;
}
- if (mdelem->key == GRPC_MDSTR_PATH) {
+ if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
/* Create URL by appending :path value to the hostname */
gpr_asprintf(pp_url, "https://%s%s", host, value);
+ gpr_free(key);
+ gpr_free(value);
continue;
}
CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
@@ -662,7 +674,7 @@ static int parse_grpc_header(const uint8_t *data) {
static bool header_has_authority(grpc_linked_mdelem *head) {
while (head != NULL) {
- if (head->md->key == GRPC_MDSTR_AUTHORITY) {
+ if (grpc_slice_eq(GRPC_MDKEY(head->md), GRPC_MDSTR_AUTHORITY)) {
return true;
}
head = head->next;
@@ -843,6 +855,12 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
s->header_array.capacity = s->header_array.count;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
+ unsigned int header_index;
+ for (header_index = 0; header_index < s->header_array.count;
+ header_index++) {
+ gpr_free((void *)s->header_array.headers[header_index].key);
+ gpr_free((void *)s->header_array.headers[header_index].value);
+ }
stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
result = ACTION_TAKEN_WITH_CALLBACK;
} else if (stream_op->recv_initial_metadata &&
@@ -857,7 +875,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE);
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
- &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
+ exec_ctx, &oas->s->state.rs.initial_metadata,
+ stream_op->recv_initial_metadata);
grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
GRPC_ERROR_NONE);
}
@@ -1013,7 +1032,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
- &oas->s->state.rs.trailing_metadata,
+ exec_ctx, &oas->s->state.rs.trailing_metadata,
stream_op->recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}