diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 38 |
1 files changed, 20 insertions, 18 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 09cc06e169..bf3911e5ee 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -798,7 +798,8 @@ typedef struct { grpc_linked_mdelem* send_initial_metadata_storage; grpc_metadata_batch send_initial_metadata; // For send_message. - grpc_caching_byte_stream send_message; + grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream> + send_message; // For send_trailing_metadata. grpc_linked_mdelem* send_trailing_metadata_storage; grpc_metadata_batch send_trailing_metadata; @@ -808,7 +809,7 @@ typedef struct { bool trailing_metadata_available; // For intercepting recv_message. grpc_closure recv_message_ready; - grpc_byte_stream* recv_message; + grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message; // For intercepting recv_trailing_metadata. grpc_metadata_batch recv_trailing_metadata; grpc_transport_stream_stats collect_stats; @@ -914,12 +915,12 @@ typedef struct client_channel_call_data { gpr_atm* peer_string; // send_message // When we get a send_message op, we replace the original byte stream - // with a grpc_caching_byte_stream that caches the slices to a - // local buffer for use in retries. + // with a CachingByteStream that caches the slices to a local buffer for + // use in retries. // Note: We inline the cache for the first 3 send_message ops and use // dynamic allocation after that. This number was essentially picked // at random; it could be changed in the future to tune performance. - grpc_core::InlinedVector<grpc_byte_stream_cache*, 3> send_messages; + grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages; // send_trailing_metadata bool seen_send_trailing_metadata; grpc_linked_mdelem* send_trailing_metadata_storage; @@ -964,10 +965,11 @@ static void maybe_cache_send_ops_for_batch(call_data* calld, } // Set up cache for send_message ops. if (batch->send_message) { - grpc_byte_stream_cache* cache = (grpc_byte_stream_cache*)gpr_arena_alloc( - calld->arena, sizeof(grpc_byte_stream_cache)); - grpc_byte_stream_cache_init(cache, - batch->payload->send_message.send_message); + grpc_core::ByteStreamCache* cache = + static_cast<grpc_core::ByteStreamCache*>( + gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); + new (cache) grpc_core::ByteStreamCache( + std::move(batch->payload->send_message.send_message)); calld->send_messages.push_back(cache); } // Save metadata batch for send_trailing_metadata ops. @@ -1002,7 +1004,7 @@ static void free_cached_send_op_data_after_commit( "]", chand, calld, i); } - grpc_byte_stream_cache_destroy(calld->send_messages[i]); + calld->send_messages[i]->Destroy(); } if (retry_state->completed_send_trailing_metadata) { grpc_metadata_batch_destroy(&calld->send_trailing_metadata); @@ -1026,8 +1028,8 @@ static void free_cached_send_op_data_for_completed_batch( "]", chand, calld, retry_state->completed_send_message_count - 1); } - grpc_byte_stream_cache_destroy( - calld->send_messages[retry_state->completed_send_message_count - 1]); + calld->send_messages[retry_state->completed_send_message_count - 1] + ->Destroy(); } if (batch_data->batch.send_trailing_metadata) { grpc_metadata_batch_destroy(&calld->send_trailing_metadata); @@ -1079,7 +1081,7 @@ static void pending_batches_add(grpc_call_element* elem, if (batch->send_message) { calld->pending_send_message = true; calld->bytes_buffered_for_retry += - batch->payload->send_message.send_message->length; + batch->payload->send_message.send_message->length(); } if (batch->send_trailing_metadata) { calld->pending_send_trailing_metadata = true; @@ -1680,7 +1682,7 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) { GPR_ASSERT(pending != nullptr); // Return payload. *pending->batch->payload->recv_message.recv_message = - batch_data->recv_message; + std::move(batch_data->recv_message); // Update bookkeeping. // Note: Need to do this before invoking the callback, since invoking // the callback will result in yielding the call combiner. @@ -2124,13 +2126,13 @@ static void add_retriable_send_message_op( "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]", chand, calld, retry_state->started_send_message_count); } - grpc_byte_stream_cache* cache = + grpc_core::ByteStreamCache* cache = calld->send_messages[retry_state->started_send_message_count]; ++retry_state->started_send_message_count; - grpc_caching_byte_stream_init(&batch_data->send_message, cache); + batch_data->send_message.Init(cache); batch_data->batch.send_message = true; - batch_data->batch.payload->send_message.send_message = - &batch_data->send_message.base; + batch_data->batch.payload->send_message.send_message.reset( + batch_data->send_message.get()); } // Adds retriable send_trailing_metadata op to batch_data. |