Diffstat (limited to 'src/core/lib/surface/call.cc')
 src/core/lib/surface/call.cc | 55 +++++++++++++++++++++++++++++++++++--------------------
 1 file changed, 35 insertions(+), 20 deletions(-)
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 9a9113643d..86e0afa6ee 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -67,6 +67,9 @@
#define MAX_SEND_EXTRA_METADATA_COUNT 3
+// Used to size the arena created for the first call.
+#define ESTIMATED_MDELEM_COUNT 16
+
/* Status data for a request can come from several sources; this
enumerates them all, and acts as a priority sorting for which
status to return to the application - earlier entries override
@@ -323,6 +326,11 @@ static parent_call* get_parent_call(grpc_call* call) {
return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
}
+size_t grpc_call_get_initial_size_estimate() {
+ return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
+ sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
+}
+
grpc_error* grpc_call_create(const grpc_call_create_args* args,
grpc_call** out_call) {
GPR_TIMER_SCOPE("grpc_call_create", 0);
@@ -610,7 +618,7 @@ grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
// This is called via the call combiner to start sending a batch down
// the filter stack.
static void execute_batch_in_call_combiner(void* arg, grpc_error* ignored) {
- GPR_TIMER_SCOPE("execute_batch", 0);
+ GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
@@ -747,10 +755,10 @@ static void get_final_status(
status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i]));
}
if (grpc_call_error_trace.enabled()) {
- gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR");
+ gpr_log(GPR_INFO, "get_final_status %s", call->is_client ? "CLI" : "SVR");
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (status[i].is_set) {
- gpr_log(GPR_DEBUG, " %d: %s", i, grpc_error_string(status[i].error));
+ gpr_log(GPR_INFO, " %d: %s", i, grpc_error_string(status[i].error));
}
}
}
@@ -1124,7 +1132,7 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
return !(flags & invalid_positions);
}
-static int batch_slot_for_op(grpc_op_type type) {
+static size_t batch_slot_for_op(grpc_op_type type) {
switch (type) {
case GRPC_OP_SEND_INITIAL_METADATA:
return 0;
@@ -1144,20 +1152,23 @@ static int batch_slot_for_op(grpc_op_type type) {
GPR_UNREACHABLE_CODE(return 123456789);
}
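
The return type change from int to size_t is more than cosmetic: the result is used directly as an index into the call's fixed table of in-flight batches, so an unsigned type avoids sign-conversion warnings at the use site. A sketch of that relationship; the slot-table shape is paraphrased from the surrounding file and is not part of this diff:

#include <cstddef>

// One slot per op type that can lead a batch; the value 6 matches
// MAX_CONCURRENT_BATCHES in the surrounding file (not shown in this diff).
#define MAX_CONCURRENT_BATCHES 6

struct batch_control;  // defined elsewhere in call.cc

struct active_batch_slots {
  batch_control* slots[MAX_CONCURRENT_BATCHES];
};

// With a size_t index there is no signed/unsigned conversion at the use site:
//   batch_control** pslot = &call->active_batches[batch_slot_for_op(ops[0].op)];
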
-static batch_control* allocate_batch_control(grpc_call* call,
- const grpc_op* ops,
- size_t num_ops) {
- int slot = batch_slot_for_op(ops[0].op);
- batch_control** pslot = &call->active_batches[slot];
- if (*pslot == nullptr) {
- *pslot = static_cast<batch_control*>(
+static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
+ const grpc_op* ops,
+ size_t num_ops) {
+ size_t slot_idx = batch_slot_for_op(ops[0].op);
+ batch_control** pslot = &call->active_batches[slot_idx];
+ batch_control* bctl;
+ if (*pslot != nullptr) {
+ bctl = *pslot;
+ if (bctl->call != nullptr) {
+ return nullptr;
+ }
+ memset(bctl, 0, sizeof(*bctl));
+ } else {
+ bctl = static_cast<batch_control*>(
gpr_arena_alloc(call->arena, sizeof(batch_control)));
+ *pslot = bctl;
}
- batch_control* bctl = *pslot;
- if (bctl->call != nullptr) {
- return nullptr;
- }
- memset(bctl, 0, sizeof(*bctl));
bctl->call = call;
bctl->op.payload = &call->stream_op_payload;
return bctl;
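
Beyond the rename, the rewrite reorders the logic so the busy check and the memset apply only when a batch_control is recycled from a previously used slot; a freshly arena-allocated block is already zero-initialized, which the code relies on by skipping both. The same pattern in a standalone sketch, with simplified names and calloc standing in for the zeroing arena allocator:

#include <cstdlib>
#include <cstring>

struct slot_entry {
  void* owner;  // non-null while a previous use is still in flight
};

static slot_entry* reuse_or_allocate(slot_entry** pslot) {
  slot_entry* e;
  if (*pslot != nullptr) {
    e = *pslot;
    if (e->owner != nullptr) return nullptr;  // slot still busy
    std::memset(e, 0, sizeof(*e));            // recycle in place
  } else {
    e = static_cast<slot_entry*>(std::calloc(1, sizeof(*e)));  // zeroed
    *pslot = e;  // cache for later reuse
  }
  return e;
}

A nullptr result means "slot busy", which call_start_batch below maps to GRPC_CALL_ERROR_TOO_MANY_OPERATIONS.
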
@@ -1259,8 +1270,12 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs bctl->error */
bctl->call = nullptr;
- GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag,
- error);
+ /* This closure may be meant to be run within some combiner. Since we aren't
+ * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
+ * of GRPC_CLOSURE_RUN.
+ */
+ GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag,
+ error);
GRPC_CALL_INTERNAL_UNREF(call, "completion");
} else {
/* unrefs bctl->error */
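
The substantive fix in this hunk rests on the fact that a grpc_closure carries its own scheduler: GRPC_CLOSURE_RUN invokes the callback inline on the current thread, while GRPC_CLOSURE_SCHED defers to whatever scheduler the closure was initialized with, such as a combiner. A hedged illustration; the function and variable names are invented for the example:

#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/combiner.h"

static void on_batch_done(void* arg, grpc_error* error) { /* app logic */ }

// Hypothetical setup: the notify-tag closure was initialized upstream
// against a combiner's scheduler. The caller owns the closure's storage.
static void schedule_notify(grpc_closure* done, grpc_combiner* combiner) {
  GRPC_CLOSURE_INIT(done, on_batch_done, nullptr,
                    grpc_combiner_scheduler(combiner));
  // Safe from any thread: enqueues through the closure's own scheduler,
  // i.e. the combiner.
  GRPC_CLOSURE_SCHED(done, GRPC_ERROR_NONE);
  // RUN would invoke on_batch_done inline on this thread, bypassing the
  // combiner's serialization guarantee:
  // GRPC_CLOSURE_RUN(done, GRPC_ERROR_NONE);
}
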
@@ -1539,7 +1554,7 @@ static void free_no_op_completion(void* p, grpc_cq_completion* completion) {
static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
size_t nops, void* notify_tag,
int is_notify_tag_closure) {
- GPR_TIMER_SCOPE("grpc_call_start_batch", 0);
+ GPR_TIMER_SCOPE("call_start_batch", 0);
size_t i;
const grpc_op* op;
@@ -1565,7 +1580,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
goto done;
}
- bctl = allocate_batch_control(call, ops, nops);
+ bctl = reuse_or_allocate_batch_control(call, ops, nops);
if (bctl == nullptr) {
return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
}
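
At the public API boundary, a nullptr from reuse_or_allocate_batch_control is what an application observes as GRPC_CALL_ERROR_TOO_MANY_OPERATIONS: a second batch led by an op type whose slot is still in flight fails up front. An application-side sketch; the helper name and tags are invented, and depending on the op type the up-front validation in call_start_batch may report the same error before the slot check is reached:

#include <string.h>
#include <grpc/grpc.h>

/* Hypothetical caller: 'call' and the tags come from elsewhere. */
static void start_duplicate_batches(grpc_call* call, void* tag1, void* tag2) {
  grpc_op ops[1];
  memset(ops, 0, sizeof(ops));
  ops[0].op = GRPC_OP_RECV_MESSAGE;
  grpc_byte_buffer* payload = nullptr;
  ops[0].data.recv_message.recv_message = &payload;
  grpc_call_error first = grpc_call_start_batch(call, ops, 1, tag1, nullptr);
  /* A second batch led by the same op type, started before the first
   * completes, is rejected immediately: */
  grpc_call_error second = grpc_call_start_batch(call, ops, 1, tag2, nullptr);
  /* first == GRPC_CALL_OK, second == GRPC_CALL_ERROR_TOO_MANY_OPERATIONS */
  (void)first;
  (void)second;
}
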