aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/chttp2
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/chttp2')
-rw-r--r--src/core/ext/transport/chttp2/client/authority.cc42
-rw-r--r--src/core/ext/transport/chttp2/client/authority.h36
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.cc11
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc1
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc25
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.h2
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc1
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc373
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.cc32
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.h10
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_goaway.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_rst_stream.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_settings.cc6
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_settings.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_window_update.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.cc4
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.cc2
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_table.cc4
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h82
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.cc4
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.cc6
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.cc24
24 files changed, 388 insertions, 283 deletions
diff --git a/src/core/ext/transport/chttp2/client/authority.cc b/src/core/ext/transport/chttp2/client/authority.cc
new file mode 100644
index 0000000000..bad3153b01
--- /dev/null
+++ b/src/core/ext/transport/chttp2/client/authority.cc
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/ext/transport/chttp2/client/authority.h"
+
+grpc_channel_args* grpc_default_authority_add_if_not_present(
+ const grpc_channel_args* args) {
+ const bool has_default_authority =
+ grpc_channel_args_find(args, GRPC_ARG_DEFAULT_AUTHORITY) != nullptr;
+ grpc_arg new_args[1];
+ size_t num_new_args = 0;
+ grpc_core::UniquePtr<char> default_authority;
+ if (!has_default_authority) {
+ const grpc_arg* server_uri_arg =
+ grpc_channel_args_find(args, GRPC_ARG_SERVER_URI);
+ const char* server_uri_str = grpc_channel_arg_get_string(server_uri_arg);
+ GPR_ASSERT(server_uri_str != nullptr);
+ default_authority =
+ grpc_core::ResolverRegistry::GetDefaultAuthority(server_uri_str);
+ GPR_ASSERT(default_authority != nullptr);
+ new_args[num_new_args++] = grpc_channel_arg_string_create(
+ const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY), default_authority.get());
+ }
+ return grpc_channel_args_copy_and_add(args, new_args, num_new_args);
+}
diff --git a/src/core/ext/transport/chttp2/client/authority.h b/src/core/ext/transport/chttp2/client/authority.h
new file mode 100644
index 0000000000..642584ef56
--- /dev/null
+++ b/src/core/ext/transport/chttp2/client/authority.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_AUTHORITY_H
+#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_AUTHORITY_H
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/grpc.h>
+
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gprpp/memory.h"
+
+/// Returns a copy of \a args with the default authority channel arg set if it
+/// wasn't already present.
+grpc_channel_args* grpc_default_authority_add_if_not_present(
+ const grpc_channel_args* args);
+
+#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_AUTHORITY_H */
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc
index 60800365b8..e6c8c38260 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc
@@ -27,6 +27,7 @@
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
+#include "src/core/ext/transport/chttp2/client/authority.h"
#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/surface/api_trace.h"
@@ -40,9 +41,13 @@ static void client_channel_factory_unref(
static grpc_subchannel* client_channel_factory_create_subchannel(
grpc_client_channel_factory* cc_factory, const grpc_subchannel_args* args) {
+ grpc_subchannel_args final_sc_args;
+ memcpy(&final_sc_args, args, sizeof(*args));
+ final_sc_args.args = grpc_default_authority_add_if_not_present(args->args);
grpc_connector* connector = grpc_chttp2_connector_create();
- grpc_subchannel* s = grpc_subchannel_create(connector, args);
+ grpc_subchannel* s = grpc_subchannel_create(connector, &final_sc_args);
grpc_connector_unref(connector);
+ grpc_channel_args_destroy(const_cast<grpc_channel_args*>(final_sc_args.args));
return s;
}
@@ -56,8 +61,8 @@ static grpc_channel* client_channel_factory_create_channel(
// Add channel arg containing the server URI.
grpc_core::UniquePtr<char> canonical_target =
grpc_core::ResolverRegistry::AddDefaultPrefixIfNeeded(target);
- grpc_arg arg = grpc_channel_arg_string_create((char*)GRPC_ARG_SERVER_URI,
- canonical_target.get());
+ grpc_arg arg = grpc_channel_arg_string_create(
+ const_cast<char*>(GRPC_ARG_SERVER_URI), canonical_target.get());
const char* to_remove[] = {GRPC_ARG_SERVER_URI};
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
index 479f0da572..b95c9dae53 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
@@ -29,7 +29,6 @@
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/endpoint.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/tcp_client_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/surface/api_trace.h"
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
index a82009ff69..5ce73a95d7 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
@@ -71,9 +71,6 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
grpc_uri* server_uri =
grpc_uri_parse(server_uri_str, true /* supress errors */);
GPR_ASSERT(server_uri != nullptr);
- const char* server_uri_path;
- server_uri_path =
- server_uri->path[0] == '/' ? server_uri->path + 1 : server_uri->path;
const grpc_core::TargetAuthorityTable* target_authority_table =
grpc_core::FindTargetAuthorityTableInArgs(args->args);
grpc_core::UniquePtr<char> authority;
@@ -98,33 +95,49 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
// authority table was present or because the target was not present
// in the table), fall back to using the original server URI.
if (authority == nullptr) {
- authority.reset(gpr_strdup(server_uri_path));
+ authority =
+ grpc_core::ResolverRegistry::GetDefaultAuthority(server_uri_str);
}
+ grpc_arg args_to_add[2];
+ size_t num_args_to_add = 0;
+ if (grpc_channel_args_find(args->args, GRPC_ARG_DEFAULT_AUTHORITY) ==
+ nullptr) {
+ // If the channel args don't already contain GRPC_ARG_DEFAULT_AUTHORITY, add
+ // the arg, setting it to the value just obtained.
+ args_to_add[num_args_to_add++] = grpc_channel_arg_string_create(
+ const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY), authority.get());
+ }
+ grpc_channel_args* args_with_authority =
+ grpc_channel_args_copy_and_add(args->args, args_to_add, num_args_to_add);
grpc_uri_destroy(server_uri);
grpc_channel_security_connector* subchannel_security_connector = nullptr;
// Create the security connector using the credentials and target name.
grpc_channel_args* new_args_from_connector = nullptr;
const grpc_security_status security_status =
grpc_channel_credentials_create_security_connector(
- channel_credentials, authority.get(), args->args,
+ channel_credentials, authority.get(), args_with_authority,
&subchannel_security_connector, &new_args_from_connector);
if (security_status != GRPC_SECURITY_OK) {
gpr_log(GPR_ERROR,
"Failed to create secure subchannel for secure name '%s'",
authority.get());
+ grpc_channel_args_destroy(args_with_authority);
return nullptr;
}
grpc_arg new_security_connector_arg =
grpc_security_connector_to_arg(&subchannel_security_connector->base);
grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
- new_args_from_connector != nullptr ? new_args_from_connector : args->args,
+ new_args_from_connector != nullptr ? new_args_from_connector
+ : args_with_authority,
&new_security_connector_arg, 1);
+
GRPC_SECURITY_CONNECTOR_UNREF(&subchannel_security_connector->base,
"lb_channel_create");
if (new_args_from_connector != nullptr) {
grpc_channel_args_destroy(new_args_from_connector);
}
+ grpc_channel_args_destroy(args_with_authority);
grpc_subchannel_args* final_sc_args =
static_cast<grpc_subchannel_args*>(gpr_malloc(sizeof(*final_sc_args)));
memcpy(final_sc_args, args, sizeof(*args));
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h
index 7b41972160..6e51001b53 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.h
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.h
@@ -23,7 +23,7 @@
#include <grpc/impl/codegen/grpc_types.h>
-#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/error.h"
/// Adds a port to \a server. Sets \a port_num to the port number.
/// Takes ownership of \a args.
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
index 822236dd2d..99f18cdf39 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
@@ -41,6 +41,5 @@ int grpc_server_add_insecure_http2_port(grpc_server* server, const char* addr) {
GRPC_ERROR_UNREF(err);
}
-
return port_num;
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index df3fb8c68c..0ef73961a5 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -39,6 +39,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer.h"
@@ -67,7 +68,7 @@
#define DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
#define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
-#define DEFAULT_MAX_PINGS_BETWEEN_DATA 0 /* unlimited */
+#define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
#define DEFAULT_MAX_PING_STRIKES 2
static int g_default_client_keepalive_time_ms =
@@ -117,12 +118,6 @@ static void connectivity_state_set(grpc_chttp2_transport* t,
grpc_connectivity_state state,
grpc_error* error, const char* reason);
-static void incoming_byte_stream_destroy_locked(void* byte_stream,
- grpc_error* error_ignored);
-static void incoming_byte_stream_publish_error(
- grpc_chttp2_incoming_byte_stream* bs, grpc_error* error);
-static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs);
-
static void benign_reclaimer_locked(void* t, grpc_error* error);
static void destructive_reclaimer_locked(void* t, grpc_error* error);
@@ -662,8 +657,8 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs,
s->t = t;
s->refcount = refcount;
/* We reserve one 'active stream' that's dropped when the stream is
- read-closed. The others are for incoming_byte_streams that are actively
- reading */
+ read-closed. The others are for Chttp2IncomingByteStreams that are
+ actively reading */
GRPC_CHTTP2_STREAM_REF(s, "chttp2");
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
@@ -674,6 +669,7 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs,
GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s,
grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
+ s->unprocessed_incoming_frames_buffer_cached_length = 0;
grpc_slice_buffer_init(&s->frame_storage);
grpc_slice_buffer_init(&s->compressed_data_buffer);
grpc_slice_buffer_init(&s->decompressed_data_buffer);
@@ -811,7 +807,7 @@ static const char* write_state_name(grpc_chttp2_write_state st) {
static void set_write_state(grpc_chttp2_transport* t,
grpc_chttp2_write_state st, const char* reason) {
- GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s [%s]", t,
+ GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "W:%p %s state %s -> %s [%s]", t,
t->is_client ? "CLIENT" : "SERVER",
write_state_name(t->write_state),
write_state_name(st), reason));
@@ -1076,7 +1072,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
uint32_t goaway_error,
grpc_slice goaway_text) {
// GRPC_CHTTP2_IF_TRACING(
- // gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg));
+ // gpr_log(GPR_INFO, "got goaway [%d]: %s", goaway_error, msg));
// Discard the error from a previous goaway frame (if any)
if (t->goaway_error != GRPC_ERROR_NONE) {
@@ -1122,7 +1118,7 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
/* safe since we can't (legally) be parsing this stream yet */
GRPC_CHTTP2_IF_TRACING(gpr_log(
- GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
+ GPR_INFO, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
t->is_client ? "CLI" : "SVR", s, t->next_stream_id));
GPR_ASSERT(s->id == 0);
@@ -1187,7 +1183,7 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
if (grpc_http_trace.enabled()) {
const char* errstr = grpc_error_string(error);
gpr_log(
- GPR_DEBUG,
+ GPR_INFO,
"complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
"write_state=%s",
t, closure,
@@ -1256,8 +1252,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t,
abort(); /* TODO(ctiller): what cleanup here? */
return; /* early out */
}
- if (s->fetched_send_message_length == s->fetching_send_message->length) {
- grpc_byte_stream_destroy(s->fetching_send_message);
+ if (s->fetched_send_message_length == s->fetching_send_message->length()) {
int64_t notify_offset = s->next_message_end_offset;
if (notify_offset <= s->flow_controlled_bytes_written) {
grpc_chttp2_complete_closure_step(
@@ -1274,20 +1269,19 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t,
cb->closure = s->fetching_send_message_finished;
s->fetching_send_message_finished = nullptr;
grpc_chttp2_write_cb** list =
- s->fetching_send_message->flags & GRPC_WRITE_THROUGH
+ s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
? &s->on_write_finished_cbs
: &s->on_flow_controlled_cbs;
cb->next = *list;
*list = cb;
}
- s->fetching_send_message = nullptr;
+ s->fetching_send_message.reset();
return; /* early out */
- } else if (grpc_byte_stream_next(s->fetching_send_message, UINT32_MAX,
- &s->complete_fetch_locked)) {
- grpc_error* error =
- grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice);
+ } else if (s->fetching_send_message->Next(UINT32_MAX,
+ &s->complete_fetch_locked)) {
+ grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
if (error != GRPC_ERROR_NONE) {
- grpc_byte_stream_destroy(s->fetching_send_message);
+ s->fetching_send_message.reset();
grpc_chttp2_cancel_stream(t, s, error);
} else {
add_fetched_slice_locked(t, s);
@@ -1300,14 +1294,14 @@ static void complete_fetch_locked(void* gs, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
grpc_chttp2_transport* t = s->t;
if (error == GRPC_ERROR_NONE) {
- error = grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice);
+ error = s->fetching_send_message->Pull(&s->fetching_slice);
if (error == GRPC_ERROR_NONE) {
add_fetched_slice_locked(t, s);
continue_fetching_send_locked(t, s);
}
}
if (error != GRPC_ERROR_NONE) {
- grpc_byte_stream_destroy(s->fetching_send_message);
+ s->fetching_send_message.reset();
grpc_chttp2_cancel_stream(t, s, error);
}
}
@@ -1342,7 +1336,7 @@ static void perform_stream_op_locked(void* stream_op,
if (grpc_http_trace.enabled()) {
char* str = grpc_transport_stream_op_batch_string(op);
- gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str,
+ gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str,
op->on_complete);
gpr_free(str);
if (op->send_initial_metadata) {
@@ -1439,7 +1433,7 @@ static void perform_stream_op_locked(void* stream_op,
GPR_ASSERT(s->id != 0);
grpc_chttp2_mark_stream_writable(t, s);
if (!(op->send_message &&
- (op->payload->send_message.send_message->flags &
+ (op->payload->send_message.send_message->flags() &
GRPC_WRITE_BUFFER_HINT))) {
grpc_chttp2_initiate_write(
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
@@ -1466,7 +1460,7 @@ static void perform_stream_op_locked(void* stream_op,
if (op->send_message) {
GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
- op->payload->send_message.send_message->length);
+ op->payload->send_message.send_message->length());
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
@@ -1475,7 +1469,7 @@ static void perform_stream_op_locked(void* stream_op,
// streaming call might send another message before getting a
// recv_message failure, breaking out of its loop, and then
// starting recv_trailing_metadata.
- grpc_byte_stream_destroy(op->payload->send_message.send_message);
+ op->payload->send_message.send_message.reset();
grpc_chttp2_complete_closure_step(
t, s, &s->fetching_send_message_finished,
t->is_client && s->received_trailing_metadata
@@ -1488,14 +1482,15 @@ static void perform_stream_op_locked(void* stream_op,
GPR_ASSERT(s->fetching_send_message == nullptr);
uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
&s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
- uint32_t flags = op_payload->send_message.send_message->flags;
+ uint32_t flags = op_payload->send_message.send_message->flags();
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
- size_t len = op_payload->send_message.send_message->length;
+ size_t len = op_payload->send_message.send_message->length();
frame_hdr[1] = static_cast<uint8_t>(len >> 24);
frame_hdr[2] = static_cast<uint8_t>(len >> 16);
frame_hdr[3] = static_cast<uint8_t>(len >> 8);
frame_hdr[4] = static_cast<uint8_t>(len);
- s->fetching_send_message = op_payload->send_message.send_message;
+ s->fetching_send_message =
+ std::move(op_payload->send_message.send_message);
s->fetched_send_message_length = 0;
s->next_message_end_offset =
s->flow_controlled_bytes_written +
@@ -1583,20 +1578,27 @@ static void perform_stream_op_locked(void* stream_op,
if (op->recv_message) {
GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
- size_t already_received;
+ size_t before = 0;
GPR_ASSERT(s->recv_message_ready == nullptr);
GPR_ASSERT(!s->pending_byte_stream);
s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message;
if (s->id != 0) {
if (!s->read_closed) {
- already_received = s->frame_storage.length;
+ before = s->frame_storage.length +
+ s->unprocessed_incoming_frames_buffer.length;
+ }
+ }
+ grpc_chttp2_maybe_complete_recv_message(t, s);
+ if (s->id != 0) {
+ if (!s->read_closed && s->frame_storage.length == 0) {
+ size_t after = s->frame_storage.length +
+ s->unprocessed_incoming_frames_buffer_cached_length;
s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
- already_received);
+ before - after);
grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
}
}
- grpc_chttp2_maybe_complete_recv_message(t, s);
}
if (op->recv_trailing_metadata) {
@@ -1636,7 +1638,7 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
if (grpc_http_trace.enabled()) {
char* str = grpc_transport_stream_op_batch_string(op);
- gpr_log(GPR_DEBUG, "perform_stream_op[s=%p]: %s", s, str);
+ gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str);
gpr_free(str);
}
@@ -1674,6 +1676,33 @@ static void send_ping_locked(grpc_chttp2_transport* t,
GRPC_ERROR_NONE);
}
+/*
+ * Specialized form of send_ping_locked for keepalive ping. If there is already
+ * a ping in progress, the keepalive ping would piggyback onto that ping,
+ * instead of waiting for that ping to complete and then starting a new ping.
+ */
+static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
+ if (t->closed_with_error != GRPC_ERROR_NONE) {
+ GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked,
+ GRPC_ERROR_REF(t->closed_with_error));
+ GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked,
+ GRPC_ERROR_REF(t->closed_with_error));
+ return;
+ }
+ grpc_chttp2_ping_queue* pq = &t->ping_queue;
+ if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
+ /* There is a ping in flight. Add yourself to the inflight closure list. */
+ GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
+ grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
+ &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
+ return;
+ }
+ grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE],
+ &t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
+ grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
+ &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
+}
+
static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
t->ping_state.is_delayed_ping_timer_set = false;
@@ -1711,7 +1740,6 @@ static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
}
void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
- t->ping_recv_state.ping_strikes++;
if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
t->ping_policy.max_ping_strikes != 0) {
send_goaway(t,
@@ -1874,6 +1902,10 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
}
}
}
+ // save the length of the buffer before handing control back to application
+ // threads. Needed to support correct flow control bookkeeping
+ s->unprocessed_incoming_frames_buffer_cached_length =
+ s->unprocessed_incoming_frames_buffer.length;
if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE);
} else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
@@ -1948,12 +1980,12 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
}
if (s->pending_byte_stream) {
if (s->on_next != nullptr) {
- grpc_chttp2_incoming_byte_stream* bs = s->data_parser.parsing_frame;
+ grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
}
- incoming_byte_stream_publish_error(bs, error);
- incoming_byte_stream_unref(bs);
+ bs->PublishError(error);
+ bs->Unref();
s->data_parser.parsing_frame = nullptr;
} else {
GRPC_ERROR_UNREF(s->byte_stream_error);
@@ -2097,10 +2129,7 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
GRPC_ERROR_REF(error),
"send_trailing_metadata_finished");
- if (s->fetching_send_message != nullptr) {
- grpc_byte_stream_destroy(s->fetching_send_message);
- s->fetching_send_message = nullptr;
- }
+ s->fetching_send_message.reset();
grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
GRPC_ERROR_REF(error),
"fetching_send_message_finished");
@@ -2500,7 +2529,7 @@ static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
static void start_bdp_ping_locked(void* tp, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
if (grpc_http_trace.enabled()) {
- gpr_log(GPR_DEBUG, "%s: Start BDP ping err=%s", t->peer_string,
+ gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string,
grpc_error_string(error));
}
/* Reset the keepalive ping timer */
@@ -2513,7 +2542,7 @@ static void start_bdp_ping_locked(void* tp, grpc_error* error) {
static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
if (grpc_http_trace.enabled()) {
- gpr_log(GPR_DEBUG, "%s: Complete BDP ping err=%s", t->peer_string,
+ gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string,
grpc_error_string(error));
}
if (error != GRPC_ERROR_NONE) {
@@ -2617,8 +2646,7 @@ static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
- send_ping_locked(t, &t->start_keepalive_ping_locked,
- &t->finish_keepalive_ping_locked);
+ send_keepalive_ping_locked(t);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
} else {
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
@@ -2640,7 +2668,7 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
grpc_timer_init(&t->keepalive_watchdog_timer,
- grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+ grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
&t->keepalive_watchdog_fired_locked);
}
@@ -2688,8 +2716,7 @@ static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
static void connectivity_state_set(grpc_chttp2_transport* t,
grpc_connectivity_state state,
grpc_error* error, const char* reason) {
- GRPC_CHTTP2_IF_TRACING(
- gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
+ GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "set connectivity_state=%d", state));
grpc_connectivity_state_set(&t->channel_callback.state_tracker, state, error,
reason);
}
@@ -2716,7 +2743,6 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
static void reset_byte_stream(void* arg, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
-
s->pending_byte_stream = false;
if (error == GRPC_ERROR_NONE) {
grpc_chttp2_maybe_complete_recv_message(s->t, s);
@@ -2732,22 +2758,56 @@ static void reset_byte_stream(void* arg, grpc_error* error) {
}
}
-static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs) {
- if (gpr_unref(&bs->refs)) {
- gpr_free(bs);
+namespace grpc_core {
+
+Chttp2IncomingByteStream::Chttp2IncomingByteStream(
+ grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
+ uint32_t frame_size, uint32_t flags)
+ : ByteStream(frame_size, flags),
+ transport_(transport),
+ stream_(stream),
+ remaining_bytes_(frame_size) {
+ gpr_ref_init(&refs_, 2);
+ GRPC_ERROR_UNREF(stream->byte_stream_error);
+ stream->byte_stream_error = GRPC_ERROR_NONE;
+}
+
+void Chttp2IncomingByteStream::OrphanLocked(void* arg,
+ grpc_error* error_ignored) {
+ Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
+ grpc_chttp2_stream* s = bs->stream_;
+ grpc_chttp2_transport* t = s->t;
+ bs->Unref();
+ s->pending_byte_stream = false;
+ grpc_chttp2_maybe_complete_recv_message(t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
+}
+
+void Chttp2IncomingByteStream::Orphan() {
+ GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_INIT(&destroy_action_,
+ &Chttp2IncomingByteStream::OrphanLocked, this,
+ grpc_combiner_scheduler(transport_->combiner)),
+ GRPC_ERROR_NONE);
+}
+
+void Chttp2IncomingByteStream::Unref() {
+ if (gpr_unref(&refs_)) {
+ Delete(this);
}
}
-static void incoming_byte_stream_next_locked(void* argp,
- grpc_error* error_ignored) {
- grpc_chttp2_incoming_byte_stream* bs =
- static_cast<grpc_chttp2_incoming_byte_stream*>(argp);
- grpc_chttp2_transport* t = bs->transport;
- grpc_chttp2_stream* s = bs->stream;
+void Chttp2IncomingByteStream::Ref() { gpr_ref(&refs_); }
+void Chttp2IncomingByteStream::NextLocked(void* arg,
+ grpc_error* error_ignored) {
+ Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
+ grpc_chttp2_transport* t = bs->transport_;
+ grpc_chttp2_stream* s = bs->stream_;
size_t cur_length = s->frame_storage.length;
if (!s->read_closed) {
- s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint,
+ s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
cur_length);
grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
}
@@ -2756,22 +2816,22 @@ static void incoming_byte_stream_next_locked(void* argp,
grpc_slice_buffer_swap(&s->frame_storage,
&s->unprocessed_incoming_frames_buffer);
s->unprocessed_incoming_frames_decompressed = false;
- GRPC_CLOSURE_SCHED(bs->next_action.on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE);
} else if (s->byte_stream_error != GRPC_ERROR_NONE) {
- GRPC_CLOSURE_SCHED(bs->next_action.on_complete,
+ GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != nullptr) {
- incoming_byte_stream_unref(s->data_parser.parsing_frame);
+ s->data_parser.parsing_frame->Unref();
s->data_parser.parsing_frame = nullptr;
}
} else if (s->read_closed) {
- if (bs->remaining_bytes != 0) {
+ if (bs->remaining_bytes_ != 0) {
s->byte_stream_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
- GRPC_CLOSURE_SCHED(bs->next_action.on_complete,
+ GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != nullptr) {
- incoming_byte_stream_unref(s->data_parser.parsing_frame);
+ s->data_parser.parsing_frame->Unref();
s->data_parser.parsing_frame = nullptr;
}
} else {
@@ -2779,122 +2839,94 @@ static void incoming_byte_stream_next_locked(void* argp,
GPR_ASSERT(false);
}
} else {
- s->on_next = bs->next_action.on_complete;
+ s->on_next = bs->next_action_.on_complete;
}
- incoming_byte_stream_unref(bs);
+ bs->Unref();
}
-static bool incoming_byte_stream_next(grpc_byte_stream* byte_stream,
- size_t max_size_hint,
- grpc_closure* on_complete) {
+bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
+ grpc_closure* on_complete) {
GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
- grpc_chttp2_incoming_byte_stream* bs =
- reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
- grpc_chttp2_stream* s = bs->stream;
- if (s->unprocessed_incoming_frames_buffer.length > 0) {
+ if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
return true;
} else {
- gpr_ref(&bs->refs);
- bs->next_action.max_size_hint = max_size_hint;
- bs->next_action.on_complete = on_complete;
+ Ref();
+ next_action_.max_size_hint = max_size_hint;
+ next_action_.on_complete = on_complete;
GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&bs->next_action.closure,
- incoming_byte_stream_next_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner)),
+ GRPC_CLOSURE_INIT(&next_action_.closure,
+ &Chttp2IncomingByteStream::NextLocked, this,
+ grpc_combiner_scheduler(transport_->combiner)),
GRPC_ERROR_NONE);
return false;
}
}
-static grpc_error* incoming_byte_stream_pull(grpc_byte_stream* byte_stream,
- grpc_slice* slice) {
+grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
- grpc_chttp2_incoming_byte_stream* bs =
- reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
- grpc_chttp2_stream* s = bs->stream;
grpc_error* error;
-
- if (s->unprocessed_incoming_frames_buffer.length > 0) {
- if (!s->unprocessed_incoming_frames_decompressed) {
+ if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
+ if (!stream_->unprocessed_incoming_frames_decompressed) {
bool end_of_context;
- if (!s->stream_decompression_ctx) {
- s->stream_decompression_ctx = grpc_stream_compression_context_create(
- s->stream_decompression_method);
+ if (!stream_->stream_decompression_ctx) {
+ stream_->stream_decompression_ctx =
+ grpc_stream_compression_context_create(
+ stream_->stream_decompression_method);
}
- if (!grpc_stream_decompress(s->stream_decompression_ctx,
- &s->unprocessed_incoming_frames_buffer,
- &s->decompressed_data_buffer, nullptr,
+ if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
+ &stream_->unprocessed_incoming_frames_buffer,
+ &stream_->decompressed_data_buffer, nullptr,
MAX_SIZE_T, &end_of_context)) {
error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
return error;
}
- GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
- grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
- &s->decompressed_data_buffer);
- s->unprocessed_incoming_frames_decompressed = true;
+ GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
+ grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
+ &stream_->decompressed_data_buffer);
+ stream_->unprocessed_incoming_frames_decompressed = true;
if (end_of_context) {
- grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
- s->stream_decompression_ctx = nullptr;
+ grpc_stream_compression_context_destroy(
+ stream_->stream_decompression_ctx);
+ stream_->stream_decompression_ctx = nullptr;
}
- if (s->unprocessed_incoming_frames_buffer.length == 0) {
+ if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
*slice = grpc_empty_slice();
}
}
error = grpc_deframe_unprocessed_incoming_frames(
- &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, slice,
- nullptr);
+ &stream_->data_parser, stream_,
+ &stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
if (error != GRPC_ERROR_NONE) {
return error;
}
} else {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
- GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
return error;
}
return GRPC_ERROR_NONE;
}
-static void incoming_byte_stream_destroy_locked(void* byte_stream,
- grpc_error* error_ignored);
-
-static void incoming_byte_stream_destroy(grpc_byte_stream* byte_stream) {
- GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
- grpc_chttp2_incoming_byte_stream* bs =
- reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&bs->destroy_action,
- incoming_byte_stream_destroy_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner)),
- GRPC_ERROR_NONE);
-}
-
-static void incoming_byte_stream_publish_error(
- grpc_chttp2_incoming_byte_stream* bs, grpc_error* error) {
- grpc_chttp2_stream* s = bs->stream;
-
+void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
GPR_ASSERT(error != GRPC_ERROR_NONE);
- GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error));
- s->on_next = nullptr;
- GRPC_ERROR_UNREF(s->byte_stream_error);
- s->byte_stream_error = GRPC_ERROR_REF(error);
- grpc_chttp2_cancel_stream(bs->transport, bs->stream, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error));
+ stream_->on_next = nullptr;
+ GRPC_ERROR_UNREF(stream_->byte_stream_error);
+ stream_->byte_stream_error = GRPC_ERROR_REF(error);
+ grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
}
-grpc_error* grpc_chttp2_incoming_byte_stream_push(
- grpc_chttp2_incoming_byte_stream* bs, grpc_slice slice,
- grpc_slice* slice_out) {
- grpc_chttp2_stream* s = bs->stream;
-
- if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
+grpc_error* Chttp2IncomingByteStream::Push(grpc_slice slice,
+ grpc_slice* slice_out) {
+ if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
grpc_error* error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
-
- GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
grpc_slice_unref_internal(slice);
return error;
} else {
- bs->remaining_bytes -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
+ remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
if (slice_out != nullptr) {
*slice_out = slice;
}
@@ -2902,66 +2934,25 @@ grpc_error* grpc_chttp2_incoming_byte_stream_push(
}
}
-grpc_error* grpc_chttp2_incoming_byte_stream_finished(
- grpc_chttp2_incoming_byte_stream* bs, grpc_error* error,
- bool reset_on_error) {
- grpc_chttp2_stream* s = bs->stream;
-
+grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error,
+ bool reset_on_error) {
if (error == GRPC_ERROR_NONE) {
- if (bs->remaining_bytes != 0) {
+ if (remaining_bytes_ != 0) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
}
}
if (error != GRPC_ERROR_NONE && reset_on_error) {
- GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
}
- incoming_byte_stream_unref(bs);
+ Unref();
return error;
}
-static void incoming_byte_stream_shutdown(grpc_byte_stream* byte_stream,
- grpc_error* error) {
- grpc_chttp2_incoming_byte_stream* bs =
- reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
- GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
- bs, error, true /* reset_on_error */));
-}
-
-static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = {
- incoming_byte_stream_next, incoming_byte_stream_pull,
- incoming_byte_stream_shutdown, incoming_byte_stream_destroy};
-
-static void incoming_byte_stream_destroy_locked(void* byte_stream,
- grpc_error* error_ignored) {
- grpc_chttp2_incoming_byte_stream* bs =
- static_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
- grpc_chttp2_stream* s = bs->stream;
- grpc_chttp2_transport* t = s->t;
-
- GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable);
- incoming_byte_stream_unref(bs);
- s->pending_byte_stream = false;
- grpc_chttp2_maybe_complete_recv_message(t, s);
- grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
+void Chttp2IncomingByteStream::Shutdown(grpc_error* error) {
+ GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
}
-grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create(
- grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size,
- uint32_t flags) {
- grpc_chttp2_incoming_byte_stream* incoming_byte_stream =
- static_cast<grpc_chttp2_incoming_byte_stream*>(
- gpr_malloc(sizeof(*incoming_byte_stream)));
- incoming_byte_stream->base.length = frame_size;
- incoming_byte_stream->remaining_bytes = frame_size;
- incoming_byte_stream->base.flags = flags;
- incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable;
- gpr_ref_init(&incoming_byte_stream->refs, 2);
- incoming_byte_stream->transport = t;
- incoming_byte_stream->stream = s;
- GRPC_ERROR_UNREF(s->byte_stream_error);
- s->byte_stream_error = GRPC_ERROR_NONE;
- return incoming_byte_stream;
-}
+} // namespace grpc_core
/*******************************************************************************
* RESOURCE QUOTAS
@@ -2992,7 +2983,7 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) {
/* Channel with no active streams: send a goaway to try and make it
* disconnect cleanly */
if (grpc_resource_quota_trace.enabled()) {
- gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory",
+ gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
t->peer_string);
}
send_goaway(t,
@@ -3000,7 +2991,7 @@ static void benign_reclaimer_locked(void* arg, grpc_error* error) {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
} else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
" streams",
t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
@@ -3021,7 +3012,7 @@ static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
grpc_chttp2_stream_map_rand(&t->stream_map));
if (grpc_resource_quota_trace.enabled()) {
- gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string,
+ gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string,
s->id);
}
grpc_chttp2_cancel_stream(
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc
index 0d37a494a2..f8f06f6789 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_data.cc
@@ -27,6 +27,7 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/transport.h"
@@ -39,8 +40,7 @@ grpc_error* grpc_chttp2_data_parser_init(grpc_chttp2_data_parser* parser) {
void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser* parser) {
if (parser->parsing_frame != nullptr) {
- GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
- parser->parsing_frame,
+ GRPC_ERROR_UNREF(parser->parsing_frame->Finished(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
}
GRPC_ERROR_UNREF(parser->error);
@@ -100,7 +100,7 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
grpc_error* grpc_deframe_unprocessed_incoming_frames(
grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
grpc_slice_buffer* slices, grpc_slice* slice_out,
- grpc_byte_stream** stream_out) {
+ grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out) {
grpc_error* error = GRPC_ERROR_NONE;
grpc_chttp2_transport* t = s->t;
@@ -197,12 +197,11 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
- p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
+ p->parsing_frame = grpc_core::New<grpc_core::Chttp2IncomingByteStream>(
t, s, p->frame_size, message_flags);
- *stream_out = &p->parsing_frame->base;
- if (p->parsing_frame->remaining_bytes == 0) {
- GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
- p->parsing_frame, GRPC_ERROR_NONE, true));
+ stream_out->reset(p->parsing_frame);
+ if (p->parsing_frame->remaining_bytes() == 0) {
+ GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true));
p->parsing_frame = nullptr;
p->state = GRPC_CHTTP2_DATA_FH_0;
}
@@ -226,8 +225,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
if (remaining == p->frame_size) {
s->stats.incoming.data_bytes += remaining;
if (GRPC_ERROR_NONE !=
- (error = grpc_chttp2_incoming_byte_stream_push(
- p->parsing_frame,
+ (error = p->parsing_frame->Push(
grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(end - beg)),
slice_out))) {
@@ -235,8 +233,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
return error;
}
if (GRPC_ERROR_NONE !=
- (error = grpc_chttp2_incoming_byte_stream_finished(
- p->parsing_frame, GRPC_ERROR_NONE, true))) {
+ (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
grpc_slice_unref_internal(slice);
return error;
}
@@ -247,8 +244,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
} else if (remaining < p->frame_size) {
s->stats.incoming.data_bytes += remaining;
if (GRPC_ERROR_NONE !=
- (error = grpc_chttp2_incoming_byte_stream_push(
- p->parsing_frame,
+ (error = p->parsing_frame->Push(
grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(end - beg)),
slice_out))) {
@@ -261,18 +257,16 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
GPR_ASSERT(remaining > p->frame_size);
s->stats.incoming.data_bytes += p->frame_size;
if (GRPC_ERROR_NONE !=
- (grpc_chttp2_incoming_byte_stream_push(
- p->parsing_frame,
+ p->parsing_frame->Push(
grpc_slice_sub(
slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(cur + p->frame_size - beg)),
- slice_out))) {
+ slice_out)) {
grpc_slice_unref_internal(slice);
return error;
}
if (GRPC_ERROR_NONE !=
- (error = grpc_chttp2_incoming_byte_stream_finished(
- p->parsing_frame, GRPC_ERROR_NONE, true))) {
+ (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
grpc_slice_unref_internal(slice);
return error;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h
index 3efbbf9f76..e5d01f764e 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.h
+++ b/src/core/ext/transport/chttp2/transport/frame_data.h
@@ -26,7 +26,6 @@
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/byte_stream.h"
#include "src/core/lib/transport/transport.h"
@@ -40,8 +39,9 @@ typedef enum {
GRPC_CHTTP2_DATA_ERROR
} grpc_chttp2_stream_state;
-typedef struct grpc_chttp2_incoming_byte_stream
- grpc_chttp2_incoming_byte_stream;
+namespace grpc_core {
+class Chttp2IncomingByteStream;
+} // namespace grpc_core
typedef struct {
grpc_chttp2_stream_state state;
@@ -50,7 +50,7 @@ typedef struct {
grpc_error* error;
bool is_frame_compressed;
- grpc_chttp2_incoming_byte_stream* parsing_frame;
+ grpc_core::Chttp2IncomingByteStream* parsing_frame;
} grpc_chttp2_data_parser;
/* initialize per-stream state for data frame parsing */
@@ -79,6 +79,6 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
grpc_error* grpc_deframe_unprocessed_incoming_frames(
grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
grpc_slice_buffer* slices, grpc_slice* slice_out,
- grpc_byte_stream** stream_out);
+ grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */
diff --git a/src/core/ext/transport/chttp2/transport/frame_goaway.h b/src/core/ext/transport/chttp2/transport/frame_goaway.h
index e17ed8d563..66c7a68bef 100644
--- a/src/core/ext/transport/chttp2/transport/frame_goaway.h
+++ b/src/core/ext/transport/chttp2/transport/frame_goaway.h
@@ -24,7 +24,6 @@
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
typedef enum {
GRPC_CHTTP2_GOAWAY_LSI0,
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.h b/src/core/ext/transport/chttp2/transport/frame_ping.h
index 8718d6a097..55a4499ad5 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.h
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.h
@@ -23,7 +23,6 @@
#include <grpc/slice.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
typedef struct {
uint8_t byte;
diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h
index bb2d34f918..6bcf9c4479 100644
--- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h
+++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h
@@ -23,7 +23,6 @@
#include <grpc/slice.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/transport.h"
typedef struct {
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc
index 9ea27dcd47..987ac0e79d 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc
@@ -217,14 +217,14 @@ grpc_error* grpc_chttp2_settings_parser_parse(void* p, grpc_chttp2_transport* t,
t->initial_window_update += static_cast<int64_t>(parser->value) -
parser->incoming_settings[id];
if (grpc_http_trace.enabled() || grpc_flowctl_trace.enabled()) {
- gpr_log(GPR_DEBUG, "%p[%s] adding %d for initial_window change",
- t, t->is_client ? "cli" : "svr",
+ gpr_log(GPR_INFO, "%p[%s] adding %d for initial_window change", t,
+ t->is_client ? "cli" : "svr",
static_cast<int>(t->initial_window_update));
}
}
parser->incoming_settings[id] = parser->value;
if (grpc_http_trace.enabled()) {
- gpr_log(GPR_DEBUG, "CHTTP2:%s:%s: got setting %s = %d",
+ gpr_log(GPR_INFO, "CHTTP2:%s:%s: got setting %s = %d",
t->is_client ? "CLI" : "SVR", t->peer_string, sp->name,
parser->value);
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.h b/src/core/ext/transport/chttp2/transport/frame_settings.h
index df19627194..8d8d9b1a91 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.h
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.h
@@ -24,7 +24,6 @@
#include <grpc/slice.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
typedef enum {
GRPC_CHTTP2_SPS_ID0,
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.h b/src/core/ext/transport/chttp2/transport/frame_window_update.h
index 30667c77e1..3d2391f637 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.h
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.h
@@ -23,7 +23,6 @@
#include <grpc/slice.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/transport.h"
typedef struct {
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc
index e4f3c1b81e..d5ef063883 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc
@@ -470,7 +470,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor* c, grpc_mdelem elem,
v = grpc_slice_to_c_string(GRPC_MDVALUE(elem));
}
gpr_log(
- GPR_DEBUG,
+ GPR_INFO,
"Encode: '%s: %s', elem_interned=%d [%d], k_interned=%d, v_interned=%d",
k, v, GRPC_MDELEM_IS_INTERNED(elem), GRPC_MDELEM_STORAGE(elem),
grpc_slice_is_interned(GRPC_MDKEY(elem)),
@@ -654,7 +654,7 @@ void grpc_chttp2_hpack_compressor_set_max_table_size(
}
c->advertise_table_size_change = 1;
if (grpc_http_trace.enabled()) {
- gpr_log(GPR_DEBUG, "set max table size from encoder to %d", max_table_size);
+ gpr_log(GPR_INFO, "set max table size from encoder to %d", max_table_size);
}
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.cc b/src/core/ext/transport/chttp2/transport/hpack_parser.cc
index fc96a8b3e4..907ba71178 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.cc
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.cc
@@ -633,7 +633,7 @@ static grpc_error* on_hdr(grpc_chttp2_hpack_parser* p, grpc_mdelem md,
v = grpc_slice_to_c_string(GRPC_MDVALUE(md));
}
gpr_log(
- GPR_DEBUG,
+ GPR_INFO,
"Decode: '%s: %s', elem_interned=%d [%d], k_interned=%d, v_interned=%d",
k, v, GRPC_MDELEM_IS_INTERNED(md), GRPC_MDELEM_STORAGE(md),
grpc_slice_is_interned(GRPC_MDKEY(md)),
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.h b/src/core/ext/transport/chttp2/transport/hpack_parser.h
index b3b8018b98..3e05de4b92 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.h
@@ -25,7 +25,6 @@
#include "src/core/ext/transport/chttp2/transport/frame.h"
#include "src/core/ext/transport/chttp2/transport/hpack_table.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/metadata.h"
typedef struct grpc_chttp2_hpack_parser grpc_chttp2_hpack_parser;
diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.cc b/src/core/ext/transport/chttp2/transport/hpack_table.cc
index f050f502f5..7929258356 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_table.cc
+++ b/src/core/ext/transport/chttp2/transport/hpack_table.cc
@@ -247,7 +247,7 @@ void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl* tbl,
return;
}
if (grpc_http_trace.enabled()) {
- gpr_log(GPR_DEBUG, "Update hpack parser max size to %d", max_bytes);
+ gpr_log(GPR_INFO, "Update hpack parser max size to %d", max_bytes);
}
while (tbl->mem_used > max_bytes) {
evict1(tbl);
@@ -270,7 +270,7 @@ grpc_error* grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl* tbl,
return err;
}
if (grpc_http_trace.enabled()) {
- gpr_log(GPR_DEBUG, "Update hpack parser table size to %d", bytes);
+ gpr_log(GPR_INFO, "Update hpack parser table size to %d", bytes);
}
while (tbl->mem_used > bytes) {
evict1(tbl);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index b9431cd311..ca6e715978 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -203,18 +203,58 @@ typedef struct grpc_chttp2_write_cb {
struct grpc_chttp2_write_cb* next;
} grpc_chttp2_write_cb;
-/* forward declared in frame_data.h */
-struct grpc_chttp2_incoming_byte_stream {
- grpc_byte_stream base;
- gpr_refcount refs;
+namespace grpc_core {
+
+class Chttp2IncomingByteStream : public ByteStream {
+ public:
+ Chttp2IncomingByteStream(grpc_chttp2_transport* transport,
+ grpc_chttp2_stream* stream, uint32_t frame_size,
+ uint32_t flags);
+
+ void Orphan() override;
+
+ bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
+ grpc_error* Pull(grpc_slice* slice) override;
+ void Shutdown(grpc_error* error) override;
+
+ // TODO(roth): When I converted this class to C++, I wanted to make it
+ // inherit from RefCounted or InternallyRefCounted instead of continuing
+ // to use its own custom ref-counting code. However, that would require
+ // using multiple inheritence, which sucks in general. And to make matters
+ // worse, it causes problems with our New<> and Delete<> wrappers.
+ // Specifically, unless RefCounted is first in the list of parent classes,
+ // it will see a different value of the address of the object than the one
+ // we actually allocated, in which case gpr_free() will be called on a
+ // different address than the one we got from gpr_malloc(), thus causing a
+ // crash. Given the fragility of depending on that, as well as a desire to
+ // avoid multiple inheritence in general, I've decided to leave this
+ // alone for now. We can revisit this once we're able to link against
+ // libc++, at which point we can eliminate New<> and Delete<> and
+ // switch to std::shared_ptr<>.
+ void Ref();
+ void Unref();
+
+ void PublishError(grpc_error* error);
+
+ grpc_error* Push(grpc_slice slice, grpc_slice* slice_out);
- grpc_chttp2_transport* transport; /* immutable */
- grpc_chttp2_stream* stream; /* immutable */
+ grpc_error* Finished(grpc_error* error, bool reset_on_error);
+
+ uint32_t remaining_bytes() const { return remaining_bytes_; }
+
+ private:
+ static void NextLocked(void* arg, grpc_error* error_ignored);
+ static void OrphanLocked(void* arg, grpc_error* error_ignored);
+
+ grpc_chttp2_transport* transport_; // Immutable.
+ grpc_chttp2_stream* stream_; // Immutable.
+
+ gpr_refcount refs_;
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
* true */
- uint32_t remaining_bytes;
+ uint32_t remaining_bytes_;
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
@@ -223,11 +263,12 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_closure closure;
size_t max_size_hint;
grpc_closure* on_complete;
- } next_action;
- grpc_closure destroy_action;
- grpc_closure finished_action;
+ } next_action_;
+ grpc_closure destroy_action_;
};
+} // namespace grpc_core
+
typedef enum {
GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
@@ -456,7 +497,7 @@ struct grpc_chttp2_stream {
grpc_metadata_batch* send_trailing_metadata;
grpc_closure* send_trailing_metadata_finished;
- grpc_byte_stream* fetching_send_message;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream> fetching_send_message;
uint32_t fetched_send_message_length;
grpc_slice fetching_slice;
int64_t next_message_end_offset;
@@ -468,7 +509,7 @@ struct grpc_chttp2_stream {
grpc_metadata_batch* recv_initial_metadata;
grpc_closure* recv_initial_metadata_ready;
bool* trailing_metadata_available;
- grpc_byte_stream** recv_message;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
grpc_closure* recv_message_ready;
grpc_metadata_batch* recv_trailing_metadata;
grpc_closure* recv_trailing_metadata_finished;
@@ -509,6 +550,11 @@ struct grpc_chttp2_stream {
grpc_slice_buffer unprocessed_incoming_frames_buffer;
grpc_closure* on_next; /* protected by t combiner */
bool pending_byte_stream; /* protected by t combiner */
+ // cached length of buffer to be used by the transport thread in cases where
+ // stream->pending_byte_stream == true. The value is saved before
+ // application threads are allowed to modify
+ // unprocessed_incoming_frames_buffer
+ size_t unprocessed_incoming_frames_buffer_cached_length;
grpc_closure reset_byte_stream;
grpc_error* byte_stream_error; /* protected by t combiner */
bool received_last_frame; /* protected by t combiner */
@@ -719,18 +765,6 @@ void grpc_chttp2_unref_transport(grpc_chttp2_transport* t);
void grpc_chttp2_ref_transport(grpc_chttp2_transport* t);
#endif
-grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create(
- grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size,
- uint32_t flags);
-grpc_error* grpc_chttp2_incoming_byte_stream_push(
- grpc_chttp2_incoming_byte_stream* bs, grpc_slice slice,
- grpc_slice* slice_out);
-grpc_error* grpc_chttp2_incoming_byte_stream_finished(
- grpc_chttp2_incoming_byte_stream* bs, grpc_error* error,
- bool reset_on_error);
-void grpc_chttp2_incoming_byte_stream_notify(
- grpc_chttp2_incoming_byte_stream* bs, grpc_error* error);
-
void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);
/** Add a new ping strike to ping_recv_state.ping_strikes. If
diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc
index 988380b76c..a10c9ada46 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.cc
+++ b/src/core/ext/transport/chttp2/transport/parsing.cc
@@ -373,8 +373,6 @@ error_handler:
/* t->parser = grpc_chttp2_data_parser_parse;*/
t->parser = grpc_chttp2_data_parser_parse;
t->parser_data = &s->data_parser;
- t->ping_state.pings_before_data_required =
- t->ping_policy.max_pings_without_data;
t->ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;
return GRPC_ERROR_NONE;
} else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, nullptr)) {
@@ -547,8 +545,6 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t,
(t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
}
- t->ping_state.pings_before_data_required =
- t->ping_policy.max_pings_without_data;
t->ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;
/* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.cc b/src/core/ext/transport/chttp2/transport/stream_lists.cc
index 5d3ec4b53b..6626170a7e 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.cc
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.cc
@@ -68,7 +68,7 @@ static bool stream_list_pop(grpc_chttp2_transport* t,
}
*stream = s;
if (s && grpc_trace_http2_stream_state.enabled()) {
- gpr_log(GPR_DEBUG, "%p[%d][%s]: pop from %s", t, s->id,
+ gpr_log(GPR_INFO, "%p[%d][%s]: pop from %s", t, s->id,
t->is_client ? "cli" : "svr", stream_list_id_string(id));
}
return s != nullptr;
@@ -90,7 +90,7 @@ static void stream_list_remove(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
t->lists[id].tail = s->links[id].prev;
}
if (grpc_trace_http2_stream_state.enabled()) {
- gpr_log(GPR_DEBUG, "%p[%d][%s]: remove from %s", t, s->id,
+ gpr_log(GPR_INFO, "%p[%d][%s]: remove from %s", t, s->id,
t->is_client ? "cli" : "svr", stream_list_id_string(id));
}
}
@@ -122,7 +122,7 @@ static void stream_list_add_tail(grpc_chttp2_transport* t,
t->lists[id].tail = s;
s->included[id] = 1;
if (grpc_trace_http2_stream_state.enabled()) {
- gpr_log(GPR_DEBUG, "%p[%d][%s]: add to %s", t, s->id,
+ gpr_log(GPR_INFO, "%p[%d][%s]: add to %s", t, s->id,
t->is_client ? "cli" : "svr", stream_list_id_string(id));
}
}
diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc
index 7471d88aa1..85efe27080 100644
--- a/src/core/ext/transport/chttp2/transport/writing.cc
+++ b/src/core/ext/transport/chttp2/transport/writing.cc
@@ -52,7 +52,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
/* ping already in-flight: wait */
if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) {
- gpr_log(GPR_DEBUG, "%s: Ping delayed [%p]: already pinging",
+ gpr_log(GPR_INFO, "%s: Ping delayed [%p]: already pinging",
t->is_client ? "CLIENT" : "SERVER", t->peer_string);
}
return;
@@ -61,7 +61,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
t->ping_policy.max_pings_without_data != 0) {
/* need to receive something of substance before sending a ping again */
if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) {
- gpr_log(GPR_DEBUG, "%s: Ping delayed [%p]: too many recent pings: %d/%d",
+ gpr_log(GPR_INFO, "%s: Ping delayed [%p]: too many recent pings: %d/%d",
t->is_client ? "CLIENT" : "SERVER", t->peer_string,
t->ping_state.pings_before_data_required,
t->ping_policy.max_pings_without_data);
@@ -81,7 +81,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
if (next_allowed_ping > now) {
/* not enough elapsed time between successive pings */
if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"%s: Ping delayed [%p]: not enough time elapsed since last ping. "
" Last ping %f: Next ping %f: Now %f",
t->is_client ? "CLIENT" : "SERVER", t->peer_string,
@@ -107,7 +107,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
GRPC_STATS_INC_HTTP2_PINGS_SENT();
t->ping_state.last_ping_sent_time = now;
if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) {
- gpr_log(GPR_DEBUG, "%s: Ping sent [%p]: %d/%d",
+ gpr_log(GPR_INFO, "%s: Ping sent [%p]: %d/%d",
t->is_client ? "CLIENT" : "SERVER", t->peer_string,
t->ping_state.pings_before_data_required,
t->ping_policy.max_pings_without_data);
@@ -224,7 +224,7 @@ class WriteContext {
grpc_slice_buffer_add(
&t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
&throwaway_stats));
- ResetPingRecvClock();
+ ResetPingClock();
}
}
@@ -269,11 +269,13 @@ class WriteContext {
return s;
}
- void ResetPingRecvClock() {
+ void ResetPingClock() {
if (!t_->is_client) {
t_->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
t_->ping_recv_state.ping_strikes = 0;
}
+ t_->ping_state.pings_before_data_required =
+ t_->ping_policy.max_pings_without_data;
}
void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
@@ -399,7 +401,7 @@ class StreamWriteContext {
StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s)
: write_context_(write_context), t_(write_context->transport()), s_(s) {
GRPC_CHTTP2_IF_TRACING(
- gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
+ gpr_log(GPR_INFO, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
t_->is_client ? "CLIENT" : "SERVER", s->id,
s->sent_initial_metadata, s->send_initial_metadata != nullptr,
(int)(s->flow_control->local_window_delta() -
@@ -435,7 +437,7 @@ class StreamWriteContext {
};
grpc_chttp2_encode_header(&t_->hpack_compressor, nullptr, 0,
s_->send_initial_metadata, &hopt, &t_->outbuf);
- write_context_->ResetPingRecvClock();
+ write_context_->ResetPingClock();
write_context_->IncInitialMetadataWrites();
}
@@ -455,7 +457,7 @@ class StreamWriteContext {
grpc_slice_buffer_add(
&t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce,
&s_->stats.outgoing));
- write_context_->ResetPingRecvClock();
+ write_context_->ResetPingClock();
write_context_->IncWindowUpdateWrites();
}
@@ -489,7 +491,7 @@ class StreamWriteContext {
data_send_context.CompressMoreBytes();
}
}
- write_context_->ResetPingRecvClock();
+ write_context_->ResetPingClock();
if (data_send_context.is_last_frame()) {
SentLastFrame();
}
@@ -530,7 +532,7 @@ class StreamWriteContext {
s_->send_trailing_metadata, &hopt, &t_->outbuf);
}
write_context_->IncTrailingMetadataWrites();
- write_context_->ResetPingRecvClock();
+ write_context_->ResetPingClock();
SentLastFrame();
write_context_->NoteScheduledResults();