aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-03-09 13:25:40 -0800
committerGravatar Mark D. Roth <roth@google.com>2018-03-09 13:25:40 -0800
commit3d8b32d8b3d7e85d0588064994efa6763d6fec02 (patch)
treeb25c81bd702b99d45073b74f0486f262337e431c /src/core/lib/surface
parent59ea0ae3ebcca0aef5a15c5aa5b4d27b7f3fc9c4 (diff)
Convert byte_stream API to C++.
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.cc60
1 files changed, 24 insertions, 36 deletions
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index c4844da318..adb6ee5a06 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -37,6 +37,7 @@
#include "src/core/lib/gpr/arena.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -221,9 +222,9 @@ struct grpc_call {
int send_extra_metadata_count;
grpc_millis send_deadline;
- grpc_slice_buffer_stream sending_stream;
+ grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;
- grpc_byte_stream* receiving_stream;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
grpc_byte_buffer** receiving_buffer;
grpc_slice receiving_slice;
grpc_closure receiving_slice_ready;
@@ -522,9 +523,7 @@ static void destroy_call(void* call, grpc_error* error) {
grpc_metadata_batch_destroy(
&c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
}
- if (c->receiving_stream != nullptr) {
- grpc_byte_stream_destroy(c->receiving_stream);
- }
+ c->receiving_stream.reset();
parent_call* pc = get_parent_call(c);
if (pc != nullptr) {
gpr_mu_destroy(&pc->child_list_mu);
@@ -1281,25 +1280,21 @@ static void continue_receiving_slices(batch_control* bctl) {
grpc_error* error;
grpc_call* call = bctl->call;
for (;;) {
- size_t remaining = call->receiving_stream->length -
+ size_t remaining = call->receiving_stream->length() -
(*call->receiving_buffer)->data.raw.slice_buffer.length;
if (remaining == 0) {
call->receiving_message = 0;
- grpc_byte_stream_destroy(call->receiving_stream);
- call->receiving_stream = nullptr;
+ call->receiving_stream.reset();
finish_batch_step(bctl);
return;
}
- if (grpc_byte_stream_next(call->receiving_stream, remaining,
- &call->receiving_slice_ready)) {
- error =
- grpc_byte_stream_pull(call->receiving_stream, &call->receiving_slice);
+ if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
+ error = call->receiving_stream->Pull(&call->receiving_slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
call->receiving_slice);
} else {
- grpc_byte_stream_destroy(call->receiving_stream);
- call->receiving_stream = nullptr;
+ call->receiving_stream.reset();
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
call->receiving_message = 0;
@@ -1315,19 +1310,17 @@ static void continue_receiving_slices(batch_control* bctl) {
static void receiving_slice_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
- grpc_byte_stream* bs = call->receiving_stream;
bool release_error = false;
if (error == GRPC_ERROR_NONE) {
grpc_slice slice;
- error = grpc_byte_stream_pull(bs, &slice);
+ error = call->receiving_stream->Pull(&slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
slice);
continue_receiving_slices(bctl);
} else {
- /* Error returned by grpc_byte_stream_pull needs to be released manually
- */
+ /* Error returned by ByteStream::Pull() needs to be released manually */
release_error = true;
}
}
@@ -1336,8 +1329,7 @@ static void receiving_slice_ready(void* bctlp, grpc_error* error) {
if (grpc_trace_operation_failures.enabled()) {
GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
}
- grpc_byte_stream_destroy(call->receiving_stream);
- call->receiving_stream = nullptr;
+ call->receiving_stream.reset();
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
call->receiving_message = 0;
@@ -1355,8 +1347,8 @@ static void process_data_after_md(batch_control* bctl) {
call->receiving_message = 0;
finish_batch_step(bctl);
} else {
- call->test_only_last_message_flags = call->receiving_stream->flags;
- if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ call->test_only_last_message_flags = call->receiving_stream->flags();
+ if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
(call->incoming_message_compression_algorithm >
GRPC_MESSAGE_COMPRESS_NONE)) {
grpc_compression_algorithm algo;
@@ -1379,10 +1371,7 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
if (error != GRPC_ERROR_NONE) {
- if (call->receiving_stream != nullptr) {
- grpc_byte_stream_destroy(call->receiving_stream);
- call->receiving_stream = nullptr;
- }
+ call->receiving_stream.reset();
add_batch_error(bctl, GRPC_ERROR_REF(error), true);
cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
}
@@ -1676,21 +1665,20 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- stream_op->send_message = true;
- call->sending_message = true;
- grpc_slice_buffer_stream_init(
- &call->sending_stream,
- &op->data.send_message.send_message->data.raw.slice_buffer,
- op->flags);
+ uint32_t flags = op->flags;
/* If the outgoing buffer is already compressed, mark it as so in the
flags. These will be picked up by the compression filter and further
(wasteful) attempts at compression skipped. */
if (op->data.send_message.send_message->data.raw.compression >
GRPC_COMPRESS_NONE) {
- call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
- stream_op_payload->send_message.send_message =
- &call->sending_stream.base;
+ stream_op->send_message = true;
+ call->sending_message = true;
+ call->sending_stream.Init(
+ &op->data.send_message.send_message->data.raw.slice_buffer, flags);
+ stream_op_payload->send_message.send_message.reset(
+ call->sending_stream.get());
break;
}
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@@ -1909,7 +1897,7 @@ done_with_error:
}
if (stream_op->send_message) {
call->sending_message = false;
- grpc_byte_stream_destroy(&call->sending_stream.base);
+ call->sending_stream->Orphan();
}
if (stream_op->send_trailing_metadata) {
call->sent_final_op = false;