diff options
author | 2018-03-15 13:46:28 -1000 | |
---|---|---|
committer | 2018-03-15 13:46:28 -1000 | |
commit | b93a006d64f071ad0804b00f9b1064411a430092 (patch) | |
tree | d5971592bf7675ab5279723d46ab96f8e5827b5f /src/core/ext | |
parent | 0c6024b94dc2a8aa9d851d8bc5d3a96e97802a55 (diff) | |
parent | 941dbaf9f0adb99f3fac4317c703084409a7a33d (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into channel-tracing
Diffstat (limited to 'src/core/ext')
31 files changed, 350 insertions, 375 deletions
diff --git a/src/core/ext/filters/client_channel/backup_poller.h b/src/core/ext/filters/client_channel/backup_poller.h index 7285b9b93e..8f132f968c 100644 --- a/src/core/ext/filters/client_channel/backup_poller.h +++ b/src/core/ext/filters/client_channel/backup_poller.h @@ -23,7 +23,6 @@ #include <grpc/grpc.h> #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/iomgr/exec_ctx.h" /* Start polling \a interested_parties periodically in the timer thread */ void grpc_client_channel_start_backup_polling( diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 09cc06e169..bf3911e5ee 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -798,7 +798,8 @@ typedef struct { grpc_linked_mdelem* send_initial_metadata_storage; grpc_metadata_batch send_initial_metadata; // For send_message. - grpc_caching_byte_stream send_message; + grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream> + send_message; // For send_trailing_metadata. grpc_linked_mdelem* send_trailing_metadata_storage; grpc_metadata_batch send_trailing_metadata; @@ -808,7 +809,7 @@ typedef struct { bool trailing_metadata_available; // For intercepting recv_message. grpc_closure recv_message_ready; - grpc_byte_stream* recv_message; + grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message; // For intercepting recv_trailing_metadata. grpc_metadata_batch recv_trailing_metadata; grpc_transport_stream_stats collect_stats; @@ -914,12 +915,12 @@ typedef struct client_channel_call_data { gpr_atm* peer_string; // send_message // When we get a send_message op, we replace the original byte stream - // with a grpc_caching_byte_stream that caches the slices to a - // local buffer for use in retries. + // with a CachingByteStream that caches the slices to a local buffer for + // use in retries. // Note: We inline the cache for the first 3 send_message ops and use // dynamic allocation after that. This number was essentially picked // at random; it could be changed in the future to tune performance. - grpc_core::InlinedVector<grpc_byte_stream_cache*, 3> send_messages; + grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages; // send_trailing_metadata bool seen_send_trailing_metadata; grpc_linked_mdelem* send_trailing_metadata_storage; @@ -964,10 +965,11 @@ static void maybe_cache_send_ops_for_batch(call_data* calld, } // Set up cache for send_message ops. if (batch->send_message) { - grpc_byte_stream_cache* cache = (grpc_byte_stream_cache*)gpr_arena_alloc( - calld->arena, sizeof(grpc_byte_stream_cache)); - grpc_byte_stream_cache_init(cache, - batch->payload->send_message.send_message); + grpc_core::ByteStreamCache* cache = + static_cast<grpc_core::ByteStreamCache*>( + gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); + new (cache) grpc_core::ByteStreamCache( + std::move(batch->payload->send_message.send_message)); calld->send_messages.push_back(cache); } // Save metadata batch for send_trailing_metadata ops. @@ -1002,7 +1004,7 @@ static void free_cached_send_op_data_after_commit( "]", chand, calld, i); } - grpc_byte_stream_cache_destroy(calld->send_messages[i]); + calld->send_messages[i]->Destroy(); } if (retry_state->completed_send_trailing_metadata) { grpc_metadata_batch_destroy(&calld->send_trailing_metadata); @@ -1026,8 +1028,8 @@ static void free_cached_send_op_data_for_completed_batch( "]", chand, calld, retry_state->completed_send_message_count - 1); } - grpc_byte_stream_cache_destroy( - calld->send_messages[retry_state->completed_send_message_count - 1]); + calld->send_messages[retry_state->completed_send_message_count - 1] + ->Destroy(); } if (batch_data->batch.send_trailing_metadata) { grpc_metadata_batch_destroy(&calld->send_trailing_metadata); @@ -1079,7 +1081,7 @@ static void pending_batches_add(grpc_call_element* elem, if (batch->send_message) { calld->pending_send_message = true; calld->bytes_buffered_for_retry += - batch->payload->send_message.send_message->length; + batch->payload->send_message.send_message->length(); } if (batch->send_trailing_metadata) { calld->pending_send_trailing_metadata = true; @@ -1680,7 +1682,7 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) { GPR_ASSERT(pending != nullptr); // Return payload. *pending->batch->payload->recv_message.recv_message = - batch_data->recv_message; + std::move(batch_data->recv_message); // Update bookkeeping. // Note: Need to do this before invoking the callback, since invoking // the callback will result in yielding the call combiner. @@ -2124,13 +2126,13 @@ static void add_retriable_send_message_op( "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]", chand, calld, retry_state->started_send_message_count); } - grpc_byte_stream_cache* cache = + grpc_core::ByteStreamCache* cache = calld->send_messages[retry_state->started_send_message_count]; ++retry_state->started_send_message_count; - grpc_caching_byte_stream_init(&batch_data->send_message, cache); + batch_data->send_message.Init(cache); batch_data->batch.send_message = true; - batch_data->batch.payload->send_message.send_message = - &batch_data->send_message.base; + batch_data->batch.payload->send_message.send_message.reset( + batch_data->send_message.get()); } // Adds retriable send_trailing_metadata op to batch_data. diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index cb39e4224e..47e1deef12 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -61,6 +61,7 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/socket_utils.h" #include <inttypes.h> #include <limits.h> @@ -417,20 +418,20 @@ void ParseServer(const grpc_grpclb_server* server, grpc_resolved_address* addr) { memset(addr, 0, sizeof(*addr)); if (server->drop) return; - const uint16_t netorder_port = htons((uint16_t)server->port); + const uint16_t netorder_port = grpc_htons((uint16_t)server->port); /* the addresses are given in binary format (a in(6)_addr struct) in * server->ip_address.bytes. */ const grpc_grpclb_ip_address* ip = &server->ip_address; if (ip->size == 4) { - addr->len = sizeof(struct sockaddr_in); - struct sockaddr_in* addr4 = (struct sockaddr_in*)&addr->addr; - addr4->sin_family = AF_INET; + addr->len = sizeof(grpc_sockaddr_in); + grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr); + addr4->sin_family = GRPC_AF_INET; memcpy(&addr4->sin_addr, ip->bytes, ip->size); addr4->sin_port = netorder_port; } else if (ip->size == 16) { - addr->len = sizeof(struct sockaddr_in6); - struct sockaddr_in6* addr6 = (struct sockaddr_in6*)&addr->addr; - addr6->sin6_family = AF_INET6; + addr->len = sizeof(grpc_sockaddr_in6); + grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr; + addr6->sin6_family = GRPC_AF_INET6; memcpy(&addr6->sin6_addr, ip->bytes, ip->size); addr6->sin6_port = netorder_port; } diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h index b8bbd32072..6440258158 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.h +++ b/src/core/ext/filters/client_channel/lb_policy_factory.h @@ -21,7 +21,6 @@ #include <grpc/support/port_platform.h> -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/ext/filters/client_channel/client_channel_factory.h" diff --git a/src/core/ext/filters/client_channel/lb_policy_registry.h b/src/core/ext/filters/client_channel/lb_policy_registry.h index 2283d848bd..2e9bb061ed 100644 --- a/src/core/ext/filters/client_channel/lb_policy_registry.h +++ b/src/core/ext/filters/client_channel/lb_policy_registry.h @@ -24,7 +24,6 @@ #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/iomgr/exec_ctx.h" namespace grpc_core { diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc index e78dc99e0b..92ea259cf0 100644 --- a/src/core/ext/filters/client_channel/parse_address.cc +++ b/src/core/ext/filters/client_channel/parse_address.cc @@ -20,6 +20,7 @@ #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/socket_utils.h" #include <stdio.h> #include <string.h> @@ -71,10 +72,10 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr, if (!gpr_split_host_port(hostport, &host, &port)) return false; // Parse IP address. memset(addr, 0, sizeof(*addr)); - addr->len = sizeof(struct sockaddr_in); - struct sockaddr_in* in = reinterpret_cast<struct sockaddr_in*>(addr->addr); - in->sin_family = AF_INET; - if (inet_pton(AF_INET, host, &in->sin_addr) == 0) { + addr->len = sizeof(grpc_sockaddr_in); + grpc_sockaddr_in* in = reinterpret_cast<grpc_sockaddr_in*>(addr->addr); + in->sin_family = GRPC_AF_INET; + if (grpc_inet_pton(GRPC_AF_INET, host, &in->sin_addr) == 0) { if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); goto done; } @@ -88,7 +89,7 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr, if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); goto done; } - in->sin_port = htons(static_cast<uint16_t>(port_num)); + in->sin_port = grpc_htons(static_cast<uint16_t>(port_num)); success = true; done: gpr_free(host); @@ -117,19 +118,20 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr, if (!gpr_split_host_port(hostport, &host, &port)) return false; // Parse IP address. memset(addr, 0, sizeof(*addr)); - addr->len = sizeof(struct sockaddr_in6); - struct sockaddr_in6* in6 = reinterpret_cast<struct sockaddr_in6*>(addr->addr); - in6->sin6_family = AF_INET6; + addr->len = sizeof(grpc_sockaddr_in6); + grpc_sockaddr_in6* in6 = reinterpret_cast<grpc_sockaddr_in6*>(addr->addr); + in6->sin6_family = GRPC_AF_INET6; // Handle the RFC6874 syntax for IPv6 zone identifiers. char* host_end = static_cast<char*>(gpr_memrchr(host, '%', strlen(host))); if (host_end != nullptr) { GPR_ASSERT(host_end >= host); - char host_without_scope[INET6_ADDRSTRLEN]; + char host_without_scope[GRPC_INET6_ADDRSTRLEN]; size_t host_without_scope_len = static_cast<size_t>(host_end - host); uint32_t sin6_scope_id = 0; strncpy(host_without_scope, host, host_without_scope_len); host_without_scope[host_without_scope_len] = '\0'; - if (inet_pton(AF_INET6, host_without_scope, &in6->sin6_addr) == 0) { + if (grpc_inet_pton(GRPC_AF_INET6, host_without_scope, &in6->sin6_addr) == + 0) { gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope); goto done; } @@ -142,7 +144,7 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr, // Handle "sin6_scope_id" being type "u_long". See grpc issue #10027. in6->sin6_scope_id = sin6_scope_id; } else { - if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { + if (grpc_inet_pton(GRPC_AF_INET6, host, &in6->sin6_addr) == 0) { gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); goto done; } @@ -157,7 +159,7 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr, if (log_errors) gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); goto done; } - in6->sin6_port = htons(static_cast<uint16_t>(port_num)); + in6->sin6_port = grpc_htons(static_cast<uint16_t>(port_num)); success = true; done: gpr_free(host); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index aa93e5d8de..c63de3c509 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -440,6 +440,19 @@ class AresDnsResolverFactory : public ResolverFactory { } // namespace grpc_core +extern grpc_address_resolver_vtable* grpc_resolve_address_impl; +static grpc_address_resolver_vtable* default_resolver; + +static grpc_error* blocking_resolve_address_ares( + const char* name, const char* default_port, + grpc_resolved_addresses** addresses) { + return default_resolver->blocking_resolve_address(name, default_port, + addresses); +} + +static grpc_address_resolver_vtable ares_resolver = { + grpc_resolve_address_ares, blocking_resolve_address_ares}; + void grpc_resolver_dns_ares_init() { char* resolver_env = gpr_getenv("GRPC_DNS_RESOLVER"); /* TODO(zyc): Turn on c-ares based resolver by default after the address @@ -450,7 +463,8 @@ void grpc_resolver_dns_ares_init() { GRPC_LOG_IF_ERROR("ares_library_init() failed", error); return; } - grpc_resolve_address = grpc_resolve_address_ares; + default_resolver = grpc_resolve_address_impl; + grpc_set_resolver_impl(&ares_resolver); grpc_core::ResolverRegistry::Builder::RegisterResolverFactory( grpc_core::UniquePtr<grpc_core::ResolverFactory>( grpc_core::New<grpc_core::AresDnsResolverFactory>())); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h index 0bc13e35f4..6239549534 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h @@ -22,7 +22,6 @@ #include <grpc/support/port_platform.h> #include <ares.h> -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/pollset_set.h" typedef struct grpc_ares_ev_driver grpc_ares_ev_driver; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index bda9cd1729..3e8ea01e12 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -22,7 +22,6 @@ #include <grpc/support/port_platform.h> #include "src/core/ext/filters/client_channel/lb_policy_factory.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/resolve_address.h" diff --git a/src/core/ext/filters/client_channel/uri_parser.h b/src/core/ext/filters/client_channel/uri_parser.h index 1966da932b..d749f23308 100644 --- a/src/core/ext/filters/client_channel/uri_parser.h +++ b/src/core/ext/filters/client_channel/uri_parser.h @@ -22,7 +22,6 @@ #include <grpc/support/port_platform.h> #include <stddef.h> -#include "src/core/lib/iomgr/exec_ctx.h" typedef struct { char* scheme; diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index dda3b61108..27d3eac8d6 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -27,7 +27,6 @@ #include <grpc/support/time.h> #include "src/core/lib/channel/channel_stack_builder.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/channel_init.h" diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 58aefd17c7..ae94ce47b9 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -20,9 +20,11 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include <stdint.h> #include <string.h> #include "src/core/ext/filters/http/client/http_client_filter.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/b64.h" #include "src/core/lib/slice/percent_encoding.h" @@ -58,8 +60,9 @@ struct call_data { // State for handling send_message ops. grpc_transport_stream_op_batch* send_message_batch; size_t send_message_bytes_read; - grpc_byte_stream_cache send_message_cache; - grpc_caching_byte_stream send_message_caching_stream; + grpc_core::ManualConstructor<grpc_core::ByteStreamCache> send_message_cache; + grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream> + send_message_caching_stream; grpc_closure on_send_message_next_done; grpc_closure* original_send_message_on_complete; grpc_closure send_message_on_complete; @@ -166,7 +169,7 @@ static void recv_trailing_metadata_on_complete(void* user_data, static void send_message_on_complete(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); call_data* calld = static_cast<call_data*>(elem->call_data); - grpc_byte_stream_cache_destroy(&calld->send_message_cache); + calld->send_message_cache.Destroy(); GRPC_CLOSURE_RUN(calld->original_send_message_on_complete, GRPC_ERROR_REF(error)); } @@ -175,8 +178,7 @@ static void send_message_on_complete(void* arg, grpc_error* error) { // calld->send_message_bytes_read. static grpc_error* pull_slice_from_send_message(call_data* calld) { grpc_slice incoming_slice; - grpc_error* error = grpc_byte_stream_pull( - &calld->send_message_caching_stream.base, &incoming_slice); + grpc_error* error = calld->send_message_caching_stream->Pull(&incoming_slice); if (error == GRPC_ERROR_NONE) { calld->send_message_bytes_read += GRPC_SLICE_LENGTH(incoming_slice); grpc_slice_unref_internal(incoming_slice); @@ -186,24 +188,23 @@ static grpc_error* pull_slice_from_send_message(call_data* calld) { // Reads as many slices as possible from the send_message byte stream. // Upon successful return, if calld->send_message_bytes_read == -// calld->send_message_caching_stream.base.length, then we have completed +// calld->send_message_caching_stream->length(), then we have completed // reading from the byte stream; otherwise, an async read has been dispatched // and on_send_message_next_done() will be invoked when it is complete. static grpc_error* read_all_available_send_message_data(call_data* calld) { - while (grpc_byte_stream_next(&calld->send_message_caching_stream.base, - ~static_cast<size_t>(0), - &calld->on_send_message_next_done)) { + while (calld->send_message_caching_stream->Next( + SIZE_MAX, &calld->on_send_message_next_done)) { grpc_error* error = pull_slice_from_send_message(calld); if (error != GRPC_ERROR_NONE) return error; if (calld->send_message_bytes_read == - calld->send_message_caching_stream.base.length) { + calld->send_message_caching_stream->length()) { break; } } return GRPC_ERROR_NONE; } -// Async callback for grpc_byte_stream_next(). +// Async callback for ByteStream::Next(). static void on_send_message_next_done(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); call_data* calld = static_cast<call_data*>(elem->call_data); @@ -222,7 +223,7 @@ static void on_send_message_next_done(void* arg, grpc_error* error) { // here, then we know that all of the data was not available // synchronously, so we were not able to do a cached call. Instead, // we just reset the byte stream and then send down the batch as-is. - grpc_caching_byte_stream_reset(&calld->send_message_caching_stream); + calld->send_message_caching_stream->Reset(); grpc_call_next_op(elem, calld->send_message_batch); } @@ -253,7 +254,7 @@ static grpc_error* update_path_for_get(grpc_call_element* elem, size_t estimated_len = GRPC_SLICE_LENGTH(path_slice); estimated_len++; /* for the '?' */ estimated_len += grpc_base64_estimate_encoded_size( - batch->payload->send_message.send_message->length, true /* url_safe */, + batch->payload->send_message.send_message->length(), true /* url_safe */, false /* multi_line */); grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len); /* memcopy individual pieces into this slice */ @@ -265,9 +266,9 @@ static grpc_error* update_path_for_get(grpc_call_element* elem, write_ptr += GRPC_SLICE_LENGTH(path_slice); *write_ptr++ = '?'; char* payload_bytes = - slice_buffer_to_string(&calld->send_message_cache.cache_buffer); + slice_buffer_to_string(calld->send_message_cache->cache_buffer()); grpc_base64_encode_core(write_ptr, payload_bytes, - batch->payload->send_message.send_message->length, + batch->payload->send_message.send_message->length(), true /* url_safe */, false /* multi_line */); gpr_free(payload_bytes); /* remove trailing unused memory and add trailing 0 to terminate string */ @@ -326,15 +327,14 @@ static void hc_start_transport_stream_op_batch( if (batch->send_message && (batch->payload->send_initial_metadata.send_initial_metadata_flags & GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) && - batch->payload->send_message.send_message->length < + batch->payload->send_message.send_message->length() < channeld->max_payload_size_for_get) { calld->send_message_bytes_read = 0; - grpc_byte_stream_cache_init(&calld->send_message_cache, - batch->payload->send_message.send_message); - grpc_caching_byte_stream_init(&calld->send_message_caching_stream, - &calld->send_message_cache); - batch->payload->send_message.send_message = - &calld->send_message_caching_stream.base; + calld->send_message_cache.Init( + std::move(batch->payload->send_message.send_message)); + calld->send_message_caching_stream.Init(calld->send_message_cache.get()); + batch->payload->send_message.send_message.reset( + calld->send_message_caching_stream.get()); calld->original_send_message_on_complete = batch->on_complete; batch->on_complete = &calld->send_message_on_complete; calld->send_message_batch = batch; @@ -342,12 +342,12 @@ static void hc_start_transport_stream_op_batch( if (error != GRPC_ERROR_NONE) goto done; // If all the data has been read, then we can use GET. if (calld->send_message_bytes_read == - calld->send_message_caching_stream.base.length) { + calld->send_message_caching_stream->length()) { method = GRPC_MDELEM_METHOD_GET; error = update_path_for_get(elem, batch); if (error != GRPC_ERROR_NONE) goto done; batch->send_message = false; - grpc_byte_stream_destroy(&calld->send_message_caching_stream.base); + calld->send_message_caching_stream->Orphan(); } else { // Not all data is available. The batch will be sent down // asynchronously in on_send_message_next_done(). diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index efe0085c5b..e7d9949386 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -32,6 +32,7 @@ #include "src/core/lib/compression/compression_internal.h" #include "src/core/lib/compression/message_compress.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -62,7 +63,8 @@ struct call_data { grpc_closure start_send_message_batch_in_call_combiner; grpc_transport_stream_op_batch* send_message_batch; grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ - grpc_slice_buffer_stream replacement_stream; + grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> + replacement_stream; grpc_closure* original_send_message_on_complete; grpc_closure send_message_on_complete; grpc_closure on_send_message_next_done; @@ -220,7 +222,7 @@ static void finish_send_message(grpc_call_element* elem) { grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); uint32_t send_flags = - calld->send_message_batch->payload->send_message.send_message->flags; + calld->send_message_batch->payload->send_message.send_message->flags(); bool did_compress = grpc_msg_compress(calld->message_compression_algorithm, &calld->slices, &tmp); if (did_compress) { @@ -253,12 +255,9 @@ static void finish_send_message(grpc_call_element* elem) { grpc_slice_buffer_destroy_internal(&tmp); // Swap out the original byte stream with our new one and send the // batch down. - grpc_byte_stream_destroy( - calld->send_message_batch->payload->send_message.send_message); - grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, - send_flags); - calld->send_message_batch->payload->send_message.send_message = - &calld->replacement_stream.base; + calld->replacement_stream.Init(&calld->slices, send_flags); + calld->send_message_batch->payload->send_message.send_message.reset( + calld->replacement_stream.get()); calld->original_send_message_on_complete = calld->send_message_batch->on_complete; calld->send_message_batch->on_complete = &calld->send_message_on_complete; @@ -278,9 +277,9 @@ static void fail_send_message_batch_in_call_combiner(void* arg, // Pulls a slice from the send_message byte stream and adds it to calld->slices. static grpc_error* pull_slice_from_send_message(call_data* calld) { grpc_slice incoming_slice; - grpc_error* error = grpc_byte_stream_pull( - calld->send_message_batch->payload->send_message.send_message, - &incoming_slice); + grpc_error* error = + calld->send_message_batch->payload->send_message.send_message->Pull( + &incoming_slice); if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&calld->slices, incoming_slice); } @@ -289,12 +288,11 @@ static grpc_error* pull_slice_from_send_message(call_data* calld) { // Reads as many slices as possible from the send_message byte stream. // If all data has been read, invokes finish_send_message(). Otherwise, -// an async call to grpc_byte_stream_next() has been started, which will +// an async call to ByteStream::Next() has been started, which will // eventually result in calling on_send_message_next_done(). static void continue_reading_send_message(grpc_call_element* elem) { call_data* calld = static_cast<call_data*>(elem->call_data); - while (grpc_byte_stream_next( - calld->send_message_batch->payload->send_message.send_message, + while (calld->send_message_batch->payload->send_message.send_message->Next( ~static_cast<size_t>(0), &calld->on_send_message_next_done)) { grpc_error* error = pull_slice_from_send_message(calld); if (error != GRPC_ERROR_NONE) { @@ -303,15 +301,15 @@ static void continue_reading_send_message(grpc_call_element* elem) { GRPC_ERROR_UNREF(error); return; } - if (calld->slices.length == - calld->send_message_batch->payload->send_message.send_message->length) { + if (calld->slices.length == calld->send_message_batch->payload->send_message + .send_message->length()) { finish_send_message(elem); break; } } } -// Async callback for grpc_byte_stream_next(). +// Async callback for ByteStream::Next(). static void on_send_message_next_done(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); call_data* calld = static_cast<call_data*>(elem->call_data); @@ -328,7 +326,7 @@ static void on_send_message_next_done(void* arg, grpc_error* error) { return; } if (calld->slices.length == - calld->send_message_batch->payload->send_message.send_message->length) { + calld->send_message_batch->payload->send_message.send_message->length()) { finish_send_message(elem); } else { continue_reading_send_message(elem); @@ -340,7 +338,8 @@ static void start_send_message_batch(void* arg, grpc_error* unused) { call_data* calld = static_cast<call_data*>(elem->call_data); if (skip_compression( elem, - calld->send_message_batch->payload->send_message.send_message->flags, + calld->send_message_batch->payload->send_message.send_message + ->flags(), calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) { send_message_batch_continue(elem); } else { @@ -365,9 +364,7 @@ static void compress_start_transport_stream_op_batch( grpc_schedule_on_exec_ctx), GRPC_ERROR_REF(calld->cancel_error), "failing send_message op"); } else { - grpc_byte_stream_shutdown( - - calld->send_message_batch->payload->send_message.send_message, + calld->send_message_batch->payload->send_message.send_message->Shutdown( GRPC_ERROR_REF(calld->cancel_error)); } } diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 57ec8dce34..c202015875 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -23,6 +23,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <string.h> +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/b64.h" #include "src/core/lib/slice/percent_encoding.h" @@ -53,8 +54,8 @@ struct call_data { */ grpc_closure* recv_message_ready; grpc_closure* on_complete; - grpc_byte_stream** pp_recv_message; - grpc_slice_buffer_stream read_stream; + grpc_core::OrphanablePtr<grpc_core::ByteStream>* pp_recv_message; + grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> read_stream; /** Receive closures are chained: we inject this closure as the on_done_recv up-call on transport_op, and remember to call our on_done_recv member @@ -232,7 +233,7 @@ static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem, grpc_base64_decode_with_len( reinterpret_cast<const char*> GRPC_SLICE_START_PTR(query_slice), GRPC_SLICE_LENGTH(query_slice), k_url_safe)); - grpc_slice_buffer_stream_init(&calld->read_stream, &read_slice_buffer, 0); + calld->read_stream.Init(&read_slice_buffer, 0); grpc_slice_buffer_destroy_internal(&read_slice_buffer); calld->seen_path_with_query = true; grpc_slice_unref_internal(query_slice); @@ -281,10 +282,10 @@ static void hs_on_complete(void* user_data, grpc_error* err) { call_data* calld = static_cast<call_data*>(elem->call_data); /* Call recv_message_ready if we got the payload via the path field */ if (calld->seen_path_with_query && calld->recv_message_ready != nullptr) { - *calld->pp_recv_message = - calld->payload_bin_delivered - ? nullptr - : reinterpret_cast<grpc_byte_stream*>(&calld->read_stream); + calld->pp_recv_message->reset( + calld->payload_bin_delivered ? nullptr + : reinterpret_cast<grpc_core::ByteStream*>( + calld->read_stream.get())); // Re-enter call combiner for recv_message_ready, since the surface // code will release the call combiner for each callback it receives. GRPC_CALL_COMBINER_START(calld->call_combiner, calld->recv_message_ready, @@ -405,7 +406,7 @@ static void destroy_call_elem(grpc_call_element* elem, grpc_closure* ignored) { call_data* calld = static_cast<call_data*>(elem->call_data); if (calld->seen_path_with_query && !calld->payload_bin_delivered) { - grpc_byte_stream_destroy(&calld->read_stream.base); + calld->read_stream->Orphan(); } } diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index b1b14dde02..c7fc3f2e62 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -100,7 +100,7 @@ struct call_data { // call our next_recv_message_ready member after handling it. grpc_closure recv_message_ready; // Used by recv_message_ready. - grpc_byte_stream** recv_message; + grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; // Original recv_message_ready callback, invoked after our own. grpc_closure* next_recv_message_ready; }; @@ -121,12 +121,12 @@ static void recv_message_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); call_data* calld = static_cast<call_data*>(elem->call_data); if (*calld->recv_message != nullptr && calld->limits.max_recv_size >= 0 && - (*calld->recv_message)->length > + (*calld->recv_message)->length() > static_cast<size_t>(calld->limits.max_recv_size)) { char* message_string; gpr_asprintf(&message_string, "Received message larger than max (%u vs. %d)", - (*calld->recv_message)->length, calld->limits.max_recv_size); + (*calld->recv_message)->length(), calld->limits.max_recv_size); grpc_error* new_error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); @@ -150,11 +150,11 @@ static void start_transport_stream_op_batch( call_data* calld = static_cast<call_data*>(elem->call_data); // Check max send message size. if (op->send_message && calld->limits.max_send_size >= 0 && - op->payload->send_message.send_message->length > + op->payload->send_message.send_message->length() > static_cast<size_t>(calld->limits.max_send_size)) { char* message_string; gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", - op->payload->send_message.send_message->length, + op->payload->send_message.send_message->length(), calld->limits.max_send_size); grpc_transport_stream_op_batch_finish_with_failure( op, diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc index bed1004c57..c7070d4d9b 100644 --- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc @@ -93,7 +93,9 @@ static void start_transport_stream_op_batch( /* Send message happens after client's user-agent (initial metadata) is * received, so workaround_active must be set already */ if (calld->workaround_active) { - op->payload->send_message.send_message->flags |= GRPC_WRITE_NO_COMPRESS; + op->payload->send_message.send_message->set_flags( + op->payload->send_message.send_message->flags() | + GRPC_WRITE_NO_COMPRESS); } } diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc index 479f0da572..b95c9dae53 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -29,7 +29,6 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/endpoint.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/tcp_client_posix.h" #include "src/core/lib/iomgr/tcp_posix.h" #include "src/core/lib/surface/api_trace.h" diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h index 7b41972160..6e51001b53 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.h +++ b/src/core/ext/transport/chttp2/server/chttp2_server.h @@ -23,7 +23,7 @@ #include <grpc/impl/codegen/grpc_types.h> -#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/error.h" /// Adds a port to \a server. Sets \a port_num to the port number. /// Takes ownership of \a args. diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc index 822236dd2d..99f18cdf39 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc @@ -41,6 +41,5 @@ int grpc_server_add_insecure_http2_port(grpc_server* server, const char* addr) { GRPC_ERROR_UNREF(err); } - return port_num; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index df3fb8c68c..a4d616d778 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -39,6 +39,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/timer.h" @@ -117,12 +118,6 @@ static void connectivity_state_set(grpc_chttp2_transport* t, grpc_connectivity_state state, grpc_error* error, const char* reason); -static void incoming_byte_stream_destroy_locked(void* byte_stream, - grpc_error* error_ignored); -static void incoming_byte_stream_publish_error( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error); -static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs); - static void benign_reclaimer_locked(void* t, grpc_error* error); static void destructive_reclaimer_locked(void* t, grpc_error* error); @@ -662,8 +657,8 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs, s->t = t; s->refcount = refcount; /* We reserve one 'active stream' that's dropped when the stream is - read-closed. The others are for incoming_byte_streams that are actively - reading */ + read-closed. The others are for Chttp2IncomingByteStreams that are + actively reading */ GRPC_CHTTP2_STREAM_REF(s, "chttp2"); grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); @@ -1256,8 +1251,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, abort(); /* TODO(ctiller): what cleanup here? */ return; /* early out */ } - if (s->fetched_send_message_length == s->fetching_send_message->length) { - grpc_byte_stream_destroy(s->fetching_send_message); + if (s->fetched_send_message_length == s->fetching_send_message->length()) { int64_t notify_offset = s->next_message_end_offset; if (notify_offset <= s->flow_controlled_bytes_written) { grpc_chttp2_complete_closure_step( @@ -1274,20 +1268,19 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, cb->closure = s->fetching_send_message_finished; s->fetching_send_message_finished = nullptr; grpc_chttp2_write_cb** list = - s->fetching_send_message->flags & GRPC_WRITE_THROUGH + s->fetching_send_message->flags() & GRPC_WRITE_THROUGH ? &s->on_write_finished_cbs : &s->on_flow_controlled_cbs; cb->next = *list; *list = cb; } - s->fetching_send_message = nullptr; + s->fetching_send_message.reset(); return; /* early out */ - } else if (grpc_byte_stream_next(s->fetching_send_message, UINT32_MAX, - &s->complete_fetch_locked)) { - grpc_error* error = - grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice); + } else if (s->fetching_send_message->Next(UINT32_MAX, + &s->complete_fetch_locked)) { + grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice); if (error != GRPC_ERROR_NONE) { - grpc_byte_stream_destroy(s->fetching_send_message); + s->fetching_send_message.reset(); grpc_chttp2_cancel_stream(t, s, error); } else { add_fetched_slice_locked(t, s); @@ -1300,14 +1293,14 @@ static void complete_fetch_locked(void* gs, grpc_error* error) { grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs); grpc_chttp2_transport* t = s->t; if (error == GRPC_ERROR_NONE) { - error = grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice); + error = s->fetching_send_message->Pull(&s->fetching_slice); if (error == GRPC_ERROR_NONE) { add_fetched_slice_locked(t, s); continue_fetching_send_locked(t, s); } } if (error != GRPC_ERROR_NONE) { - grpc_byte_stream_destroy(s->fetching_send_message); + s->fetching_send_message.reset(); grpc_chttp2_cancel_stream(t, s, error); } } @@ -1439,7 +1432,7 @@ static void perform_stream_op_locked(void* stream_op, GPR_ASSERT(s->id != 0); grpc_chttp2_mark_stream_writable(t, s); if (!(op->send_message && - (op->payload->send_message.send_message->flags & + (op->payload->send_message.send_message->flags() & GRPC_WRITE_BUFFER_HINT))) { grpc_chttp2_initiate_write( t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA); @@ -1466,7 +1459,7 @@ static void perform_stream_op_locked(void* stream_op, if (op->send_message) { GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(); GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE( - op->payload->send_message.send_message->length); + op->payload->send_message.send_message->length()); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->fetching_send_message_finished = add_closure_barrier(op->on_complete); if (s->write_closed) { @@ -1475,7 +1468,7 @@ static void perform_stream_op_locked(void* stream_op, // streaming call might send another message before getting a // recv_message failure, breaking out of its loop, and then // starting recv_trailing_metadata. - grpc_byte_stream_destroy(op->payload->send_message.send_message); + op->payload->send_message.send_message.reset(); grpc_chttp2_complete_closure_step( t, s, &s->fetching_send_message_finished, t->is_client && s->received_trailing_metadata @@ -1488,14 +1481,15 @@ static void perform_stream_op_locked(void* stream_op, GPR_ASSERT(s->fetching_send_message == nullptr); uint8_t* frame_hdr = grpc_slice_buffer_tiny_add( &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES); - uint32_t flags = op_payload->send_message.send_message->flags; + uint32_t flags = op_payload->send_message.send_message->flags(); frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; - size_t len = op_payload->send_message.send_message->length; + size_t len = op_payload->send_message.send_message->length(); frame_hdr[1] = static_cast<uint8_t>(len >> 24); frame_hdr[2] = static_cast<uint8_t>(len >> 16); frame_hdr[3] = static_cast<uint8_t>(len >> 8); frame_hdr[4] = static_cast<uint8_t>(len); - s->fetching_send_message = op_payload->send_message.send_message; + s->fetching_send_message = + std::move(op_payload->send_message.send_message); s->fetched_send_message_length = 0; s->next_message_end_offset = s->flow_controlled_bytes_written + @@ -1711,7 +1705,6 @@ static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) { } void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) { - t->ping_recv_state.ping_strikes++; if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes && t->ping_policy.max_ping_strikes != 0) { send_goaway(t, @@ -1948,12 +1941,12 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id, } if (s->pending_byte_stream) { if (s->on_next != nullptr) { - grpc_chttp2_incoming_byte_stream* bs = s->data_parser.parsing_frame; + grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame; if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); } - incoming_byte_stream_publish_error(bs, error); - incoming_byte_stream_unref(bs); + bs->PublishError(error); + bs->Unref(); s->data_parser.parsing_frame = nullptr; } else { GRPC_ERROR_UNREF(s->byte_stream_error); @@ -2097,10 +2090,7 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t, GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); - if (s->fetching_send_message != nullptr) { - grpc_byte_stream_destroy(s->fetching_send_message); - s->fetching_send_message = nullptr; - } + s->fetching_send_message.reset(); grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), "fetching_send_message_finished"); @@ -2716,7 +2706,6 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg); - s->pending_byte_stream = false; if (error == GRPC_ERROR_NONE) { grpc_chttp2_maybe_complete_recv_message(s->t, s); @@ -2732,22 +2721,56 @@ static void reset_byte_stream(void* arg, grpc_error* error) { } } -static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs) { - if (gpr_unref(&bs->refs)) { - gpr_free(bs); +namespace grpc_core { + +Chttp2IncomingByteStream::Chttp2IncomingByteStream( + grpc_chttp2_transport* transport, grpc_chttp2_stream* stream, + uint32_t frame_size, uint32_t flags) + : ByteStream(frame_size, flags), + transport_(transport), + stream_(stream), + remaining_bytes_(frame_size) { + gpr_ref_init(&refs_, 2); + GRPC_ERROR_UNREF(stream->byte_stream_error); + stream->byte_stream_error = GRPC_ERROR_NONE; +} + +void Chttp2IncomingByteStream::OrphanLocked(void* arg, + grpc_error* error_ignored) { + Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg); + grpc_chttp2_stream* s = bs->stream_; + grpc_chttp2_transport* t = s->t; + bs->Unref(); + s->pending_byte_stream = false; + grpc_chttp2_maybe_complete_recv_message(t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); +} + +void Chttp2IncomingByteStream::Orphan() { + GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&destroy_action_, + &Chttp2IncomingByteStream::OrphanLocked, this, + grpc_combiner_scheduler(transport_->combiner)), + GRPC_ERROR_NONE); +} + +void Chttp2IncomingByteStream::Unref() { + if (gpr_unref(&refs_)) { + Delete(this); } } -static void incoming_byte_stream_next_locked(void* argp, - grpc_error* error_ignored) { - grpc_chttp2_incoming_byte_stream* bs = - static_cast<grpc_chttp2_incoming_byte_stream*>(argp); - grpc_chttp2_transport* t = bs->transport; - grpc_chttp2_stream* s = bs->stream; +void Chttp2IncomingByteStream::Ref() { gpr_ref(&refs_); } +void Chttp2IncomingByteStream::NextLocked(void* arg, + grpc_error* error_ignored) { + Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg); + grpc_chttp2_transport* t = bs->transport_; + grpc_chttp2_stream* s = bs->stream_; size_t cur_length = s->frame_storage.length; if (!s->read_closed) { - s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint, + s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint, cur_length); grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s); } @@ -2756,22 +2779,22 @@ static void incoming_byte_stream_next_locked(void* argp, grpc_slice_buffer_swap(&s->frame_storage, &s->unprocessed_incoming_frames_buffer); s->unprocessed_incoming_frames_decompressed = false; - GRPC_CLOSURE_SCHED(bs->next_action.on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE); } else if (s->byte_stream_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(bs->next_action.on_complete, + GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_REF(s->byte_stream_error)); if (s->data_parser.parsing_frame != nullptr) { - incoming_byte_stream_unref(s->data_parser.parsing_frame); + s->data_parser.parsing_frame->Unref(); s->data_parser.parsing_frame = nullptr; } } else if (s->read_closed) { - if (bs->remaining_bytes != 0) { + if (bs->remaining_bytes_ != 0) { s->byte_stream_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - GRPC_CLOSURE_SCHED(bs->next_action.on_complete, + GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_REF(s->byte_stream_error)); if (s->data_parser.parsing_frame != nullptr) { - incoming_byte_stream_unref(s->data_parser.parsing_frame); + s->data_parser.parsing_frame->Unref(); s->data_parser.parsing_frame = nullptr; } } else { @@ -2779,122 +2802,94 @@ static void incoming_byte_stream_next_locked(void* argp, GPR_ASSERT(false); } } else { - s->on_next = bs->next_action.on_complete; + s->on_next = bs->next_action_.on_complete; } - incoming_byte_stream_unref(bs); + bs->Unref(); } -static bool incoming_byte_stream_next(grpc_byte_stream* byte_stream, - size_t max_size_hint, - grpc_closure* on_complete) { +bool Chttp2IncomingByteStream::Next(size_t max_size_hint, + grpc_closure* on_complete) { GPR_TIMER_SCOPE("incoming_byte_stream_next", 0); - grpc_chttp2_incoming_byte_stream* bs = - reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream); - grpc_chttp2_stream* s = bs->stream; - if (s->unprocessed_incoming_frames_buffer.length > 0) { + if (stream_->unprocessed_incoming_frames_buffer.length > 0) { return true; } else { - gpr_ref(&bs->refs); - bs->next_action.max_size_hint = max_size_hint; - bs->next_action.on_complete = on_complete; + Ref(); + next_action_.max_size_hint = max_size_hint; + next_action_.on_complete = on_complete; GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&bs->next_action.closure, - incoming_byte_stream_next_locked, bs, - grpc_combiner_scheduler(bs->transport->combiner)), + GRPC_CLOSURE_INIT(&next_action_.closure, + &Chttp2IncomingByteStream::NextLocked, this, + grpc_combiner_scheduler(transport_->combiner)), GRPC_ERROR_NONE); return false; } } -static grpc_error* incoming_byte_stream_pull(grpc_byte_stream* byte_stream, - grpc_slice* slice) { +grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) { GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0); - grpc_chttp2_incoming_byte_stream* bs = - reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream); - grpc_chttp2_stream* s = bs->stream; grpc_error* error; - - if (s->unprocessed_incoming_frames_buffer.length > 0) { - if (!s->unprocessed_incoming_frames_decompressed) { + if (stream_->unprocessed_incoming_frames_buffer.length > 0) { + if (!stream_->unprocessed_incoming_frames_decompressed) { bool end_of_context; - if (!s->stream_decompression_ctx) { - s->stream_decompression_ctx = grpc_stream_compression_context_create( - s->stream_decompression_method); + if (!stream_->stream_decompression_ctx) { + stream_->stream_decompression_ctx = + grpc_stream_compression_context_create( + stream_->stream_decompression_method); } - if (!grpc_stream_decompress(s->stream_decompression_ctx, - &s->unprocessed_incoming_frames_buffer, - &s->decompressed_data_buffer, nullptr, + if (!grpc_stream_decompress(stream_->stream_decompression_ctx, + &stream_->unprocessed_incoming_frames_buffer, + &stream_->decompressed_data_buffer, nullptr, MAX_SIZE_T, &end_of_context)) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error."); return error; } - GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); - grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, - &s->decompressed_data_buffer); - s->unprocessed_incoming_frames_decompressed = true; + GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0); + grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer, + &stream_->decompressed_data_buffer); + stream_->unprocessed_incoming_frames_decompressed = true; if (end_of_context) { - grpc_stream_compression_context_destroy(s->stream_decompression_ctx); - s->stream_decompression_ctx = nullptr; + grpc_stream_compression_context_destroy( + stream_->stream_decompression_ctx); + stream_->stream_decompression_ctx = nullptr; } - if (s->unprocessed_incoming_frames_buffer.length == 0) { + if (stream_->unprocessed_incoming_frames_buffer.length == 0) { *slice = grpc_empty_slice(); } } error = grpc_deframe_unprocessed_incoming_frames( - &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, slice, - nullptr); + &stream_->data_parser, stream_, + &stream_->unprocessed_incoming_frames_buffer, slice, nullptr); if (error != GRPC_ERROR_NONE) { return error; } } else { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error)); return error; } return GRPC_ERROR_NONE; } -static void incoming_byte_stream_destroy_locked(void* byte_stream, - grpc_error* error_ignored); - -static void incoming_byte_stream_destroy(grpc_byte_stream* byte_stream) { - GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); - grpc_chttp2_incoming_byte_stream* bs = - reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&bs->destroy_action, - incoming_byte_stream_destroy_locked, bs, - grpc_combiner_scheduler(bs->transport->combiner)), - GRPC_ERROR_NONE); -} - -static void incoming_byte_stream_publish_error( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error) { - grpc_chttp2_stream* s = bs->stream; - +void Chttp2IncomingByteStream::PublishError(grpc_error* error) { GPR_ASSERT(error != GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error)); - s->on_next = nullptr; - GRPC_ERROR_UNREF(s->byte_stream_error); - s->byte_stream_error = GRPC_ERROR_REF(error); - grpc_chttp2_cancel_stream(bs->transport, bs->stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error)); + stream_->on_next = nullptr; + GRPC_ERROR_UNREF(stream_->byte_stream_error); + stream_->byte_stream_error = GRPC_ERROR_REF(error); + grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error)); } -grpc_error* grpc_chttp2_incoming_byte_stream_push( - grpc_chttp2_incoming_byte_stream* bs, grpc_slice slice, - grpc_slice* slice_out) { - grpc_chttp2_stream* s = bs->stream; - - if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { +grpc_error* Chttp2IncomingByteStream::Push(grpc_slice slice, + grpc_slice* slice_out) { + if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"); - - GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error)); grpc_slice_unref_internal(slice); return error; } else { - bs->remaining_bytes -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice); + remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice); if (slice_out != nullptr) { *slice_out = slice; } @@ -2902,66 +2897,25 @@ grpc_error* grpc_chttp2_incoming_byte_stream_push( } } -grpc_error* grpc_chttp2_incoming_byte_stream_finished( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error, - bool reset_on_error) { - grpc_chttp2_stream* s = bs->stream; - +grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error, + bool reset_on_error) { if (error == GRPC_ERROR_NONE) { - if (bs->remaining_bytes != 0) { + if (remaining_bytes_ != 0) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); } } if (error != GRPC_ERROR_NONE && reset_on_error) { - GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error)); } - incoming_byte_stream_unref(bs); + Unref(); return error; } -static void incoming_byte_stream_shutdown(grpc_byte_stream* byte_stream, - grpc_error* error) { - grpc_chttp2_incoming_byte_stream* bs = - reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream); - GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( - bs, error, true /* reset_on_error */)); +void Chttp2IncomingByteStream::Shutdown(grpc_error* error) { + GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */)); } -static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = { - incoming_byte_stream_next, incoming_byte_stream_pull, - incoming_byte_stream_shutdown, incoming_byte_stream_destroy}; - -static void incoming_byte_stream_destroy_locked(void* byte_stream, - grpc_error* error_ignored) { - grpc_chttp2_incoming_byte_stream* bs = - static_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream); - grpc_chttp2_stream* s = bs->stream; - grpc_chttp2_transport* t = s->t; - - GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable); - incoming_byte_stream_unref(bs); - s->pending_byte_stream = false; - grpc_chttp2_maybe_complete_recv_message(t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); -} - -grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create( - grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size, - uint32_t flags) { - grpc_chttp2_incoming_byte_stream* incoming_byte_stream = - static_cast<grpc_chttp2_incoming_byte_stream*>( - gpr_malloc(sizeof(*incoming_byte_stream))); - incoming_byte_stream->base.length = frame_size; - incoming_byte_stream->remaining_bytes = frame_size; - incoming_byte_stream->base.flags = flags; - incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable; - gpr_ref_init(&incoming_byte_stream->refs, 2); - incoming_byte_stream->transport = t; - incoming_byte_stream->stream = s; - GRPC_ERROR_UNREF(s->byte_stream_error); - s->byte_stream_error = GRPC_ERROR_NONE; - return incoming_byte_stream; -} +} // namespace grpc_core /******************************************************************************* * RESOURCE QUOTAS diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc index 0d37a494a2..f8f06f6789 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.cc +++ b/src/core/ext/transport/chttp2/transport/frame_data.cc @@ -27,6 +27,7 @@ #include <grpc/support/string_util.h> #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/transport/transport.h" @@ -39,8 +40,7 @@ grpc_error* grpc_chttp2_data_parser_init(grpc_chttp2_data_parser* parser) { void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser* parser) { if (parser->parsing_frame != nullptr) { - GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( - parser->parsing_frame, + GRPC_ERROR_UNREF(parser->parsing_frame->Finished( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false)); } GRPC_ERROR_UNREF(parser->error); @@ -100,7 +100,7 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf, grpc_error* grpc_deframe_unprocessed_incoming_frames( grpc_chttp2_data_parser* p, grpc_chttp2_stream* s, grpc_slice_buffer* slices, grpc_slice* slice_out, - grpc_byte_stream** stream_out) { + grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out) { grpc_error* error = GRPC_ERROR_NONE; grpc_chttp2_transport* t = s->t; @@ -197,12 +197,11 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( if (p->is_frame_compressed) { message_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - p->parsing_frame = grpc_chttp2_incoming_byte_stream_create( + p->parsing_frame = grpc_core::New<grpc_core::Chttp2IncomingByteStream>( t, s, p->frame_size, message_flags); - *stream_out = &p->parsing_frame->base; - if (p->parsing_frame->remaining_bytes == 0) { - GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( - p->parsing_frame, GRPC_ERROR_NONE, true)); + stream_out->reset(p->parsing_frame); + if (p->parsing_frame->remaining_bytes() == 0) { + GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true)); p->parsing_frame = nullptr; p->state = GRPC_CHTTP2_DATA_FH_0; } @@ -226,8 +225,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( if (remaining == p->frame_size) { s->stats.incoming.data_bytes += remaining; if (GRPC_ERROR_NONE != - (error = grpc_chttp2_incoming_byte_stream_push( - p->parsing_frame, + (error = p->parsing_frame->Push( grpc_slice_sub(slice, static_cast<size_t>(cur - beg), static_cast<size_t>(end - beg)), slice_out))) { @@ -235,8 +233,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( return error; } if (GRPC_ERROR_NONE != - (error = grpc_chttp2_incoming_byte_stream_finished( - p->parsing_frame, GRPC_ERROR_NONE, true))) { + (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) { grpc_slice_unref_internal(slice); return error; } @@ -247,8 +244,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( } else if (remaining < p->frame_size) { s->stats.incoming.data_bytes += remaining; if (GRPC_ERROR_NONE != - (error = grpc_chttp2_incoming_byte_stream_push( - p->parsing_frame, + (error = p->parsing_frame->Push( grpc_slice_sub(slice, static_cast<size_t>(cur - beg), static_cast<size_t>(end - beg)), slice_out))) { @@ -261,18 +257,16 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( GPR_ASSERT(remaining > p->frame_size); s->stats.incoming.data_bytes += p->frame_size; if (GRPC_ERROR_NONE != - (grpc_chttp2_incoming_byte_stream_push( - p->parsing_frame, + p->parsing_frame->Push( grpc_slice_sub( slice, static_cast<size_t>(cur - beg), static_cast<size_t>(cur + p->frame_size - beg)), - slice_out))) { + slice_out)) { grpc_slice_unref_internal(slice); return error; } if (GRPC_ERROR_NONE != - (error = grpc_chttp2_incoming_byte_stream_finished( - p->parsing_frame, GRPC_ERROR_NONE, true))) { + (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) { grpc_slice_unref_internal(slice); return error; } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h index 3efbbf9f76..e5d01f764e 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.h +++ b/src/core/ext/transport/chttp2/transport/frame_data.h @@ -26,7 +26,6 @@ #include <grpc/slice.h> #include <grpc/slice_buffer.h> #include "src/core/ext/transport/chttp2/transport/frame.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/byte_stream.h" #include "src/core/lib/transport/transport.h" @@ -40,8 +39,9 @@ typedef enum { GRPC_CHTTP2_DATA_ERROR } grpc_chttp2_stream_state; -typedef struct grpc_chttp2_incoming_byte_stream - grpc_chttp2_incoming_byte_stream; +namespace grpc_core { +class Chttp2IncomingByteStream; +} // namespace grpc_core typedef struct { grpc_chttp2_stream_state state; @@ -50,7 +50,7 @@ typedef struct { grpc_error* error; bool is_frame_compressed; - grpc_chttp2_incoming_byte_stream* parsing_frame; + grpc_core::Chttp2IncomingByteStream* parsing_frame; } grpc_chttp2_data_parser; /* initialize per-stream state for data frame parsing */ @@ -79,6 +79,6 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf, grpc_error* grpc_deframe_unprocessed_incoming_frames( grpc_chttp2_data_parser* p, grpc_chttp2_stream* s, grpc_slice_buffer* slices, grpc_slice* slice_out, - grpc_byte_stream** stream_out); + grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */ diff --git a/src/core/ext/transport/chttp2/transport/frame_goaway.h b/src/core/ext/transport/chttp2/transport/frame_goaway.h index e17ed8d563..66c7a68bef 100644 --- a/src/core/ext/transport/chttp2/transport/frame_goaway.h +++ b/src/core/ext/transport/chttp2/transport/frame_goaway.h @@ -24,7 +24,6 @@ #include <grpc/slice.h> #include <grpc/slice_buffer.h> #include "src/core/ext/transport/chttp2/transport/frame.h" -#include "src/core/lib/iomgr/exec_ctx.h" typedef enum { GRPC_CHTTP2_GOAWAY_LSI0, diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.h b/src/core/ext/transport/chttp2/transport/frame_ping.h index 8718d6a097..55a4499ad5 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.h +++ b/src/core/ext/transport/chttp2/transport/frame_ping.h @@ -23,7 +23,6 @@ #include <grpc/slice.h> #include "src/core/ext/transport/chttp2/transport/frame.h" -#include "src/core/lib/iomgr/exec_ctx.h" typedef struct { uint8_t byte; diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h index bb2d34f918..6bcf9c4479 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.h +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.h @@ -23,7 +23,6 @@ #include <grpc/slice.h> #include "src/core/ext/transport/chttp2/transport/frame.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/transport.h" typedef struct { diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.h b/src/core/ext/transport/chttp2/transport/frame_settings.h index df19627194..8d8d9b1a91 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.h +++ b/src/core/ext/transport/chttp2/transport/frame_settings.h @@ -24,7 +24,6 @@ #include <grpc/slice.h> #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/ext/transport/chttp2/transport/http2_settings.h" -#include "src/core/lib/iomgr/exec_ctx.h" typedef enum { GRPC_CHTTP2_SPS_ID0, diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.h b/src/core/ext/transport/chttp2/transport/frame_window_update.h index 30667c77e1..3d2391f637 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.h +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.h @@ -23,7 +23,6 @@ #include <grpc/slice.h> #include "src/core/ext/transport/chttp2/transport/frame.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/transport.h" typedef struct { diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.h b/src/core/ext/transport/chttp2/transport/hpack_parser.h index b3b8018b98..3e05de4b92 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.h +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.h @@ -25,7 +25,6 @@ #include "src/core/ext/transport/chttp2/transport/frame.h" #include "src/core/ext/transport/chttp2/transport/hpack_table.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/metadata.h" typedef struct grpc_chttp2_hpack_parser grpc_chttp2_hpack_parser; diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index b9431cd311..6d11e5aa31 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -203,18 +203,58 @@ typedef struct grpc_chttp2_write_cb { struct grpc_chttp2_write_cb* next; } grpc_chttp2_write_cb; -/* forward declared in frame_data.h */ -struct grpc_chttp2_incoming_byte_stream { - grpc_byte_stream base; - gpr_refcount refs; +namespace grpc_core { + +class Chttp2IncomingByteStream : public ByteStream { + public: + Chttp2IncomingByteStream(grpc_chttp2_transport* transport, + grpc_chttp2_stream* stream, uint32_t frame_size, + uint32_t flags); + + void Orphan() override; + + bool Next(size_t max_size_hint, grpc_closure* on_complete) override; + grpc_error* Pull(grpc_slice* slice) override; + void Shutdown(grpc_error* error) override; + + // TODO(roth): When I converted this class to C++, I wanted to make it + // inherit from RefCounted or InternallyRefCounted instead of continuing + // to use its own custom ref-counting code. However, that would require + // using multiple inheritence, which sucks in general. And to make matters + // worse, it causes problems with our New<> and Delete<> wrappers. + // Specifically, unless RefCounted is first in the list of parent classes, + // it will see a different value of the address of the object than the one + // we actually allocated, in which case gpr_free() will be called on a + // different address than the one we got from gpr_malloc(), thus causing a + // crash. Given the fragility of depending on that, as well as a desire to + // avoid multiple inheritence in general, I've decided to leave this + // alone for now. We can revisit this once we're able to link against + // libc++, at which point we can eliminate New<> and Delete<> and + // switch to std::shared_ptr<>. + void Ref(); + void Unref(); + + void PublishError(grpc_error* error); + + grpc_error* Push(grpc_slice slice, grpc_slice* slice_out); - grpc_chttp2_transport* transport; /* immutable */ - grpc_chttp2_stream* stream; /* immutable */ + grpc_error* Finished(grpc_error* error, bool reset_on_error); + + uint32_t remaining_bytes() const { return remaining_bytes_; } + + private: + static void NextLocked(void* arg, grpc_error* error_ignored); + static void OrphanLocked(void* arg, grpc_error* error_ignored); + + grpc_chttp2_transport* transport_; // Immutable. + grpc_chttp2_stream* stream_; // Immutable. + + gpr_refcount refs_; /* Accessed only by transport thread when stream->pending_byte_stream == false * Accessed only by application thread when stream->pending_byte_stream == * true */ - uint32_t remaining_bytes; + uint32_t remaining_bytes_; /* Accessed only by transport thread when stream->pending_byte_stream == false * Accessed only by application thread when stream->pending_byte_stream == @@ -223,11 +263,12 @@ struct grpc_chttp2_incoming_byte_stream { grpc_closure closure; size_t max_size_hint; grpc_closure* on_complete; - } next_action; - grpc_closure destroy_action; - grpc_closure finished_action; + } next_action_; + grpc_closure destroy_action_; }; +} // namespace grpc_core + typedef enum { GRPC_CHTTP2_KEEPALIVE_STATE_WAITING, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING, @@ -456,7 +497,7 @@ struct grpc_chttp2_stream { grpc_metadata_batch* send_trailing_metadata; grpc_closure* send_trailing_metadata_finished; - grpc_byte_stream* fetching_send_message; + grpc_core::OrphanablePtr<grpc_core::ByteStream> fetching_send_message; uint32_t fetched_send_message_length; grpc_slice fetching_slice; int64_t next_message_end_offset; @@ -468,7 +509,7 @@ struct grpc_chttp2_stream { grpc_metadata_batch* recv_initial_metadata; grpc_closure* recv_initial_metadata_ready; bool* trailing_metadata_available; - grpc_byte_stream** recv_message; + grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; grpc_closure* recv_message_ready; grpc_metadata_batch* recv_trailing_metadata; grpc_closure* recv_trailing_metadata_finished; @@ -719,18 +760,6 @@ void grpc_chttp2_unref_transport(grpc_chttp2_transport* t); void grpc_chttp2_ref_transport(grpc_chttp2_transport* t); #endif -grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create( - grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size, - uint32_t flags); -grpc_error* grpc_chttp2_incoming_byte_stream_push( - grpc_chttp2_incoming_byte_stream* bs, grpc_slice slice, - grpc_slice* slice_out); -grpc_error* grpc_chttp2_incoming_byte_stream_finished( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error, - bool reset_on_error); -void grpc_chttp2_incoming_byte_stream_notify( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error); - void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id); /** Add a new ping strike to ping_recv_state.ping_strikes. If diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index ff1c1aad62..8e3ea05706 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -31,6 +31,7 @@ #include "src/core/ext/transport/cronet/transport/cronet_transport.h" #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/slice/slice_internal.h" @@ -122,7 +123,7 @@ struct read_state { bool read_stream_closed; /* vars for holding data destined for the application */ - struct grpc_slice_buffer_stream sbs; + grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs; grpc_slice_buffer read_slice_buffer; /* vars for trailing metadata */ @@ -1041,16 +1042,14 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { grpc_slice_buffer write_slice_buffer; grpc_slice slice; grpc_slice_buffer_init(&write_slice_buffer); - if (1 != grpc_byte_stream_next( - stream_op->payload->send_message.send_message, - stream_op->payload->send_message.send_message->length, + if (1 != stream_op->payload->send_message.send_message->Next( + stream_op->payload->send_message.send_message->length(), nullptr)) { /* Should never reach here */ GPR_ASSERT(false); } if (GRPC_ERROR_NONE != - grpc_byte_stream_pull(stream_op->payload->send_message.send_message, - &slice)) { + stream_op->payload->send_message.send_message->Pull(&slice)) { /* Should never reach here */ GPR_ASSERT(false); } @@ -1062,9 +1061,10 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { } if (write_slice_buffer.count > 0) { size_t write_buffer_size; - create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, - &write_buffer_size, - stream_op->payload->send_message.send_message->flags); + create_grpc_frame( + &write_slice_buffer, &stream_state->ws.write_buffer, + &write_buffer_size, + stream_op->payload->send_message.send_message->flags()); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, stream_state->ws.write_buffer); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; @@ -1089,6 +1089,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { } stream_state->state_op_done[OP_SEND_MESSAGE] = true; oas->state.state_op_done[OP_SEND_MESSAGE] = true; + stream_op->payload->send_message.send_message.reset(); } else if (stream_op->send_trailing_metadata && op_can_be_run(stream_op, s, &oas->state, OP_SEND_TRAILING_METADATA)) { @@ -1195,14 +1196,13 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { grpc_slice_buffer_destroy_internal( &stream_state->rs.read_slice_buffer); grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); - grpc_slice_buffer_stream_init(&stream_state->rs.sbs, - &stream_state->rs.read_slice_buffer, 0); + uint32_t flags = 0; if (stream_state->rs.compressed) { - stream_state->rs.sbs.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; + flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - *(reinterpret_cast<grpc_byte_buffer**>( - stream_op->payload->recv_message.recv_message)) = - reinterpret_cast<grpc_byte_buffer*>(&stream_state->rs.sbs); + stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags); + stream_op->payload->recv_message.recv_message->reset( + stream_state->rs.sbs.get()); GRPC_CLOSURE_SCHED( stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); @@ -1252,14 +1252,13 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice); - grpc_slice_buffer_stream_init(&stream_state->rs.sbs, - &stream_state->rs.read_slice_buffer, 0); + uint32_t flags = 0; if (stream_state->rs.compressed) { - stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS; + flags = GRPC_WRITE_INTERNAL_COMPRESS; } - *(reinterpret_cast<grpc_byte_buffer**>( - stream_op->payload->recv_message.recv_message)) = - reinterpret_cast<grpc_byte_buffer*>(&stream_state->rs.sbs); + stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags); + stream_op->payload->recv_message.recv_message->reset( + stream_state->rs.sbs.get()); GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 5f898bbf25..67a380077b 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -25,6 +25,7 @@ #include <string.h> #include "src/core/ext/transport/inproc/inproc_transport.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" @@ -99,7 +100,7 @@ typedef struct inproc_stream { grpc_transport_stream_op_batch* recv_trailing_md_op; grpc_slice_buffer recv_message; - grpc_slice_buffer_stream recv_stream; + grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> recv_stream; bool recv_inited; bool initial_md_sent; @@ -482,8 +483,7 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) { s->recv_message_op = nullptr; } if (s->send_message_op) { - grpc_byte_stream_destroy( - s->send_message_op->payload->send_message.send_message); + s->send_message_op->payload->send_message.send_message.reset(); complete_if_batch_end_locked( s, error, s->send_message_op, "fail_helper scheduling send-message-on-complete"); @@ -521,7 +521,7 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) { static void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) { size_t remaining = - sender->send_message_op->payload->send_message.send_message->length; + sender->send_message_op->payload->send_message.send_message->length(); if (receiver->recv_inited) { grpc_slice_buffer_destroy_internal(&receiver->recv_message); } @@ -530,12 +530,12 @@ static void message_transfer_locked(inproc_stream* sender, do { grpc_slice message_slice; grpc_closure unused; - GPR_ASSERT(grpc_byte_stream_next( - sender->send_message_op->payload->send_message.send_message, SIZE_MAX, - &unused)); - grpc_error* error = grpc_byte_stream_pull( - sender->send_message_op->payload->send_message.send_message, - &message_slice); + GPR_ASSERT( + sender->send_message_op->payload->send_message.send_message->Next( + SIZE_MAX, &unused)); + grpc_error* error = + sender->send_message_op->payload->send_message.send_message->Pull( + &message_slice); if (error != GRPC_ERROR_NONE) { cancel_stream_locked(sender, GRPC_ERROR_REF(error)); break; @@ -544,13 +544,11 @@ static void message_transfer_locked(inproc_stream* sender, remaining -= GRPC_SLICE_LENGTH(message_slice); grpc_slice_buffer_add(&receiver->recv_message, message_slice); } while (remaining > 0); - grpc_byte_stream_destroy( - sender->send_message_op->payload->send_message.send_message); + sender->send_message_op->payload->send_message.send_message.reset(); - grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message, - 0); - *receiver->recv_message_op->payload->recv_message.recv_message = - &receiver->recv_stream.base; + receiver->recv_stream.Init(&receiver->recv_message, 0); + receiver->recv_message_op->payload->recv_message.recv_message->reset( + receiver->recv_stream.get()); INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready", receiver); GRPC_CLOSURE_SCHED( @@ -606,8 +604,7 @@ static void op_state_machine(void* arg, grpc_error* error) { (s->trailing_md_sent || other->recv_trailing_md_op)) { // A server send will never be matched if the client is waiting // for trailing metadata already - grpc_byte_stream_destroy( - s->send_message_op->payload->send_message.send_message); + s->send_message_op->payload->send_message.send_message.reset(); complete_if_batch_end_locked( s, GRPC_ERROR_NONE, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); @@ -744,8 +741,7 @@ static void op_state_machine(void* arg, grpc_error* error) { if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) { // Nothing further will try to receive from this stream, so finish off // any outstanding send_message op - grpc_byte_stream_destroy( - s->send_message_op->payload->send_message.send_message); + s->send_message_op->payload->send_message.send_message.reset(); complete_if_batch_end_locked( s, new_err, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); @@ -803,8 +799,7 @@ static void op_state_machine(void* arg, grpc_error* error) { s->send_message_op) { // Nothing further will try to receive from this stream, so finish off // any outstanding send_message op - grpc_byte_stream_destroy( - s->send_message_op->payload->send_message.send_message); + s->send_message_op->payload->send_message.send_message.reset(); complete_if_batch_end_locked( s, new_err, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); |