aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2018-03-15 13:46:28 -1000
committerGravatar ncteisen <ncteisen@gmail.com>2018-03-15 13:46:28 -1000
commitb93a006d64f071ad0804b00f9b1064411a430092 (patch)
treed5971592bf7675ab5279723d46ab96f8e5827b5f /src/core/ext
parent0c6024b94dc2a8aa9d851d8bc5d3a96e97802a55 (diff)
parent941dbaf9f0adb99f3fac4317c703084409a7a33d (diff)
Merge branch 'master' of https://github.com/grpc/grpc into channel-tracing
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/filters/client_channel/backup_poller.h1
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc38
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc15
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.h1
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_registry.h1
-rw-r--r--src/core/ext/filters/client_channel/parse_address.cc26
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc16
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h1
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h1
-rw-r--r--src/core/ext/filters/client_channel/uri_parser.h1
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.cc1
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.cc48
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.cc41
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.cc17
-rw-r--r--src/core/ext/filters/message_size/message_size_filter.cc10
-rw-r--r--src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc4
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc1
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.h2
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc1
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc294
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.cc32
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_data.h10
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_goaway.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_rst_stream.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_settings.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_window_update.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h77
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.cc41
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.cc39
31 files changed, 350 insertions, 375 deletions
diff --git a/src/core/ext/filters/client_channel/backup_poller.h b/src/core/ext/filters/client_channel/backup_poller.h
index 7285b9b93e..8f132f968c 100644
--- a/src/core/ext/filters/client_channel/backup_poller.h
+++ b/src/core/ext/filters/client_channel/backup_poller.h
@@ -23,7 +23,6 @@
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
/* Start polling \a interested_parties periodically in the timer thread */
void grpc_client_channel_start_backup_polling(
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 09cc06e169..bf3911e5ee 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -798,7 +798,8 @@ typedef struct {
grpc_linked_mdelem* send_initial_metadata_storage;
grpc_metadata_batch send_initial_metadata;
// For send_message.
- grpc_caching_byte_stream send_message;
+ grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
+ send_message;
// For send_trailing_metadata.
grpc_linked_mdelem* send_trailing_metadata_storage;
grpc_metadata_batch send_trailing_metadata;
@@ -808,7 +809,7 @@ typedef struct {
bool trailing_metadata_available;
// For intercepting recv_message.
grpc_closure recv_message_ready;
- grpc_byte_stream* recv_message;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
// For intercepting recv_trailing_metadata.
grpc_metadata_batch recv_trailing_metadata;
grpc_transport_stream_stats collect_stats;
@@ -914,12 +915,12 @@ typedef struct client_channel_call_data {
gpr_atm* peer_string;
// send_message
// When we get a send_message op, we replace the original byte stream
- // with a grpc_caching_byte_stream that caches the slices to a
- // local buffer for use in retries.
+ // with a CachingByteStream that caches the slices to a local buffer for
+ // use in retries.
// Note: We inline the cache for the first 3 send_message ops and use
// dynamic allocation after that. This number was essentially picked
// at random; it could be changed in the future to tune performance.
- grpc_core::InlinedVector<grpc_byte_stream_cache*, 3> send_messages;
+ grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages;
// send_trailing_metadata
bool seen_send_trailing_metadata;
grpc_linked_mdelem* send_trailing_metadata_storage;
@@ -964,10 +965,11 @@ static void maybe_cache_send_ops_for_batch(call_data* calld,
}
// Set up cache for send_message ops.
if (batch->send_message) {
- grpc_byte_stream_cache* cache = (grpc_byte_stream_cache*)gpr_arena_alloc(
- calld->arena, sizeof(grpc_byte_stream_cache));
- grpc_byte_stream_cache_init(cache,
- batch->payload->send_message.send_message);
+ grpc_core::ByteStreamCache* cache =
+ static_cast<grpc_core::ByteStreamCache*>(
+ gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
+ new (cache) grpc_core::ByteStreamCache(
+ std::move(batch->payload->send_message.send_message));
calld->send_messages.push_back(cache);
}
// Save metadata batch for send_trailing_metadata ops.
@@ -1002,7 +1004,7 @@ static void free_cached_send_op_data_after_commit(
"]",
chand, calld, i);
}
- grpc_byte_stream_cache_destroy(calld->send_messages[i]);
+ calld->send_messages[i]->Destroy();
}
if (retry_state->completed_send_trailing_metadata) {
grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
@@ -1026,8 +1028,8 @@ static void free_cached_send_op_data_for_completed_batch(
"]",
chand, calld, retry_state->completed_send_message_count - 1);
}
- grpc_byte_stream_cache_destroy(
- calld->send_messages[retry_state->completed_send_message_count - 1]);
+ calld->send_messages[retry_state->completed_send_message_count - 1]
+ ->Destroy();
}
if (batch_data->batch.send_trailing_metadata) {
grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
@@ -1079,7 +1081,7 @@ static void pending_batches_add(grpc_call_element* elem,
if (batch->send_message) {
calld->pending_send_message = true;
calld->bytes_buffered_for_retry +=
- batch->payload->send_message.send_message->length;
+ batch->payload->send_message.send_message->length();
}
if (batch->send_trailing_metadata) {
calld->pending_send_trailing_metadata = true;
@@ -1680,7 +1682,7 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) {
GPR_ASSERT(pending != nullptr);
// Return payload.
*pending->batch->payload->recv_message.recv_message =
- batch_data->recv_message;
+ std::move(batch_data->recv_message);
// Update bookkeeping.
// Note: Need to do this before invoking the callback, since invoking
// the callback will result in yielding the call combiner.
@@ -2124,13 +2126,13 @@ static void add_retriable_send_message_op(
"chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
chand, calld, retry_state->started_send_message_count);
}
- grpc_byte_stream_cache* cache =
+ grpc_core::ByteStreamCache* cache =
calld->send_messages[retry_state->started_send_message_count];
++retry_state->started_send_message_count;
- grpc_caching_byte_stream_init(&batch_data->send_message, cache);
+ batch_data->send_message.Init(cache);
batch_data->batch.send_message = true;
- batch_data->batch.payload->send_message.send_message =
- &batch_data->send_message.base;
+ batch_data->batch.payload->send_message.send_message.reset(
+ batch_data->send_message.get());
}
// Adds retriable send_trailing_metadata op to batch_data.
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index cb39e4224e..47e1deef12 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -61,6 +61,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/socket_utils.h"
#include <inttypes.h>
#include <limits.h>
@@ -417,20 +418,20 @@ void ParseServer(const grpc_grpclb_server* server,
grpc_resolved_address* addr) {
memset(addr, 0, sizeof(*addr));
if (server->drop) return;
- const uint16_t netorder_port = htons((uint16_t)server->port);
+ const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
const grpc_grpclb_ip_address* ip = &server->ip_address;
if (ip->size == 4) {
- addr->len = sizeof(struct sockaddr_in);
- struct sockaddr_in* addr4 = (struct sockaddr_in*)&addr->addr;
- addr4->sin_family = AF_INET;
+ addr->len = sizeof(grpc_sockaddr_in);
+ grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
+ addr4->sin_family = GRPC_AF_INET;
memcpy(&addr4->sin_addr, ip->bytes, ip->size);
addr4->sin_port = netorder_port;
} else if (ip->size == 16) {
- addr->len = sizeof(struct sockaddr_in6);
- struct sockaddr_in6* addr6 = (struct sockaddr_in6*)&addr->addr;
- addr6->sin6_family = AF_INET6;
+ addr->len = sizeof(grpc_sockaddr_in6);
+ grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
+ addr6->sin6_family = GRPC_AF_INET6;
memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
addr6->sin6_port = netorder_port;
}
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index b8bbd32072..6440258158 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -21,7 +21,6 @@
#include <grpc/support/port_platform.h>
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.h b/src/core/ext/filters/client_channel/lb_policy_registry.h
index 2283d848bd..2e9bb061ed 100644
--- a/src/core/ext/filters/client_channel/lb_policy_registry.h
+++ b/src/core/ext/filters/client_channel/lb_policy_registry.h
@@ -24,7 +24,6 @@
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc
index e78dc99e0b..92ea259cf0 100644
--- a/src/core/ext/filters/client_channel/parse_address.cc
+++ b/src/core/ext/filters/client_channel/parse_address.cc
@@ -20,6 +20,7 @@
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/socket_utils.h"
#include <stdio.h>
#include <string.h>
@@ -71,10 +72,10 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr,
if (!gpr_split_host_port(hostport, &host, &port)) return false;
// Parse IP address.
memset(addr, 0, sizeof(*addr));
- addr->len = sizeof(struct sockaddr_in);
- struct sockaddr_in* in = reinterpret_cast<struct sockaddr_in*>(addr->addr);
- in->sin_family = AF_INET;
- if (inet_pton(AF_INET, host, &in->sin_addr) == 0) {
+ addr->len = sizeof(grpc_sockaddr_in);
+ grpc_sockaddr_in* in = reinterpret_cast<grpc_sockaddr_in*>(addr->addr);
+ in->sin_family = GRPC_AF_INET;
+ if (grpc_inet_pton(GRPC_AF_INET, host, &in->sin_addr) == 0) {
if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host);
goto done;
}
@@ -88,7 +89,7 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr,
if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port);
goto done;
}
- in->sin_port = htons(static_cast<uint16_t>(port_num));
+ in->sin_port = grpc_htons(static_cast<uint16_t>(port_num));
success = true;
done:
gpr_free(host);
@@ -117,19 +118,20 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
if (!gpr_split_host_port(hostport, &host, &port)) return false;
// Parse IP address.
memset(addr, 0, sizeof(*addr));
- addr->len = sizeof(struct sockaddr_in6);
- struct sockaddr_in6* in6 = reinterpret_cast<struct sockaddr_in6*>(addr->addr);
- in6->sin6_family = AF_INET6;
+ addr->len = sizeof(grpc_sockaddr_in6);
+ grpc_sockaddr_in6* in6 = reinterpret_cast<grpc_sockaddr_in6*>(addr->addr);
+ in6->sin6_family = GRPC_AF_INET6;
// Handle the RFC6874 syntax for IPv6 zone identifiers.
char* host_end = static_cast<char*>(gpr_memrchr(host, '%', strlen(host)));
if (host_end != nullptr) {
GPR_ASSERT(host_end >= host);
- char host_without_scope[INET6_ADDRSTRLEN];
+ char host_without_scope[GRPC_INET6_ADDRSTRLEN];
size_t host_without_scope_len = static_cast<size_t>(host_end - host);
uint32_t sin6_scope_id = 0;
strncpy(host_without_scope, host, host_without_scope_len);
host_without_scope[host_without_scope_len] = '\0';
- if (inet_pton(AF_INET6, host_without_scope, &in6->sin6_addr) == 0) {
+ if (grpc_inet_pton(GRPC_AF_INET6, host_without_scope, &in6->sin6_addr) ==
+ 0) {
gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope);
goto done;
}
@@ -142,7 +144,7 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
// Handle "sin6_scope_id" being type "u_long". See grpc issue #10027.
in6->sin6_scope_id = sin6_scope_id;
} else {
- if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) {
+ if (grpc_inet_pton(GRPC_AF_INET6, host, &in6->sin6_addr) == 0) {
gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host);
goto done;
}
@@ -157,7 +159,7 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
if (log_errors) gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port);
goto done;
}
- in6->sin6_port = htons(static_cast<uint16_t>(port_num));
+ in6->sin6_port = grpc_htons(static_cast<uint16_t>(port_num));
success = true;
done:
gpr_free(host);
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index aa93e5d8de..c63de3c509 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -440,6 +440,19 @@ class AresDnsResolverFactory : public ResolverFactory {
} // namespace grpc_core
+extern grpc_address_resolver_vtable* grpc_resolve_address_impl;
+static grpc_address_resolver_vtable* default_resolver;
+
+static grpc_error* blocking_resolve_address_ares(
+ const char* name, const char* default_port,
+ grpc_resolved_addresses** addresses) {
+ return default_resolver->blocking_resolve_address(name, default_port,
+ addresses);
+}
+
+static grpc_address_resolver_vtable ares_resolver = {
+ grpc_resolve_address_ares, blocking_resolve_address_ares};
+
void grpc_resolver_dns_ares_init() {
char* resolver_env = gpr_getenv("GRPC_DNS_RESOLVER");
/* TODO(zyc): Turn on c-ares based resolver by default after the address
@@ -450,7 +463,8 @@ void grpc_resolver_dns_ares_init() {
GRPC_LOG_IF_ERROR("ares_library_init() failed", error);
return;
}
- grpc_resolve_address = grpc_resolve_address_ares;
+ default_resolver = grpc_resolve_address_impl;
+ grpc_set_resolver_impl(&ares_resolver);
grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
grpc_core::UniquePtr<grpc_core::ResolverFactory>(
grpc_core::New<grpc_core::AresDnsResolverFactory>()));
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
index 0bc13e35f4..6239549534 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
@@ -22,7 +22,6 @@
#include <grpc/support/port_platform.h>
#include <ares.h>
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
typedef struct grpc_ares_ev_driver grpc_ares_ev_driver;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
index bda9cd1729..3e8ea01e12 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -22,7 +22,6 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/resolve_address.h"
diff --git a/src/core/ext/filters/client_channel/uri_parser.h b/src/core/ext/filters/client_channel/uri_parser.h
index 1966da932b..d749f23308 100644
--- a/src/core/ext/filters/client_channel/uri_parser.h
+++ b/src/core/ext/filters/client_channel/uri_parser.h
@@ -22,7 +22,6 @@
#include <grpc/support/port_platform.h>
#include <stddef.h>
-#include "src/core/lib/iomgr/exec_ctx.h"
typedef struct {
char* scheme;
diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc
index dda3b61108..27d3eac8d6 100644
--- a/src/core/ext/filters/deadline/deadline_filter.cc
+++ b/src/core/ext/filters/deadline/deadline_filter.cc
@@ -27,7 +27,6 @@
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_stack_builder.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel_init.h"
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index 58aefd17c7..ae94ce47b9 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -20,9 +20,11 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include <stdint.h>
#include <string.h>
#include "src/core/ext/filters/http/client/http_client_filter.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/b64.h"
#include "src/core/lib/slice/percent_encoding.h"
@@ -58,8 +60,9 @@ struct call_data {
// State for handling send_message ops.
grpc_transport_stream_op_batch* send_message_batch;
size_t send_message_bytes_read;
- grpc_byte_stream_cache send_message_cache;
- grpc_caching_byte_stream send_message_caching_stream;
+ grpc_core::ManualConstructor<grpc_core::ByteStreamCache> send_message_cache;
+ grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
+ send_message_caching_stream;
grpc_closure on_send_message_next_done;
grpc_closure* original_send_message_on_complete;
grpc_closure send_message_on_complete;
@@ -166,7 +169,7 @@ static void recv_trailing_metadata_on_complete(void* user_data,
static void send_message_on_complete(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
- grpc_byte_stream_cache_destroy(&calld->send_message_cache);
+ calld->send_message_cache.Destroy();
GRPC_CLOSURE_RUN(calld->original_send_message_on_complete,
GRPC_ERROR_REF(error));
}
@@ -175,8 +178,7 @@ static void send_message_on_complete(void* arg, grpc_error* error) {
// calld->send_message_bytes_read.
static grpc_error* pull_slice_from_send_message(call_data* calld) {
grpc_slice incoming_slice;
- grpc_error* error = grpc_byte_stream_pull(
- &calld->send_message_caching_stream.base, &incoming_slice);
+ grpc_error* error = calld->send_message_caching_stream->Pull(&incoming_slice);
if (error == GRPC_ERROR_NONE) {
calld->send_message_bytes_read += GRPC_SLICE_LENGTH(incoming_slice);
grpc_slice_unref_internal(incoming_slice);
@@ -186,24 +188,23 @@ static grpc_error* pull_slice_from_send_message(call_data* calld) {
// Reads as many slices as possible from the send_message byte stream.
// Upon successful return, if calld->send_message_bytes_read ==
-// calld->send_message_caching_stream.base.length, then we have completed
+// calld->send_message_caching_stream->length(), then we have completed
// reading from the byte stream; otherwise, an async read has been dispatched
// and on_send_message_next_done() will be invoked when it is complete.
static grpc_error* read_all_available_send_message_data(call_data* calld) {
- while (grpc_byte_stream_next(&calld->send_message_caching_stream.base,
- ~static_cast<size_t>(0),
- &calld->on_send_message_next_done)) {
+ while (calld->send_message_caching_stream->Next(
+ SIZE_MAX, &calld->on_send_message_next_done)) {
grpc_error* error = pull_slice_from_send_message(calld);
if (error != GRPC_ERROR_NONE) return error;
if (calld->send_message_bytes_read ==
- calld->send_message_caching_stream.base.length) {
+ calld->send_message_caching_stream->length()) {
break;
}
}
return GRPC_ERROR_NONE;
}
-// Async callback for grpc_byte_stream_next().
+// Async callback for ByteStream::Next().
static void on_send_message_next_done(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
@@ -222,7 +223,7 @@ static void on_send_message_next_done(void* arg, grpc_error* error) {
// here, then we know that all of the data was not available
// synchronously, so we were not able to do a cached call. Instead,
// we just reset the byte stream and then send down the batch as-is.
- grpc_caching_byte_stream_reset(&calld->send_message_caching_stream);
+ calld->send_message_caching_stream->Reset();
grpc_call_next_op(elem, calld->send_message_batch);
}
@@ -253,7 +254,7 @@ static grpc_error* update_path_for_get(grpc_call_element* elem,
size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
estimated_len++; /* for the '?' */
estimated_len += grpc_base64_estimate_encoded_size(
- batch->payload->send_message.send_message->length, true /* url_safe */,
+ batch->payload->send_message.send_message->length(), true /* url_safe */,
false /* multi_line */);
grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len);
/* memcopy individual pieces into this slice */
@@ -265,9 +266,9 @@ static grpc_error* update_path_for_get(grpc_call_element* elem,
write_ptr += GRPC_SLICE_LENGTH(path_slice);
*write_ptr++ = '?';
char* payload_bytes =
- slice_buffer_to_string(&calld->send_message_cache.cache_buffer);
+ slice_buffer_to_string(calld->send_message_cache->cache_buffer());
grpc_base64_encode_core(write_ptr, payload_bytes,
- batch->payload->send_message.send_message->length,
+ batch->payload->send_message.send_message->length(),
true /* url_safe */, false /* multi_line */);
gpr_free(payload_bytes);
/* remove trailing unused memory and add trailing 0 to terminate string */
@@ -326,15 +327,14 @@ static void hc_start_transport_stream_op_batch(
if (batch->send_message &&
(batch->payload->send_initial_metadata.send_initial_metadata_flags &
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
- batch->payload->send_message.send_message->length <
+ batch->payload->send_message.send_message->length() <
channeld->max_payload_size_for_get) {
calld->send_message_bytes_read = 0;
- grpc_byte_stream_cache_init(&calld->send_message_cache,
- batch->payload->send_message.send_message);
- grpc_caching_byte_stream_init(&calld->send_message_caching_stream,
- &calld->send_message_cache);
- batch->payload->send_message.send_message =
- &calld->send_message_caching_stream.base;
+ calld->send_message_cache.Init(
+ std::move(batch->payload->send_message.send_message));
+ calld->send_message_caching_stream.Init(calld->send_message_cache.get());
+ batch->payload->send_message.send_message.reset(
+ calld->send_message_caching_stream.get());
calld->original_send_message_on_complete = batch->on_complete;
batch->on_complete = &calld->send_message_on_complete;
calld->send_message_batch = batch;
@@ -342,12 +342,12 @@ static void hc_start_transport_stream_op_batch(
if (error != GRPC_ERROR_NONE) goto done;
// If all the data has been read, then we can use GET.
if (calld->send_message_bytes_read ==
- calld->send_message_caching_stream.base.length) {
+ calld->send_message_caching_stream->length()) {
method = GRPC_MDELEM_METHOD_GET;
error = update_path_for_get(elem, batch);
if (error != GRPC_ERROR_NONE) goto done;
batch->send_message = false;
- grpc_byte_stream_destroy(&calld->send_message_caching_stream.base);
+ calld->send_message_caching_stream->Orphan();
} else {
// Not all data is available. The batch will be sent down
// asynchronously in on_send_message_next_done().
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc
index efe0085c5b..e7d9949386 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc
@@ -32,6 +32,7 @@
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
@@ -62,7 +63,8 @@ struct call_data {
grpc_closure start_send_message_batch_in_call_combiner;
grpc_transport_stream_op_batch* send_message_batch;
grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
- grpc_slice_buffer_stream replacement_stream;
+ grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream>
+ replacement_stream;
grpc_closure* original_send_message_on_complete;
grpc_closure send_message_on_complete;
grpc_closure on_send_message_next_done;
@@ -220,7 +222,7 @@ static void finish_send_message(grpc_call_element* elem) {
grpc_slice_buffer tmp;
grpc_slice_buffer_init(&tmp);
uint32_t send_flags =
- calld->send_message_batch->payload->send_message.send_message->flags;
+ calld->send_message_batch->payload->send_message.send_message->flags();
bool did_compress = grpc_msg_compress(calld->message_compression_algorithm,
&calld->slices, &tmp);
if (did_compress) {
@@ -253,12 +255,9 @@ static void finish_send_message(grpc_call_element* elem) {
grpc_slice_buffer_destroy_internal(&tmp);
// Swap out the original byte stream with our new one and send the
// batch down.
- grpc_byte_stream_destroy(
- calld->send_message_batch->payload->send_message.send_message);
- grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
- send_flags);
- calld->send_message_batch->payload->send_message.send_message =
- &calld->replacement_stream.base;
+ calld->replacement_stream.Init(&calld->slices, send_flags);
+ calld->send_message_batch->payload->send_message.send_message.reset(
+ calld->replacement_stream.get());
calld->original_send_message_on_complete =
calld->send_message_batch->on_complete;
calld->send_message_batch->on_complete = &calld->send_message_on_complete;
@@ -278,9 +277,9 @@ static void fail_send_message_batch_in_call_combiner(void* arg,
// Pulls a slice from the send_message byte stream and adds it to calld->slices.
static grpc_error* pull_slice_from_send_message(call_data* calld) {
grpc_slice incoming_slice;
- grpc_error* error = grpc_byte_stream_pull(
- calld->send_message_batch->payload->send_message.send_message,
- &incoming_slice);
+ grpc_error* error =
+ calld->send_message_batch->payload->send_message.send_message->Pull(
+ &incoming_slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&calld->slices, incoming_slice);
}
@@ -289,12 +288,11 @@ static grpc_error* pull_slice_from_send_message(call_data* calld) {
// Reads as many slices as possible from the send_message byte stream.
// If all data has been read, invokes finish_send_message(). Otherwise,
-// an async call to grpc_byte_stream_next() has been started, which will
+// an async call to ByteStream::Next() has been started, which will
// eventually result in calling on_send_message_next_done().
static void continue_reading_send_message(grpc_call_element* elem) {
call_data* calld = static_cast<call_data*>(elem->call_data);
- while (grpc_byte_stream_next(
- calld->send_message_batch->payload->send_message.send_message,
+ while (calld->send_message_batch->payload->send_message.send_message->Next(
~static_cast<size_t>(0), &calld->on_send_message_next_done)) {
grpc_error* error = pull_slice_from_send_message(calld);
if (error != GRPC_ERROR_NONE) {
@@ -303,15 +301,15 @@ static void continue_reading_send_message(grpc_call_element* elem) {
GRPC_ERROR_UNREF(error);
return;
}
- if (calld->slices.length ==
- calld->send_message_batch->payload->send_message.send_message->length) {
+ if (calld->slices.length == calld->send_message_batch->payload->send_message
+ .send_message->length()) {
finish_send_message(elem);
break;
}
}
}
-// Async callback for grpc_byte_stream_next().
+// Async callback for ByteStream::Next().
static void on_send_message_next_done(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
@@ -328,7 +326,7 @@ static void on_send_message_next_done(void* arg, grpc_error* error) {
return;
}
if (calld->slices.length ==
- calld->send_message_batch->payload->send_message.send_message->length) {
+ calld->send_message_batch->payload->send_message.send_message->length()) {
finish_send_message(elem);
} else {
continue_reading_send_message(elem);
@@ -340,7 +338,8 @@ static void start_send_message_batch(void* arg, grpc_error* unused) {
call_data* calld = static_cast<call_data*>(elem->call_data);
if (skip_compression(
elem,
- calld->send_message_batch->payload->send_message.send_message->flags,
+ calld->send_message_batch->payload->send_message.send_message
+ ->flags(),
calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) {
send_message_batch_continue(elem);
} else {
@@ -365,9 +364,7 @@ static void compress_start_transport_stream_op_batch(
grpc_schedule_on_exec_ctx),
GRPC_ERROR_REF(calld->cancel_error), "failing send_message op");
} else {
- grpc_byte_stream_shutdown(
-
- calld->send_message_batch->payload->send_message.send_message,
+ calld->send_message_batch->payload->send_message.send_message->Shutdown(
GRPC_ERROR_REF(calld->cancel_error));
}
}
diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc
index 57ec8dce34..c202015875 100644
--- a/src/core/ext/filters/http/server/http_server_filter.cc
+++ b/src/core/ext/filters/http/server/http_server_filter.cc
@@ -23,6 +23,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <string.h>
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/b64.h"
#include "src/core/lib/slice/percent_encoding.h"
@@ -53,8 +54,8 @@ struct call_data {
*/
grpc_closure* recv_message_ready;
grpc_closure* on_complete;
- grpc_byte_stream** pp_recv_message;
- grpc_slice_buffer_stream read_stream;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream>* pp_recv_message;
+ grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> read_stream;
/** Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member
@@ -232,7 +233,7 @@ static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem,
grpc_base64_decode_with_len(
reinterpret_cast<const char*> GRPC_SLICE_START_PTR(query_slice),
GRPC_SLICE_LENGTH(query_slice), k_url_safe));
- grpc_slice_buffer_stream_init(&calld->read_stream, &read_slice_buffer, 0);
+ calld->read_stream.Init(&read_slice_buffer, 0);
grpc_slice_buffer_destroy_internal(&read_slice_buffer);
calld->seen_path_with_query = true;
grpc_slice_unref_internal(query_slice);
@@ -281,10 +282,10 @@ static void hs_on_complete(void* user_data, grpc_error* err) {
call_data* calld = static_cast<call_data*>(elem->call_data);
/* Call recv_message_ready if we got the payload via the path field */
if (calld->seen_path_with_query && calld->recv_message_ready != nullptr) {
- *calld->pp_recv_message =
- calld->payload_bin_delivered
- ? nullptr
- : reinterpret_cast<grpc_byte_stream*>(&calld->read_stream);
+ calld->pp_recv_message->reset(
+ calld->payload_bin_delivered ? nullptr
+ : reinterpret_cast<grpc_core::ByteStream*>(
+ calld->read_stream.get()));
// Re-enter call combiner for recv_message_ready, since the surface
// code will release the call combiner for each callback it receives.
GRPC_CALL_COMBINER_START(calld->call_combiner, calld->recv_message_ready,
@@ -405,7 +406,7 @@ static void destroy_call_elem(grpc_call_element* elem,
grpc_closure* ignored) {
call_data* calld = static_cast<call_data*>(elem->call_data);
if (calld->seen_path_with_query && !calld->payload_bin_delivered) {
- grpc_byte_stream_destroy(&calld->read_stream.base);
+ calld->read_stream->Orphan();
}
}
diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc
index b1b14dde02..c7fc3f2e62 100644
--- a/src/core/ext/filters/message_size/message_size_filter.cc
+++ b/src/core/ext/filters/message_size/message_size_filter.cc
@@ -100,7 +100,7 @@ struct call_data {
// call our next_recv_message_ready member after handling it.
grpc_closure recv_message_ready;
// Used by recv_message_ready.
- grpc_byte_stream** recv_message;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
// Original recv_message_ready callback, invoked after our own.
grpc_closure* next_recv_message_ready;
};
@@ -121,12 +121,12 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (*calld->recv_message != nullptr && calld->limits.max_recv_size >= 0 &&
- (*calld->recv_message)->length >
+ (*calld->recv_message)->length() >
static_cast<size_t>(calld->limits.max_recv_size)) {
char* message_string;
gpr_asprintf(&message_string,
"Received message larger than max (%u vs. %d)",
- (*calld->recv_message)->length, calld->limits.max_recv_size);
+ (*calld->recv_message)->length(), calld->limits.max_recv_size);
grpc_error* new_error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED);
@@ -150,11 +150,11 @@ static void start_transport_stream_op_batch(
call_data* calld = static_cast<call_data*>(elem->call_data);
// Check max send message size.
if (op->send_message && calld->limits.max_send_size >= 0 &&
- op->payload->send_message.send_message->length >
+ op->payload->send_message.send_message->length() >
static_cast<size_t>(calld->limits.max_send_size)) {
char* message_string;
gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
- op->payload->send_message.send_message->length,
+ op->payload->send_message.send_message->length(),
calld->limits.max_send_size);
grpc_transport_stream_op_batch_finish_with_failure(
op,
diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc
index bed1004c57..c7070d4d9b 100644
--- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc
+++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc
@@ -93,7 +93,9 @@ static void start_transport_stream_op_batch(
/* Send message happens after client's user-agent (initial metadata) is
* received, so workaround_active must be set already */
if (calld->workaround_active) {
- op->payload->send_message.send_message->flags |= GRPC_WRITE_NO_COMPRESS;
+ op->payload->send_message.send_message->set_flags(
+ op->payload->send_message.send_message->flags() |
+ GRPC_WRITE_NO_COMPRESS);
}
}
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
index 479f0da572..b95c9dae53 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
@@ -29,7 +29,6 @@
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/endpoint.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/tcp_client_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/surface/api_trace.h"
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h
index 7b41972160..6e51001b53 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.h
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.h
@@ -23,7 +23,7 @@
#include <grpc/impl/codegen/grpc_types.h>
-#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/error.h"
/// Adds a port to \a server. Sets \a port_num to the port number.
/// Takes ownership of \a args.
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
index 822236dd2d..99f18cdf39 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
@@ -41,6 +41,5 @@ int grpc_server_add_insecure_http2_port(grpc_server* server, const char* addr) {
GRPC_ERROR_UNREF(err);
}
-
return port_num;
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index df3fb8c68c..a4d616d778 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -39,6 +39,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer.h"
@@ -117,12 +118,6 @@ static void connectivity_state_set(grpc_chttp2_transport* t,
grpc_connectivity_state state,
grpc_error* error, const char* reason);
-static void incoming_byte_stream_destroy_locked(void* byte_stream,
- grpc_error* error_ignored);
-static void incoming_byte_stream_publish_error(
- grpc_chttp2_incoming_byte_stream* bs, grpc_error* error);
-static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs);
-
static void benign_reclaimer_locked(void* t, grpc_error* error);
static void destructive_reclaimer_locked(void* t, grpc_error* error);
@@ -662,8 +657,8 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs,
s->t = t;
s->refcount = refcount;
/* We reserve one 'active stream' that's dropped when the stream is
- read-closed. The others are for incoming_byte_streams that are actively
- reading */
+ read-closed. The others are for Chttp2IncomingByteStreams that are
+ actively reading */
GRPC_CHTTP2_STREAM_REF(s, "chttp2");
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
@@ -1256,8 +1251,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t,
abort(); /* TODO(ctiller): what cleanup here? */
return; /* early out */
}
- if (s->fetched_send_message_length == s->fetching_send_message->length) {
- grpc_byte_stream_destroy(s->fetching_send_message);
+ if (s->fetched_send_message_length == s->fetching_send_message->length()) {
int64_t notify_offset = s->next_message_end_offset;
if (notify_offset <= s->flow_controlled_bytes_written) {
grpc_chttp2_complete_closure_step(
@@ -1274,20 +1268,19 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t,
cb->closure = s->fetching_send_message_finished;
s->fetching_send_message_finished = nullptr;
grpc_chttp2_write_cb** list =
- s->fetching_send_message->flags & GRPC_WRITE_THROUGH
+ s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
? &s->on_write_finished_cbs
: &s->on_flow_controlled_cbs;
cb->next = *list;
*list = cb;
}
- s->fetching_send_message = nullptr;
+ s->fetching_send_message.reset();
return; /* early out */
- } else if (grpc_byte_stream_next(s->fetching_send_message, UINT32_MAX,
- &s->complete_fetch_locked)) {
- grpc_error* error =
- grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice);
+ } else if (s->fetching_send_message->Next(UINT32_MAX,
+ &s->complete_fetch_locked)) {
+ grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
if (error != GRPC_ERROR_NONE) {
- grpc_byte_stream_destroy(s->fetching_send_message);
+ s->fetching_send_message.reset();
grpc_chttp2_cancel_stream(t, s, error);
} else {
add_fetched_slice_locked(t, s);
@@ -1300,14 +1293,14 @@ static void complete_fetch_locked(void* gs, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
grpc_chttp2_transport* t = s->t;
if (error == GRPC_ERROR_NONE) {
- error = grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice);
+ error = s->fetching_send_message->Pull(&s->fetching_slice);
if (error == GRPC_ERROR_NONE) {
add_fetched_slice_locked(t, s);
continue_fetching_send_locked(t, s);
}
}
if (error != GRPC_ERROR_NONE) {
- grpc_byte_stream_destroy(s->fetching_send_message);
+ s->fetching_send_message.reset();
grpc_chttp2_cancel_stream(t, s, error);
}
}
@@ -1439,7 +1432,7 @@ static void perform_stream_op_locked(void* stream_op,
GPR_ASSERT(s->id != 0);
grpc_chttp2_mark_stream_writable(t, s);
if (!(op->send_message &&
- (op->payload->send_message.send_message->flags &
+ (op->payload->send_message.send_message->flags() &
GRPC_WRITE_BUFFER_HINT))) {
grpc_chttp2_initiate_write(
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
@@ -1466,7 +1459,7 @@ static void perform_stream_op_locked(void* stream_op,
if (op->send_message) {
GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
- op->payload->send_message.send_message->length);
+ op->payload->send_message.send_message->length());
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
@@ -1475,7 +1468,7 @@ static void perform_stream_op_locked(void* stream_op,
// streaming call might send another message before getting a
// recv_message failure, breaking out of its loop, and then
// starting recv_trailing_metadata.
- grpc_byte_stream_destroy(op->payload->send_message.send_message);
+ op->payload->send_message.send_message.reset();
grpc_chttp2_complete_closure_step(
t, s, &s->fetching_send_message_finished,
t->is_client && s->received_trailing_metadata
@@ -1488,14 +1481,15 @@ static void perform_stream_op_locked(void* stream_op,
GPR_ASSERT(s->fetching_send_message == nullptr);
uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
&s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
- uint32_t flags = op_payload->send_message.send_message->flags;
+ uint32_t flags = op_payload->send_message.send_message->flags();
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
- size_t len = op_payload->send_message.send_message->length;
+ size_t len = op_payload->send_message.send_message->length();
frame_hdr[1] = static_cast<uint8_t>(len >> 24);
frame_hdr[2] = static_cast<uint8_t>(len >> 16);
frame_hdr[3] = static_cast<uint8_t>(len >> 8);
frame_hdr[4] = static_cast<uint8_t>(len);
- s->fetching_send_message = op_payload->send_message.send_message;
+ s->fetching_send_message =
+ std::move(op_payload->send_message.send_message);
s->fetched_send_message_length = 0;
s->next_message_end_offset =
s->flow_controlled_bytes_written +
@@ -1711,7 +1705,6 @@ static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
}
void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
- t->ping_recv_state.ping_strikes++;
if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
t->ping_policy.max_ping_strikes != 0) {
send_goaway(t,
@@ -1948,12 +1941,12 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
}
if (s->pending_byte_stream) {
if (s->on_next != nullptr) {
- grpc_chttp2_incoming_byte_stream* bs = s->data_parser.parsing_frame;
+ grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
}
- incoming_byte_stream_publish_error(bs, error);
- incoming_byte_stream_unref(bs);
+ bs->PublishError(error);
+ bs->Unref();
s->data_parser.parsing_frame = nullptr;
} else {
GRPC_ERROR_UNREF(s->byte_stream_error);
@@ -2097,10 +2090,7 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
GRPC_ERROR_REF(error),
"send_trailing_metadata_finished");
- if (s->fetching_send_message != nullptr) {
- grpc_byte_stream_destroy(s->fetching_send_message);
- s->fetching_send_message = nullptr;
- }
+ s->fetching_send_message.reset();
grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
GRPC_ERROR_REF(error),
"fetching_send_message_finished");
@@ -2716,7 +2706,6 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
static void reset_byte_stream(void* arg, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
-
s->pending_byte_stream = false;
if (error == GRPC_ERROR_NONE) {
grpc_chttp2_maybe_complete_recv_message(s->t, s);
@@ -2732,22 +2721,56 @@ static void reset_byte_stream(void* arg, grpc_error* error) {
}
}
-static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs) {
- if (gpr_unref(&bs->refs)) {
- gpr_free(bs);
+namespace grpc_core {
+
+Chttp2IncomingByteStream::Chttp2IncomingByteStream(
+ grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
+ uint32_t frame_size, uint32_t flags)
+ : ByteStream(frame_size, flags),
+ transport_(transport),
+ stream_(stream),
+ remaining_bytes_(frame_size) {
+ gpr_ref_init(&refs_, 2);
+ GRPC_ERROR_UNREF(stream->byte_stream_error);
+ stream->byte_stream_error = GRPC_ERROR_NONE;
+}
+
+void Chttp2IncomingByteStream::OrphanLocked(void* arg,
+ grpc_error* error_ignored) {
+ Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
+ grpc_chttp2_stream* s = bs->stream_;
+ grpc_chttp2_transport* t = s->t;
+ bs->Unref();
+ s->pending_byte_stream = false;
+ grpc_chttp2_maybe_complete_recv_message(t, s);
+ grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
+}
+
+void Chttp2IncomingByteStream::Orphan() {
+ GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
+ GRPC_CLOSURE_SCHED(
+ GRPC_CLOSURE_INIT(&destroy_action_,
+ &Chttp2IncomingByteStream::OrphanLocked, this,
+ grpc_combiner_scheduler(transport_->combiner)),
+ GRPC_ERROR_NONE);
+}
+
+void Chttp2IncomingByteStream::Unref() {
+ if (gpr_unref(&refs_)) {
+ Delete(this);
}
}
-static void incoming_byte_stream_next_locked(void* argp,
- grpc_error* error_ignored) {
- grpc_chttp2_incoming_byte_stream* bs =
- static_cast<grpc_chttp2_incoming_byte_stream*>(argp);
- grpc_chttp2_transport* t = bs->transport;
- grpc_chttp2_stream* s = bs->stream;
+void Chttp2IncomingByteStream::Ref() { gpr_ref(&refs_); }
+void Chttp2IncomingByteStream::NextLocked(void* arg,
+ grpc_error* error_ignored) {
+ Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
+ grpc_chttp2_transport* t = bs->transport_;
+ grpc_chttp2_stream* s = bs->stream_;
size_t cur_length = s->frame_storage.length;
if (!s->read_closed) {
- s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint,
+ s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
cur_length);
grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
}
@@ -2756,22 +2779,22 @@ static void incoming_byte_stream_next_locked(void* argp,
grpc_slice_buffer_swap(&s->frame_storage,
&s->unprocessed_incoming_frames_buffer);
s->unprocessed_incoming_frames_decompressed = false;
- GRPC_CLOSURE_SCHED(bs->next_action.on_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE);
} else if (s->byte_stream_error != GRPC_ERROR_NONE) {
- GRPC_CLOSURE_SCHED(bs->next_action.on_complete,
+ GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != nullptr) {
- incoming_byte_stream_unref(s->data_parser.parsing_frame);
+ s->data_parser.parsing_frame->Unref();
s->data_parser.parsing_frame = nullptr;
}
} else if (s->read_closed) {
- if (bs->remaining_bytes != 0) {
+ if (bs->remaining_bytes_ != 0) {
s->byte_stream_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
- GRPC_CLOSURE_SCHED(bs->next_action.on_complete,
+ GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != nullptr) {
- incoming_byte_stream_unref(s->data_parser.parsing_frame);
+ s->data_parser.parsing_frame->Unref();
s->data_parser.parsing_frame = nullptr;
}
} else {
@@ -2779,122 +2802,94 @@ static void incoming_byte_stream_next_locked(void* argp,
GPR_ASSERT(false);
}
} else {
- s->on_next = bs->next_action.on_complete;
+ s->on_next = bs->next_action_.on_complete;
}
- incoming_byte_stream_unref(bs);
+ bs->Unref();
}
-static bool incoming_byte_stream_next(grpc_byte_stream* byte_stream,
- size_t max_size_hint,
- grpc_closure* on_complete) {
+bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
+ grpc_closure* on_complete) {
GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
- grpc_chttp2_incoming_byte_stream* bs =
- reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
- grpc_chttp2_stream* s = bs->stream;
- if (s->unprocessed_incoming_frames_buffer.length > 0) {
+ if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
return true;
} else {
- gpr_ref(&bs->refs);
- bs->next_action.max_size_hint = max_size_hint;
- bs->next_action.on_complete = on_complete;
+ Ref();
+ next_action_.max_size_hint = max_size_hint;
+ next_action_.on_complete = on_complete;
GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&bs->next_action.closure,
- incoming_byte_stream_next_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner)),
+ GRPC_CLOSURE_INIT(&next_action_.closure,
+ &Chttp2IncomingByteStream::NextLocked, this,
+ grpc_combiner_scheduler(transport_->combiner)),
GRPC_ERROR_NONE);
return false;
}
}
-static grpc_error* incoming_byte_stream_pull(grpc_byte_stream* byte_stream,
- grpc_slice* slice) {
+grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
- grpc_chttp2_incoming_byte_stream* bs =
- reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
- grpc_chttp2_stream* s = bs->stream;
grpc_error* error;
-
- if (s->unprocessed_incoming_frames_buffer.length > 0) {
- if (!s->unprocessed_incoming_frames_decompressed) {
+ if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
+ if (!stream_->unprocessed_incoming_frames_decompressed) {
bool end_of_context;
- if (!s->stream_decompression_ctx) {
- s->stream_decompression_ctx = grpc_stream_compression_context_create(
- s->stream_decompression_method);
+ if (!stream_->stream_decompression_ctx) {
+ stream_->stream_decompression_ctx =
+ grpc_stream_compression_context_create(
+ stream_->stream_decompression_method);
}
- if (!grpc_stream_decompress(s->stream_decompression_ctx,
- &s->unprocessed_incoming_frames_buffer,
- &s->decompressed_data_buffer, nullptr,
+ if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
+ &stream_->unprocessed_incoming_frames_buffer,
+ &stream_->decompressed_data_buffer, nullptr,
MAX_SIZE_T, &end_of_context)) {
error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
return error;
}
- GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
- grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
- &s->decompressed_data_buffer);
- s->unprocessed_incoming_frames_decompressed = true;
+ GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
+ grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
+ &stream_->decompressed_data_buffer);
+ stream_->unprocessed_incoming_frames_decompressed = true;
if (end_of_context) {
- grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
- s->stream_decompression_ctx = nullptr;
+ grpc_stream_compression_context_destroy(
+ stream_->stream_decompression_ctx);
+ stream_->stream_decompression_ctx = nullptr;
}
- if (s->unprocessed_incoming_frames_buffer.length == 0) {
+ if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
*slice = grpc_empty_slice();
}
}
error = grpc_deframe_unprocessed_incoming_frames(
- &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, slice,
- nullptr);
+ &stream_->data_parser, stream_,
+ &stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
if (error != GRPC_ERROR_NONE) {
return error;
}
} else {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
- GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
return error;
}
return GRPC_ERROR_NONE;
}
-static void incoming_byte_stream_destroy_locked(void* byte_stream,
- grpc_error* error_ignored);
-
-static void incoming_byte_stream_destroy(grpc_byte_stream* byte_stream) {
- GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
- grpc_chttp2_incoming_byte_stream* bs =
- reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&bs->destroy_action,
- incoming_byte_stream_destroy_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner)),
- GRPC_ERROR_NONE);
-}
-
-static void incoming_byte_stream_publish_error(
- grpc_chttp2_incoming_byte_stream* bs, grpc_error* error) {
- grpc_chttp2_stream* s = bs->stream;
-
+void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
GPR_ASSERT(error != GRPC_ERROR_NONE);
- GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error));
- s->on_next = nullptr;
- GRPC_ERROR_UNREF(s->byte_stream_error);
- s->byte_stream_error = GRPC_ERROR_REF(error);
- grpc_chttp2_cancel_stream(bs->transport, bs->stream, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error));
+ stream_->on_next = nullptr;
+ GRPC_ERROR_UNREF(stream_->byte_stream_error);
+ stream_->byte_stream_error = GRPC_ERROR_REF(error);
+ grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
}
-grpc_error* grpc_chttp2_incoming_byte_stream_push(
- grpc_chttp2_incoming_byte_stream* bs, grpc_slice slice,
- grpc_slice* slice_out) {
- grpc_chttp2_stream* s = bs->stream;
-
- if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
+grpc_error* Chttp2IncomingByteStream::Push(grpc_slice slice,
+ grpc_slice* slice_out) {
+ if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
grpc_error* error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
-
- GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
grpc_slice_unref_internal(slice);
return error;
} else {
- bs->remaining_bytes -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
+ remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
if (slice_out != nullptr) {
*slice_out = slice;
}
@@ -2902,66 +2897,25 @@ grpc_error* grpc_chttp2_incoming_byte_stream_push(
}
}
-grpc_error* grpc_chttp2_incoming_byte_stream_finished(
- grpc_chttp2_incoming_byte_stream* bs, grpc_error* error,
- bool reset_on_error) {
- grpc_chttp2_stream* s = bs->stream;
-
+grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error,
+ bool reset_on_error) {
if (error == GRPC_ERROR_NONE) {
- if (bs->remaining_bytes != 0) {
+ if (remaining_bytes_ != 0) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
}
}
if (error != GRPC_ERROR_NONE && reset_on_error) {
- GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
}
- incoming_byte_stream_unref(bs);
+ Unref();
return error;
}
-static void incoming_byte_stream_shutdown(grpc_byte_stream* byte_stream,
- grpc_error* error) {
- grpc_chttp2_incoming_byte_stream* bs =
- reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
- GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
- bs, error, true /* reset_on_error */));
+void Chttp2IncomingByteStream::Shutdown(grpc_error* error) {
+ GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
}
-static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = {
- incoming_byte_stream_next, incoming_byte_stream_pull,
- incoming_byte_stream_shutdown, incoming_byte_stream_destroy};
-
-static void incoming_byte_stream_destroy_locked(void* byte_stream,
- grpc_error* error_ignored) {
- grpc_chttp2_incoming_byte_stream* bs =
- static_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
- grpc_chttp2_stream* s = bs->stream;
- grpc_chttp2_transport* t = s->t;
-
- GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable);
- incoming_byte_stream_unref(bs);
- s->pending_byte_stream = false;
- grpc_chttp2_maybe_complete_recv_message(t, s);
- grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
-}
-
-grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create(
- grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size,
- uint32_t flags) {
- grpc_chttp2_incoming_byte_stream* incoming_byte_stream =
- static_cast<grpc_chttp2_incoming_byte_stream*>(
- gpr_malloc(sizeof(*incoming_byte_stream)));
- incoming_byte_stream->base.length = frame_size;
- incoming_byte_stream->remaining_bytes = frame_size;
- incoming_byte_stream->base.flags = flags;
- incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable;
- gpr_ref_init(&incoming_byte_stream->refs, 2);
- incoming_byte_stream->transport = t;
- incoming_byte_stream->stream = s;
- GRPC_ERROR_UNREF(s->byte_stream_error);
- s->byte_stream_error = GRPC_ERROR_NONE;
- return incoming_byte_stream;
-}
+} // namespace grpc_core
/*******************************************************************************
* RESOURCE QUOTAS
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc
index 0d37a494a2..f8f06f6789 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_data.cc
@@ -27,6 +27,7 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/transport.h"
@@ -39,8 +40,7 @@ grpc_error* grpc_chttp2_data_parser_init(grpc_chttp2_data_parser* parser) {
void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser* parser) {
if (parser->parsing_frame != nullptr) {
- GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
- parser->parsing_frame,
+ GRPC_ERROR_UNREF(parser->parsing_frame->Finished(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
}
GRPC_ERROR_UNREF(parser->error);
@@ -100,7 +100,7 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
grpc_error* grpc_deframe_unprocessed_incoming_frames(
grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
grpc_slice_buffer* slices, grpc_slice* slice_out,
- grpc_byte_stream** stream_out) {
+ grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out) {
grpc_error* error = GRPC_ERROR_NONE;
grpc_chttp2_transport* t = s->t;
@@ -197,12 +197,11 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
- p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
+ p->parsing_frame = grpc_core::New<grpc_core::Chttp2IncomingByteStream>(
t, s, p->frame_size, message_flags);
- *stream_out = &p->parsing_frame->base;
- if (p->parsing_frame->remaining_bytes == 0) {
- GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
- p->parsing_frame, GRPC_ERROR_NONE, true));
+ stream_out->reset(p->parsing_frame);
+ if (p->parsing_frame->remaining_bytes() == 0) {
+ GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true));
p->parsing_frame = nullptr;
p->state = GRPC_CHTTP2_DATA_FH_0;
}
@@ -226,8 +225,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
if (remaining == p->frame_size) {
s->stats.incoming.data_bytes += remaining;
if (GRPC_ERROR_NONE !=
- (error = grpc_chttp2_incoming_byte_stream_push(
- p->parsing_frame,
+ (error = p->parsing_frame->Push(
grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(end - beg)),
slice_out))) {
@@ -235,8 +233,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
return error;
}
if (GRPC_ERROR_NONE !=
- (error = grpc_chttp2_incoming_byte_stream_finished(
- p->parsing_frame, GRPC_ERROR_NONE, true))) {
+ (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
grpc_slice_unref_internal(slice);
return error;
}
@@ -247,8 +244,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
} else if (remaining < p->frame_size) {
s->stats.incoming.data_bytes += remaining;
if (GRPC_ERROR_NONE !=
- (error = grpc_chttp2_incoming_byte_stream_push(
- p->parsing_frame,
+ (error = p->parsing_frame->Push(
grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(end - beg)),
slice_out))) {
@@ -261,18 +257,16 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
GPR_ASSERT(remaining > p->frame_size);
s->stats.incoming.data_bytes += p->frame_size;
if (GRPC_ERROR_NONE !=
- (grpc_chttp2_incoming_byte_stream_push(
- p->parsing_frame,
+ p->parsing_frame->Push(
grpc_slice_sub(
slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(cur + p->frame_size - beg)),
- slice_out))) {
+ slice_out)) {
grpc_slice_unref_internal(slice);
return error;
}
if (GRPC_ERROR_NONE !=
- (error = grpc_chttp2_incoming_byte_stream_finished(
- p->parsing_frame, GRPC_ERROR_NONE, true))) {
+ (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
grpc_slice_unref_internal(slice);
return error;
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h
index 3efbbf9f76..e5d01f764e 100644
--- a/src/core/ext/transport/chttp2/transport/frame_data.h
+++ b/src/core/ext/transport/chttp2/transport/frame_data.h
@@ -26,7 +26,6 @@
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/byte_stream.h"
#include "src/core/lib/transport/transport.h"
@@ -40,8 +39,9 @@ typedef enum {
GRPC_CHTTP2_DATA_ERROR
} grpc_chttp2_stream_state;
-typedef struct grpc_chttp2_incoming_byte_stream
- grpc_chttp2_incoming_byte_stream;
+namespace grpc_core {
+class Chttp2IncomingByteStream;
+} // namespace grpc_core
typedef struct {
grpc_chttp2_stream_state state;
@@ -50,7 +50,7 @@ typedef struct {
grpc_error* error;
bool is_frame_compressed;
- grpc_chttp2_incoming_byte_stream* parsing_frame;
+ grpc_core::Chttp2IncomingByteStream* parsing_frame;
} grpc_chttp2_data_parser;
/* initialize per-stream state for data frame parsing */
@@ -79,6 +79,6 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
grpc_error* grpc_deframe_unprocessed_incoming_frames(
grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
grpc_slice_buffer* slices, grpc_slice* slice_out,
- grpc_byte_stream** stream_out);
+ grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */
diff --git a/src/core/ext/transport/chttp2/transport/frame_goaway.h b/src/core/ext/transport/chttp2/transport/frame_goaway.h
index e17ed8d563..66c7a68bef 100644
--- a/src/core/ext/transport/chttp2/transport/frame_goaway.h
+++ b/src/core/ext/transport/chttp2/transport/frame_goaway.h
@@ -24,7 +24,6 @@
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
typedef enum {
GRPC_CHTTP2_GOAWAY_LSI0,
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.h b/src/core/ext/transport/chttp2/transport/frame_ping.h
index 8718d6a097..55a4499ad5 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.h
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.h
@@ -23,7 +23,6 @@
#include <grpc/slice.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
typedef struct {
uint8_t byte;
diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h
index bb2d34f918..6bcf9c4479 100644
--- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h
+++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h
@@ -23,7 +23,6 @@
#include <grpc/slice.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/transport.h"
typedef struct {
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.h b/src/core/ext/transport/chttp2/transport/frame_settings.h
index df19627194..8d8d9b1a91 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.h
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.h
@@ -24,7 +24,6 @@
#include <grpc/slice.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
typedef enum {
GRPC_CHTTP2_SPS_ID0,
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.h b/src/core/ext/transport/chttp2/transport/frame_window_update.h
index 30667c77e1..3d2391f637 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.h
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.h
@@ -23,7 +23,6 @@
#include <grpc/slice.h>
#include "src/core/ext/transport/chttp2/transport/frame.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/transport.h"
typedef struct {
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.h b/src/core/ext/transport/chttp2/transport/hpack_parser.h
index b3b8018b98..3e05de4b92 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.h
@@ -25,7 +25,6 @@
#include "src/core/ext/transport/chttp2/transport/frame.h"
#include "src/core/ext/transport/chttp2/transport/hpack_table.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/metadata.h"
typedef struct grpc_chttp2_hpack_parser grpc_chttp2_hpack_parser;
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index b9431cd311..6d11e5aa31 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -203,18 +203,58 @@ typedef struct grpc_chttp2_write_cb {
struct grpc_chttp2_write_cb* next;
} grpc_chttp2_write_cb;
-/* forward declared in frame_data.h */
-struct grpc_chttp2_incoming_byte_stream {
- grpc_byte_stream base;
- gpr_refcount refs;
+namespace grpc_core {
+
+class Chttp2IncomingByteStream : public ByteStream {
+ public:
+ Chttp2IncomingByteStream(grpc_chttp2_transport* transport,
+ grpc_chttp2_stream* stream, uint32_t frame_size,
+ uint32_t flags);
+
+ void Orphan() override;
+
+ bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
+ grpc_error* Pull(grpc_slice* slice) override;
+ void Shutdown(grpc_error* error) override;
+
+ // TODO(roth): When I converted this class to C++, I wanted to make it
+ // inherit from RefCounted or InternallyRefCounted instead of continuing
+ // to use its own custom ref-counting code. However, that would require
+ // using multiple inheritence, which sucks in general. And to make matters
+ // worse, it causes problems with our New<> and Delete<> wrappers.
+ // Specifically, unless RefCounted is first in the list of parent classes,
+ // it will see a different value of the address of the object than the one
+ // we actually allocated, in which case gpr_free() will be called on a
+ // different address than the one we got from gpr_malloc(), thus causing a
+ // crash. Given the fragility of depending on that, as well as a desire to
+ // avoid multiple inheritence in general, I've decided to leave this
+ // alone for now. We can revisit this once we're able to link against
+ // libc++, at which point we can eliminate New<> and Delete<> and
+ // switch to std::shared_ptr<>.
+ void Ref();
+ void Unref();
+
+ void PublishError(grpc_error* error);
+
+ grpc_error* Push(grpc_slice slice, grpc_slice* slice_out);
- grpc_chttp2_transport* transport; /* immutable */
- grpc_chttp2_stream* stream; /* immutable */
+ grpc_error* Finished(grpc_error* error, bool reset_on_error);
+
+ uint32_t remaining_bytes() const { return remaining_bytes_; }
+
+ private:
+ static void NextLocked(void* arg, grpc_error* error_ignored);
+ static void OrphanLocked(void* arg, grpc_error* error_ignored);
+
+ grpc_chttp2_transport* transport_; // Immutable.
+ grpc_chttp2_stream* stream_; // Immutable.
+
+ gpr_refcount refs_;
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
* true */
- uint32_t remaining_bytes;
+ uint32_t remaining_bytes_;
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
@@ -223,11 +263,12 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_closure closure;
size_t max_size_hint;
grpc_closure* on_complete;
- } next_action;
- grpc_closure destroy_action;
- grpc_closure finished_action;
+ } next_action_;
+ grpc_closure destroy_action_;
};
+} // namespace grpc_core
+
typedef enum {
GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
@@ -456,7 +497,7 @@ struct grpc_chttp2_stream {
grpc_metadata_batch* send_trailing_metadata;
grpc_closure* send_trailing_metadata_finished;
- grpc_byte_stream* fetching_send_message;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream> fetching_send_message;
uint32_t fetched_send_message_length;
grpc_slice fetching_slice;
int64_t next_message_end_offset;
@@ -468,7 +509,7 @@ struct grpc_chttp2_stream {
grpc_metadata_batch* recv_initial_metadata;
grpc_closure* recv_initial_metadata_ready;
bool* trailing_metadata_available;
- grpc_byte_stream** recv_message;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
grpc_closure* recv_message_ready;
grpc_metadata_batch* recv_trailing_metadata;
grpc_closure* recv_trailing_metadata_finished;
@@ -719,18 +760,6 @@ void grpc_chttp2_unref_transport(grpc_chttp2_transport* t);
void grpc_chttp2_ref_transport(grpc_chttp2_transport* t);
#endif
-grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create(
- grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size,
- uint32_t flags);
-grpc_error* grpc_chttp2_incoming_byte_stream_push(
- grpc_chttp2_incoming_byte_stream* bs, grpc_slice slice,
- grpc_slice* slice_out);
-grpc_error* grpc_chttp2_incoming_byte_stream_finished(
- grpc_chttp2_incoming_byte_stream* bs, grpc_error* error,
- bool reset_on_error);
-void grpc_chttp2_incoming_byte_stream_notify(
- grpc_chttp2_incoming_byte_stream* bs, grpc_error* error);
-
void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);
/** Add a new ping strike to ping_recv_state.ping_strikes. If
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index ff1c1aad62..8e3ea05706 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -31,6 +31,7 @@
#include "src/core/ext/transport/cronet/transport/cronet_transport.h"
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -122,7 +123,7 @@ struct read_state {
bool read_stream_closed;
/* vars for holding data destined for the application */
- struct grpc_slice_buffer_stream sbs;
+ grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
grpc_slice_buffer read_slice_buffer;
/* vars for trailing metadata */
@@ -1041,16 +1042,14 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
grpc_slice_buffer write_slice_buffer;
grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer);
- if (1 != grpc_byte_stream_next(
- stream_op->payload->send_message.send_message,
- stream_op->payload->send_message.send_message->length,
+ if (1 != stream_op->payload->send_message.send_message->Next(
+ stream_op->payload->send_message.send_message->length(),
nullptr)) {
/* Should never reach here */
GPR_ASSERT(false);
}
if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(stream_op->payload->send_message.send_message,
- &slice)) {
+ stream_op->payload->send_message.send_message->Pull(&slice)) {
/* Should never reach here */
GPR_ASSERT(false);
}
@@ -1062,9 +1061,10 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
}
if (write_slice_buffer.count > 0) {
size_t write_buffer_size;
- create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
- &write_buffer_size,
- stream_op->payload->send_message.send_message->flags);
+ create_grpc_frame(
+ &write_slice_buffer, &stream_state->ws.write_buffer,
+ &write_buffer_size,
+ stream_op->payload->send_message.send_message->flags());
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
@@ -1089,6 +1089,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
}
stream_state->state_op_done[OP_SEND_MESSAGE] = true;
oas->state.state_op_done[OP_SEND_MESSAGE] = true;
+ stream_op->payload->send_message.send_message.reset();
} else if (stream_op->send_trailing_metadata &&
op_can_be_run(stream_op, s, &oas->state,
OP_SEND_TRAILING_METADATA)) {
@@ -1195,14 +1196,13 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
grpc_slice_buffer_destroy_internal(
&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
- grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
- &stream_state->rs.read_slice_buffer, 0);
+ uint32_t flags = 0;
if (stream_state->rs.compressed) {
- stream_state->rs.sbs.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
- *(reinterpret_cast<grpc_byte_buffer**>(
- stream_op->payload->recv_message.recv_message)) =
- reinterpret_cast<grpc_byte_buffer*>(&stream_state->rs.sbs);
+ stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
+ stream_op->payload->recv_message.recv_message->reset(
+ stream_state->rs.sbs.get());
GRPC_CLOSURE_SCHED(
stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
@@ -1252,14 +1252,13 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
read_data_slice);
- grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
- &stream_state->rs.read_slice_buffer, 0);
+ uint32_t flags = 0;
if (stream_state->rs.compressed) {
- stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS;
+ flags = GRPC_WRITE_INTERNAL_COMPRESS;
}
- *(reinterpret_cast<grpc_byte_buffer**>(
- stream_op->payload->recv_message.recv_message)) =
- reinterpret_cast<grpc_byte_buffer*>(&stream_state->rs.sbs);
+ stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
+ stream_op->payload->recv_message.recv_message->reset(
+ stream_state->rs.sbs.get());
GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 5f898bbf25..67a380077b 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -25,6 +25,7 @@
#include <string.h>
#include "src/core/ext/transport/inproc/inproc_transport.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
@@ -99,7 +100,7 @@ typedef struct inproc_stream {
grpc_transport_stream_op_batch* recv_trailing_md_op;
grpc_slice_buffer recv_message;
- grpc_slice_buffer_stream recv_stream;
+ grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> recv_stream;
bool recv_inited;
bool initial_md_sent;
@@ -482,8 +483,7 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
s->recv_message_op = nullptr;
}
if (s->send_message_op) {
- grpc_byte_stream_destroy(
- s->send_message_op->payload->send_message.send_message);
+ s->send_message_op->payload->send_message.send_message.reset();
complete_if_batch_end_locked(
s, error, s->send_message_op,
"fail_helper scheduling send-message-on-complete");
@@ -521,7 +521,7 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
static void message_transfer_locked(inproc_stream* sender,
inproc_stream* receiver) {
size_t remaining =
- sender->send_message_op->payload->send_message.send_message->length;
+ sender->send_message_op->payload->send_message.send_message->length();
if (receiver->recv_inited) {
grpc_slice_buffer_destroy_internal(&receiver->recv_message);
}
@@ -530,12 +530,12 @@ static void message_transfer_locked(inproc_stream* sender,
do {
grpc_slice message_slice;
grpc_closure unused;
- GPR_ASSERT(grpc_byte_stream_next(
- sender->send_message_op->payload->send_message.send_message, SIZE_MAX,
- &unused));
- grpc_error* error = grpc_byte_stream_pull(
- sender->send_message_op->payload->send_message.send_message,
- &message_slice);
+ GPR_ASSERT(
+ sender->send_message_op->payload->send_message.send_message->Next(
+ SIZE_MAX, &unused));
+ grpc_error* error =
+ sender->send_message_op->payload->send_message.send_message->Pull(
+ &message_slice);
if (error != GRPC_ERROR_NONE) {
cancel_stream_locked(sender, GRPC_ERROR_REF(error));
break;
@@ -544,13 +544,11 @@ static void message_transfer_locked(inproc_stream* sender,
remaining -= GRPC_SLICE_LENGTH(message_slice);
grpc_slice_buffer_add(&receiver->recv_message, message_slice);
} while (remaining > 0);
- grpc_byte_stream_destroy(
- sender->send_message_op->payload->send_message.send_message);
+ sender->send_message_op->payload->send_message.send_message.reset();
- grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message,
- 0);
- *receiver->recv_message_op->payload->recv_message.recv_message =
- &receiver->recv_stream.base;
+ receiver->recv_stream.Init(&receiver->recv_message, 0);
+ receiver->recv_message_op->payload->recv_message.recv_message->reset(
+ receiver->recv_stream.get());
INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready",
receiver);
GRPC_CLOSURE_SCHED(
@@ -606,8 +604,7 @@ static void op_state_machine(void* arg, grpc_error* error) {
(s->trailing_md_sent || other->recv_trailing_md_op)) {
// A server send will never be matched if the client is waiting
// for trailing metadata already
- grpc_byte_stream_destroy(
- s->send_message_op->payload->send_message.send_message);
+ s->send_message_op->payload->send_message.send_message.reset();
complete_if_batch_end_locked(
s, GRPC_ERROR_NONE, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
@@ -744,8 +741,7 @@ static void op_state_machine(void* arg, grpc_error* error) {
if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
- grpc_byte_stream_destroy(
- s->send_message_op->payload->send_message.send_message);
+ s->send_message_op->payload->send_message.send_message.reset();
complete_if_batch_end_locked(
s, new_err, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
@@ -803,8 +799,7 @@ static void op_state_machine(void* arg, grpc_error* error) {
s->send_message_op) {
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
- grpc_byte_stream_destroy(
- s->send_message_op->payload->send_message.send_message);
+ s->send_message_op->payload->send_message.send_message.reset();
complete_if_batch_end_locked(
s, new_err, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");