aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/cronet/transport/cronet_transport.cc
diff options
context:
space:
mode:
authorGravatar kpayson64 <kpayson@google.com>2018-05-11 12:20:11 -0700
committerGravatar kpayson64 <kpayson@google.com>2018-05-11 12:20:11 -0700
commit4fad281ce8affe27fb7428f264d2c3b9dfc45f2f (patch)
treeca96c9efd69afec56aa2e5fe072a9f758247d0a3 /src/core/ext/transport/cronet/transport/cronet_transport.cc
parentec445cc2bb270ed4acb1c710c3533fca14a50019 (diff)
parent61fdb46ac456027c79841949272ec540f66d2317 (diff)
Merge remote-tracking branch 'upstream/master' into fork_exec_ctx_check
Diffstat (limited to 'src/core/ext/transport/cronet/transport/cronet_transport.cc')
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.cc41
1 files changed, 20 insertions, 21 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index ff1c1aad62..8e3ea05706 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -31,6 +31,7 @@
#include "src/core/ext/transport/cronet/transport/cronet_transport.h"
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -122,7 +123,7 @@ struct read_state {
bool read_stream_closed;
/* vars for holding data destined for the application */
- struct grpc_slice_buffer_stream sbs;
+ grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
grpc_slice_buffer read_slice_buffer;
/* vars for trailing metadata */
@@ -1041,16 +1042,14 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
grpc_slice_buffer write_slice_buffer;
grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer);
- if (1 != grpc_byte_stream_next(
- stream_op->payload->send_message.send_message,
- stream_op->payload->send_message.send_message->length,
+ if (1 != stream_op->payload->send_message.send_message->Next(
+ stream_op->payload->send_message.send_message->length(),
nullptr)) {
/* Should never reach here */
GPR_ASSERT(false);
}
if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(stream_op->payload->send_message.send_message,
- &slice)) {
+ stream_op->payload->send_message.send_message->Pull(&slice)) {
/* Should never reach here */
GPR_ASSERT(false);
}
@@ -1062,9 +1061,10 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
}
if (write_slice_buffer.count > 0) {
size_t write_buffer_size;
- create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
- &write_buffer_size,
- stream_op->payload->send_message.send_message->flags);
+ create_grpc_frame(
+ &write_slice_buffer, &stream_state->ws.write_buffer,
+ &write_buffer_size,
+ stream_op->payload->send_message.send_message->flags());
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
@@ -1089,6 +1089,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
}
stream_state->state_op_done[OP_SEND_MESSAGE] = true;
oas->state.state_op_done[OP_SEND_MESSAGE] = true;
+ stream_op->payload->send_message.send_message.reset();
} else if (stream_op->send_trailing_metadata &&
op_can_be_run(stream_op, s, &oas->state,
OP_SEND_TRAILING_METADATA)) {
@@ -1195,14 +1196,13 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
grpc_slice_buffer_destroy_internal(
&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
- grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
- &stream_state->rs.read_slice_buffer, 0);
+ uint32_t flags = 0;
if (stream_state->rs.compressed) {
- stream_state->rs.sbs.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
- *(reinterpret_cast<grpc_byte_buffer**>(
- stream_op->payload->recv_message.recv_message)) =
- reinterpret_cast<grpc_byte_buffer*>(&stream_state->rs.sbs);
+ stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
+ stream_op->payload->recv_message.recv_message->reset(
+ stream_state->rs.sbs.get());
GRPC_CLOSURE_SCHED(
stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
@@ -1252,14 +1252,13 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
read_data_slice);
- grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
- &stream_state->rs.read_slice_buffer, 0);
+ uint32_t flags = 0;
if (stream_state->rs.compressed) {
- stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS;
+ flags = GRPC_WRITE_INTERNAL_COMPRESS;
}
- *(reinterpret_cast<grpc_byte_buffer**>(
- stream_op->payload->recv_message.recv_message)) =
- reinterpret_cast<grpc_byte_buffer*>(&stream_state->rs.sbs);
+ stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
+ stream_op->payload->recv_message.recv_message->reset(
+ stream_state->rs.sbs.get());
GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;