aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/client_channel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc38
1 files changed, 20 insertions, 18 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 09cc06e169..bf3911e5ee 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -798,7 +798,8 @@ typedef struct {
grpc_linked_mdelem* send_initial_metadata_storage;
grpc_metadata_batch send_initial_metadata;
// For send_message.
- grpc_caching_byte_stream send_message;
+ grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
+ send_message;
// For send_trailing_metadata.
grpc_linked_mdelem* send_trailing_metadata_storage;
grpc_metadata_batch send_trailing_metadata;
@@ -808,7 +809,7 @@ typedef struct {
bool trailing_metadata_available;
// For intercepting recv_message.
grpc_closure recv_message_ready;
- grpc_byte_stream* recv_message;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
// For intercepting recv_trailing_metadata.
grpc_metadata_batch recv_trailing_metadata;
grpc_transport_stream_stats collect_stats;
@@ -914,12 +915,12 @@ typedef struct client_channel_call_data {
gpr_atm* peer_string;
// send_message
// When we get a send_message op, we replace the original byte stream
- // with a grpc_caching_byte_stream that caches the slices to a
- // local buffer for use in retries.
+ // with a CachingByteStream that caches the slices to a local buffer for
+ // use in retries.
// Note: We inline the cache for the first 3 send_message ops and use
// dynamic allocation after that. This number was essentially picked
// at random; it could be changed in the future to tune performance.
- grpc_core::InlinedVector<grpc_byte_stream_cache*, 3> send_messages;
+ grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages;
// send_trailing_metadata
bool seen_send_trailing_metadata;
grpc_linked_mdelem* send_trailing_metadata_storage;
@@ -964,10 +965,11 @@ static void maybe_cache_send_ops_for_batch(call_data* calld,
}
// Set up cache for send_message ops.
if (batch->send_message) {
- grpc_byte_stream_cache* cache = (grpc_byte_stream_cache*)gpr_arena_alloc(
- calld->arena, sizeof(grpc_byte_stream_cache));
- grpc_byte_stream_cache_init(cache,
- batch->payload->send_message.send_message);
+ grpc_core::ByteStreamCache* cache =
+ static_cast<grpc_core::ByteStreamCache*>(
+ gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
+ new (cache) grpc_core::ByteStreamCache(
+ std::move(batch->payload->send_message.send_message));
calld->send_messages.push_back(cache);
}
// Save metadata batch for send_trailing_metadata ops.
@@ -1002,7 +1004,7 @@ static void free_cached_send_op_data_after_commit(
"]",
chand, calld, i);
}
- grpc_byte_stream_cache_destroy(calld->send_messages[i]);
+ calld->send_messages[i]->Destroy();
}
if (retry_state->completed_send_trailing_metadata) {
grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
@@ -1026,8 +1028,8 @@ static void free_cached_send_op_data_for_completed_batch(
"]",
chand, calld, retry_state->completed_send_message_count - 1);
}
- grpc_byte_stream_cache_destroy(
- calld->send_messages[retry_state->completed_send_message_count - 1]);
+ calld->send_messages[retry_state->completed_send_message_count - 1]
+ ->Destroy();
}
if (batch_data->batch.send_trailing_metadata) {
grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
@@ -1079,7 +1081,7 @@ static void pending_batches_add(grpc_call_element* elem,
if (batch->send_message) {
calld->pending_send_message = true;
calld->bytes_buffered_for_retry +=
- batch->payload->send_message.send_message->length;
+ batch->payload->send_message.send_message->length();
}
if (batch->send_trailing_metadata) {
calld->pending_send_trailing_metadata = true;
@@ -1680,7 +1682,7 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) {
GPR_ASSERT(pending != nullptr);
// Return payload.
*pending->batch->payload->recv_message.recv_message =
- batch_data->recv_message;
+ std::move(batch_data->recv_message);
// Update bookkeeping.
// Note: Need to do this before invoking the callback, since invoking
// the callback will result in yielding the call combiner.
@@ -2124,13 +2126,13 @@ static void add_retriable_send_message_op(
"chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
chand, calld, retry_state->started_send_message_count);
}
- grpc_byte_stream_cache* cache =
+ grpc_core::ByteStreamCache* cache =
calld->send_messages[retry_state->started_send_message_count];
++retry_state->started_send_message_count;
- grpc_caching_byte_stream_init(&batch_data->send_message, cache);
+ batch_data->send_message.Init(cache);
batch_data->batch.send_message = true;
- batch_data->batch.payload->send_message.send_message =
- &batch_data->send_message.base;
+ batch_data->batch.payload->send_message.send_message.reset(
+ batch_data->send_message.get());
}
// Adds retriable send_trailing_metadata op to batch_data.