diff options
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/byte_buffer.cc | 1 | ||||
-rw-r--r-- | src/core/lib/surface/byte_buffer_reader.cc | 1 | ||||
-rw-r--r-- | src/core/lib/surface/call.cc | 94 |
3 files changed, 43 insertions, 53 deletions
diff --git a/src/core/lib/surface/byte_buffer.cc b/src/core/lib/surface/byte_buffer.cc index fce87dc611..6246796e46 100644 --- a/src/core/lib/surface/byte_buffer.cc +++ b/src/core/lib/surface/byte_buffer.cc @@ -22,6 +22,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/slice/slice_internal.h" grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slices, diff --git a/src/core/lib/surface/byte_buffer_reader.cc b/src/core/lib/surface/byte_buffer_reader.cc index a10f1a3933..1debc98ea0 100644 --- a/src/core/lib/surface/byte_buffer_reader.cc +++ b/src/core/lib/surface/byte_buffer_reader.cc @@ -29,6 +29,7 @@ #include <grpc/support/log.h> #include "src/core/lib/compression/message_compress.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/slice/slice_internal.h" static int is_compressed(grpc_byte_buffer* buffer) { diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index c4844da318..c683cc02de 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -37,6 +37,7 @@ #include "src/core/lib/gpr/arena.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -221,9 +222,9 @@ struct grpc_call { int send_extra_metadata_count; grpc_millis send_deadline; - grpc_slice_buffer_stream sending_stream; + grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream; - grpc_byte_stream* receiving_stream; + grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream; grpc_byte_buffer** receiving_buffer; grpc_slice receiving_slice; grpc_closure receiving_slice_ready; @@ -379,7 +380,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, bool immediately_cancel = false; if (args->parent != nullptr) { - child_call* cc = call->child = + call->child = static_cast<child_call*>(gpr_arena_alloc(arena, sizeof(child_call))); call->child->parent = args->parent; @@ -387,10 +388,6 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, GPR_ASSERT(call->is_client); GPR_ASSERT(!args->parent->is_client); - parent_call* pc = get_or_create_parent_call(args->parent); - - gpr_mu_lock(&pc->child_list_mu); - if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline); } @@ -418,18 +415,6 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, immediately_cancel = true; } } - - if (pc->first_child == nullptr) { - pc->first_child = call; - cc->sibling_next = cc->sibling_prev = call; - } else { - cc->sibling_next = pc->first_child; - cc->sibling_prev = pc->first_child->child->sibling_prev; - cc->sibling_next->child->sibling_prev = - cc->sibling_prev->child->sibling_next = call; - } - - gpr_mu_unlock(&pc->child_list_mu); } call->send_deadline = send_deadline; @@ -446,6 +431,22 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, &call->call_combiner}; add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call, call, &call_args)); + // Publish this call to parent only after the call stack has been initialized. + if (args->parent != nullptr) { + child_call* cc = call->child; + parent_call* pc = get_or_create_parent_call(args->parent); + gpr_mu_lock(&pc->child_list_mu); + if (pc->first_child == nullptr) { + pc->first_child = call; + cc->sibling_next = cc->sibling_prev = call; + } else { + cc->sibling_next = pc->first_child; + cc->sibling_prev = pc->first_child->child->sibling_prev; + cc->sibling_next->child->sibling_prev = + cc->sibling_prev->child->sibling_next = call; + } + gpr_mu_unlock(&pc->child_list_mu); + } if (error != GRPC_ERROR_NONE) { cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } @@ -522,9 +523,7 @@ static void destroy_call(void* call, grpc_error* error) { grpc_metadata_batch_destroy( &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]); } - if (c->receiving_stream != nullptr) { - grpc_byte_stream_destroy(c->receiving_stream); - } + c->receiving_stream.reset(); parent_call* pc = get_parent_call(c); if (pc != nullptr) { gpr_mu_destroy(&pc->child_list_mu); @@ -1281,25 +1280,21 @@ static void continue_receiving_slices(batch_control* bctl) { grpc_error* error; grpc_call* call = bctl->call; for (;;) { - size_t remaining = call->receiving_stream->length - + size_t remaining = call->receiving_stream->length() - (*call->receiving_buffer)->data.raw.slice_buffer.length; if (remaining == 0) { call->receiving_message = 0; - grpc_byte_stream_destroy(call->receiving_stream); - call->receiving_stream = nullptr; + call->receiving_stream.reset(); finish_batch_step(bctl); return; } - if (grpc_byte_stream_next(call->receiving_stream, remaining, - &call->receiving_slice_ready)) { - error = - grpc_byte_stream_pull(call->receiving_stream, &call->receiving_slice); + if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) { + error = call->receiving_stream->Pull(&call->receiving_slice); if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, call->receiving_slice); } else { - grpc_byte_stream_destroy(call->receiving_stream); - call->receiving_stream = nullptr; + call->receiving_stream.reset(); grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = nullptr; call->receiving_message = 0; @@ -1315,19 +1310,17 @@ static void continue_receiving_slices(batch_control* bctl) { static void receiving_slice_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; - grpc_byte_stream* bs = call->receiving_stream; bool release_error = false; if (error == GRPC_ERROR_NONE) { grpc_slice slice; - error = grpc_byte_stream_pull(bs, &slice); + error = call->receiving_stream->Pull(&slice); if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, slice); continue_receiving_slices(bctl); } else { - /* Error returned by grpc_byte_stream_pull needs to be released manually - */ + /* Error returned by ByteStream::Pull() needs to be released manually */ release_error = true; } } @@ -1336,8 +1329,7 @@ static void receiving_slice_ready(void* bctlp, grpc_error* error) { if (grpc_trace_operation_failures.enabled()) { GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error)); } - grpc_byte_stream_destroy(call->receiving_stream); - call->receiving_stream = nullptr; + call->receiving_stream.reset(); grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = nullptr; call->receiving_message = 0; @@ -1355,8 +1347,8 @@ static void process_data_after_md(batch_control* bctl) { call->receiving_message = 0; finish_batch_step(bctl); } else { - call->test_only_last_message_flags = call->receiving_stream->flags; - if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && + call->test_only_last_message_flags = call->receiving_stream->flags(); + if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) && (call->incoming_message_compression_algorithm > GRPC_MESSAGE_COMPRESS_NONE)) { grpc_compression_algorithm algo; @@ -1379,10 +1371,7 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; if (error != GRPC_ERROR_NONE) { - if (call->receiving_stream != nullptr) { - grpc_byte_stream_destroy(call->receiving_stream); - call->receiving_stream = nullptr; - } + call->receiving_stream.reset(); add_batch_error(bctl, GRPC_ERROR_REF(error), true); cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } @@ -1676,21 +1665,20 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - stream_op->send_message = true; - call->sending_message = true; - grpc_slice_buffer_stream_init( - &call->sending_stream, - &op->data.send_message.send_message->data.raw.slice_buffer, - op->flags); + uint32_t flags = op->flags; /* If the outgoing buffer is already compressed, mark it as so in the flags. These will be picked up by the compression filter and further (wasteful) attempts at compression skipped. */ if (op->data.send_message.send_message->data.raw.compression > GRPC_COMPRESS_NONE) { - call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; + flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - stream_op_payload->send_message.send_message = - &call->sending_stream.base; + stream_op->send_message = true; + call->sending_message = true; + call->sending_stream.Init( + &op->data.send_message.send_message->data.raw.slice_buffer, flags); + stream_op_payload->send_message.send_message.reset( + call->sending_stream.get()); break; } case GRPC_OP_SEND_CLOSE_FROM_CLIENT: { @@ -1909,7 +1897,7 @@ done_with_error: } if (stream_op->send_message) { call->sending_message = false; - grpc_byte_stream_destroy(&call->sending_stream.base); + call->sending_stream->Orphan(); } if (stream_op->send_trailing_metadata) { call->sent_final_op = false; |