aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/call.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/call.cc')
-rw-r--r--src/core/lib/surface/call.cc106
1 files changed, 47 insertions, 59 deletions
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index c4844da318..da488034ca 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -37,6 +37,7 @@
#include "src/core/lib/gpr/arena.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -221,9 +222,9 @@ struct grpc_call {
int send_extra_metadata_count;
grpc_millis send_deadline;
- grpc_slice_buffer_stream sending_stream;
+ grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;
- grpc_byte_stream* receiving_stream;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
grpc_byte_buffer** receiving_buffer;
grpc_slice receiving_slice;
grpc_closure receiving_slice_ready;
@@ -379,7 +380,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
bool immediately_cancel = false;
if (args->parent != nullptr) {
- child_call* cc = call->child =
+ call->child =
static_cast<child_call*>(gpr_arena_alloc(arena, sizeof(child_call)));
call->child->parent = args->parent;
@@ -387,10 +388,6 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
GPR_ASSERT(call->is_client);
GPR_ASSERT(!args->parent->is_client);
- parent_call* pc = get_or_create_parent_call(args->parent);
-
- gpr_mu_lock(&pc->child_list_mu);
-
if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
}
@@ -418,18 +415,6 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
immediately_cancel = true;
}
}
-
- if (pc->first_child == nullptr) {
- pc->first_child = call;
- cc->sibling_next = cc->sibling_prev = call;
- } else {
- cc->sibling_next = pc->first_child;
- cc->sibling_prev = pc->first_child->child->sibling_prev;
- cc->sibling_next->child->sibling_prev =
- cc->sibling_prev->child->sibling_next = call;
- }
-
- gpr_mu_unlock(&pc->child_list_mu);
}
call->send_deadline = send_deadline;
@@ -446,6 +431,22 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
&call->call_combiner};
add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
call, &call_args));
+ // Publish this call to parent only after the call stack has been initialized.
+ if (args->parent != nullptr) {
+ child_call* cc = call->child;
+ parent_call* pc = get_or_create_parent_call(args->parent);
+ gpr_mu_lock(&pc->child_list_mu);
+ if (pc->first_child == nullptr) {
+ pc->first_child = call;
+ cc->sibling_next = cc->sibling_prev = call;
+ } else {
+ cc->sibling_next = pc->first_child;
+ cc->sibling_prev = pc->first_child->child->sibling_prev;
+ cc->sibling_next->child->sibling_prev =
+ cc->sibling_prev->child->sibling_next = call;
+ }
+ gpr_mu_unlock(&pc->child_list_mu);
+ }
if (error != GRPC_ERROR_NONE) {
cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
}
@@ -522,9 +523,7 @@ static void destroy_call(void* call, grpc_error* error) {
grpc_metadata_batch_destroy(
&c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
}
- if (c->receiving_stream != nullptr) {
- grpc_byte_stream_destroy(c->receiving_stream);
- }
+ c->receiving_stream.reset();
parent_call* pc = get_parent_call(c);
if (pc != nullptr) {
gpr_mu_destroy(&pc->child_list_mu);
@@ -611,7 +610,7 @@ grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
// This is called via the call combiner to start sending a batch down
// the filter stack.
static void execute_batch_in_call_combiner(void* arg, grpc_error* ignored) {
- GPR_TIMER_SCOPE("execute_batch", 0);
+ GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
@@ -748,10 +747,10 @@ static void get_final_status(
status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i]));
}
if (grpc_call_error_trace.enabled()) {
- gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR");
+ gpr_log(GPR_INFO, "get_final_status %s", call->is_client ? "CLI" : "SVR");
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (status[i].is_set) {
- gpr_log(GPR_DEBUG, " %d: %s", i, grpc_error_string(status[i].error));
+ gpr_log(GPR_INFO, " %d: %s", i, grpc_error_string(status[i].error));
}
}
}
@@ -879,8 +878,8 @@ static void set_encodings_accepted_by_peer(grpc_call* call, grpc_mdelem mdel,
} else {
char* accept_encoding_entry_str =
grpc_slice_to_c_string(accept_encoding_entry_slice);
- gpr_log(GPR_ERROR,
- "Invalid entry in accept encoding metadata: '%s'. Ignoring.",
+ gpr_log(GPR_DEBUG,
+ "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
accept_encoding_entry_str);
gpr_free(accept_encoding_entry_str);
}
@@ -1281,25 +1280,21 @@ static void continue_receiving_slices(batch_control* bctl) {
grpc_error* error;
grpc_call* call = bctl->call;
for (;;) {
- size_t remaining = call->receiving_stream->length -
+ size_t remaining = call->receiving_stream->length() -
(*call->receiving_buffer)->data.raw.slice_buffer.length;
if (remaining == 0) {
call->receiving_message = 0;
- grpc_byte_stream_destroy(call->receiving_stream);
- call->receiving_stream = nullptr;
+ call->receiving_stream.reset();
finish_batch_step(bctl);
return;
}
- if (grpc_byte_stream_next(call->receiving_stream, remaining,
- &call->receiving_slice_ready)) {
- error =
- grpc_byte_stream_pull(call->receiving_stream, &call->receiving_slice);
+ if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
+ error = call->receiving_stream->Pull(&call->receiving_slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
call->receiving_slice);
} else {
- grpc_byte_stream_destroy(call->receiving_stream);
- call->receiving_stream = nullptr;
+ call->receiving_stream.reset();
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
call->receiving_message = 0;
@@ -1315,19 +1310,17 @@ static void continue_receiving_slices(batch_control* bctl) {
static void receiving_slice_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
- grpc_byte_stream* bs = call->receiving_stream;
bool release_error = false;
if (error == GRPC_ERROR_NONE) {
grpc_slice slice;
- error = grpc_byte_stream_pull(bs, &slice);
+ error = call->receiving_stream->Pull(&slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
slice);
continue_receiving_slices(bctl);
} else {
- /* Error returned by grpc_byte_stream_pull needs to be released manually
- */
+ /* Error returned by ByteStream::Pull() needs to be released manually */
release_error = true;
}
}
@@ -1336,8 +1329,7 @@ static void receiving_slice_ready(void* bctlp, grpc_error* error) {
if (grpc_trace_operation_failures.enabled()) {
GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
}
- grpc_byte_stream_destroy(call->receiving_stream);
- call->receiving_stream = nullptr;
+ call->receiving_stream.reset();
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
call->receiving_message = 0;
@@ -1355,8 +1347,8 @@ static void process_data_after_md(batch_control* bctl) {
call->receiving_message = 0;
finish_batch_step(bctl);
} else {
- call->test_only_last_message_flags = call->receiving_stream->flags;
- if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ call->test_only_last_message_flags = call->receiving_stream->flags();
+ if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
(call->incoming_message_compression_algorithm >
GRPC_MESSAGE_COMPRESS_NONE)) {
grpc_compression_algorithm algo;
@@ -1379,10 +1371,7 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
if (error != GRPC_ERROR_NONE) {
- if (call->receiving_stream != nullptr) {
- grpc_byte_stream_destroy(call->receiving_stream);
- call->receiving_stream = nullptr;
- }
+ call->receiving_stream.reset();
add_batch_error(bctl, GRPC_ERROR_REF(error), true);
cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
}
@@ -1550,7 +1539,7 @@ static void free_no_op_completion(void* p, grpc_cq_completion* completion) {
static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
size_t nops, void* notify_tag,
int is_notify_tag_closure) {
- GPR_TIMER_SCOPE("grpc_call_start_batch", 0);
+ GPR_TIMER_SCOPE("call_start_batch", 0);
size_t i;
const grpc_op* op;
@@ -1676,21 +1665,20 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- stream_op->send_message = true;
- call->sending_message = true;
- grpc_slice_buffer_stream_init(
- &call->sending_stream,
- &op->data.send_message.send_message->data.raw.slice_buffer,
- op->flags);
+ uint32_t flags = op->flags;
/* If the outgoing buffer is already compressed, mark it as so in the
flags. These will be picked up by the compression filter and further
(wasteful) attempts at compression skipped. */
if (op->data.send_message.send_message->data.raw.compression >
GRPC_COMPRESS_NONE) {
- call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
- stream_op_payload->send_message.send_message =
- &call->sending_stream.base;
+ stream_op->send_message = true;
+ call->sending_message = true;
+ call->sending_stream.Init(
+ &op->data.send_message.send_message->data.raw.slice_buffer, flags);
+ stream_op_payload->send_message.send_message.reset(
+ call->sending_stream.get());
break;
}
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@@ -1909,7 +1897,7 @@ done_with_error:
}
if (stream_op->send_message) {
call->sending_message = false;
- grpc_byte_stream_destroy(&call->sending_stream.base);
+ call->sending_stream->Orphan();
}
if (stream_op->send_trailing_metadata) {
call->sent_final_op = false;