path: root/src/core/lib/surface/call.cc
author    Yash Tibrewal <yashkt@google.com>    2017-12-06 09:47:49 -0800
committer GitHub <noreply@github.com>          2017-12-06 09:47:49 -0800
commit 1d4e99508409be052bd129ba507bae1fbe7eb7fa (patch)
tree   6a657f8c6179d873b34505cdc24bce9462ca68eb /src/core/lib/surface/call.cc
parent a3df36cc2505a89c2f481eea4a66a87b3002844a (diff)
parent ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (diff)
Merge pull request #13658 from grpc/revert-13058-execctx
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'src/core/lib/surface/call.cc')
-rw-r--r--  src/core/lib/surface/call.cc  429
1 file changed, 236 insertions(+), 193 deletions(-)
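
The diff below restores the pre-#13058 execution-context convention: each public API entry point stack-allocates a grpc_exec_ctx, every internal function receives it as an explicit first parameter, and grpc_exec_ctx_finish() flushes any queued closures before the API call returns, replacing the implicit grpc_core::ExecCtx that the reverted change had introduced. A minimal sketch of that call-site pattern follows; some_public_api and do_internal_work are hypothetical names standing in for real functions such as grpc_call_cancel and cancel_with_error shown in the hunks below.

    /* Hypothetical entry point illustrating the restored convention. */
    grpc_call_error some_public_api(grpc_call* call) {
      grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; /* created at the API boundary */
      do_internal_work(&exec_ctx, call);           /* exec_ctx threaded explicitly */
      grpc_exec_ctx_finish(&exec_ctx);             /* flush pending closures before returning */
      return GRPC_CALL_OK;
    }
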
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index a457aaa7a2..a2eb02bd85 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -270,25 +270,30 @@ grpc_core::TraceFlag grpc_compression_trace(false, "compression");
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
+static void execute_batch(grpc_exec_ctx* exec_ctx, grpc_call* call,
+ grpc_transport_stream_op_batch* op,
grpc_closure* start_batch_closure);
-static void cancel_with_status(grpc_call* c, status_source source,
- grpc_status_code status,
+static void cancel_with_status(grpc_exec_ctx* exec_ctx, grpc_call* c,
+ status_source source, grpc_status_code status,
const char* description);
-static void cancel_with_error(grpc_call* c, status_source source,
- grpc_error* error);
-static void destroy_call(void* call_stack, grpc_error* error);
-static void receiving_slice_ready(void* bctlp, grpc_error* error);
-static void get_final_status(
- grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
- void* set_value_user_data, grpc_slice* details, const char** error_string);
-static void set_status_value_directly(grpc_status_code status, void* dest);
-static void set_status_from_error(grpc_call* call, status_source source,
+static void cancel_with_error(grpc_exec_ctx* exec_ctx, grpc_call* c,
+ status_source source, grpc_error* error);
+static void destroy_call(grpc_exec_ctx* exec_ctx, void* call_stack,
+ grpc_error* error);
+static void receiving_slice_ready(grpc_exec_ctx* exec_ctx, void* bctlp,
grpc_error* error);
-static void process_data_after_md(batch_control* bctl);
-static void post_batch_completion(batch_control* bctl);
-static void add_batch_error(batch_control* bctl, grpc_error* error,
- bool has_cancelled);
+static void get_final_status(grpc_exec_ctx* exec_ctx, grpc_call* call,
+ void (*set_value)(grpc_status_code code,
+ void* user_data),
+ void* set_value_user_data, grpc_slice* details,
+ const char** error_string);
+static void set_status_value_directly(grpc_status_code status, void* dest);
+static void set_status_from_error(grpc_exec_ctx* exec_ctx, grpc_call* call,
+ status_source source, grpc_error* error);
+static void process_data_after_md(grpc_exec_ctx* exec_ctx, batch_control* bctl);
+static void post_batch_completion(grpc_exec_ctx* exec_ctx, batch_control* bctl);
+static void add_batch_error(grpc_exec_ctx* exec_ctx, batch_control* bctl,
+ grpc_error* error, bool has_cancelled);
static void add_init_error(grpc_error** composite, grpc_error* new_err) {
if (new_err == GRPC_ERROR_NONE) return;
@@ -306,8 +311,7 @@ static parent_call* get_or_create_parent_call(grpc_call* call) {
if (p == nullptr) {
p = (parent_call*)gpr_arena_alloc(call->arena, sizeof(*p));
gpr_mu_init(&p->child_list_mu);
- if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm) nullptr,
- (gpr_atm)p)) {
+ if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) {
gpr_mu_destroy(&p->child_list_mu);
p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
}
@@ -319,7 +323,8 @@ static parent_call* get_parent_call(grpc_call* call) {
return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
}
-grpc_error* grpc_call_create(const grpc_call_create_args* args,
+grpc_error* grpc_call_create(grpc_exec_ctx* exec_ctx,
+ const grpc_call_create_args* args,
grpc_call** out_call) {
size_t i, j;
grpc_error* error = GRPC_ERROR_NONE;
@@ -328,7 +333,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
grpc_call* call;
GPR_TIMER_BEGIN("grpc_call_create", 0);
size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
- GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
+ GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, initial_size);
gpr_arena* arena = gpr_arena_create(initial_size);
call = (grpc_call*)gpr_arena_alloc(
arena, sizeof(grpc_call) + channel_stack->call_stack_size);
@@ -343,9 +348,9 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
call->is_client = args->server_transport_data == nullptr;
if (call->is_client) {
- GRPC_STATS_INC_CLIENT_CALLS_CREATED();
+ GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx);
} else {
- GRPC_STATS_INC_SERVER_CALLS_CREATED();
+ GRPC_STATS_INC_SERVER_CALLS_CREATED(exec_ctx);
}
call->stream_op_payload.context = call->context;
grpc_slice path = grpc_empty_slice();
@@ -440,13 +445,15 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
send_deadline,
call->arena,
&call->call_combiner};
- add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
- call, &call_args));
+ add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
+ destroy_call, call, &call_args));
if (error != GRPC_ERROR_NONE) {
- cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
+ cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
+ GRPC_ERROR_REF(error));
}
if (immediately_cancel) {
- cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
+ cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE,
+ GRPC_ERROR_CANCELLED);
}
if (args->cq != nullptr) {
GPR_ASSERT(
@@ -461,17 +468,17 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
args->pollset_set_alternative);
}
if (!grpc_polling_entity_is_empty(&call->pollent)) {
- grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
- &call->pollent);
+ grpc_call_stack_set_pollset_or_pollset_set(
+ exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
}
- grpc_slice_unref_internal(path);
+ grpc_slice_unref_internal(exec_ctx, path);
GPR_TIMER_END("grpc_call_create", 0);
return error;
}
-void grpc_call_set_completion_queue(grpc_call* call,
+void grpc_call_set_completion_queue(grpc_exec_ctx* exec_ctx, grpc_call* call,
grpc_completion_queue* cq) {
GPR_ASSERT(cq);
@@ -482,8 +489,8 @@ void grpc_call_set_completion_queue(grpc_call* call,
call->cq = cq;
GRPC_CQ_INTERNAL_REF(cq, "bind");
call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
- grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
- &call->pollent);
+ grpc_call_stack_set_pollset_or_pollset_set(
+ exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
}
#ifndef NDEBUG
@@ -496,38 +503,40 @@ void grpc_call_set_completion_queue(grpc_call* call,
void grpc_call_internal_ref(grpc_call* c REF_ARG) {
GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
-void grpc_call_internal_unref(grpc_call* c REF_ARG) {
- GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
+void grpc_call_internal_unref(grpc_exec_ctx* exec_ctx, grpc_call* c REF_ARG) {
+ GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
}
-static void release_call(void* call, grpc_error* error) {
+static void release_call(grpc_exec_ctx* exec_ctx, void* call,
+ grpc_error* error) {
grpc_call* c = (grpc_call*)call;
grpc_channel* channel = c->channel;
grpc_call_combiner_destroy(&c->call_combiner);
gpr_free((char*)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
- GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
}
static void set_status_value_directly(grpc_status_code status, void* dest);
-static void destroy_call(void* call, grpc_error* error) {
+static void destroy_call(grpc_exec_ctx* exec_ctx, void* call,
+ grpc_error* error) {
size_t i;
int ii;
grpc_call* c = (grpc_call*)call;
GPR_TIMER_BEGIN("destroy_call", 0);
for (i = 0; i < 2; i++) {
grpc_metadata_batch_destroy(
- &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
+ exec_ctx, &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
}
if (c->receiving_stream != nullptr) {
- grpc_byte_stream_destroy(c->receiving_stream);
+ grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
}
parent_call* pc = get_parent_call(c);
if (pc != nullptr) {
gpr_mu_destroy(&pc->child_list_mu);
}
for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
- GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
+ GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md);
}
for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
if (c->context[i].destroy) {
@@ -535,11 +544,12 @@ static void destroy_call(void* call, grpc_error* error) {
}
}
if (c->cq) {
- GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, c->cq, "bind");
}
- get_final_status(c, set_status_value_directly, &c->final_info.final_status,
- nullptr, c->final_info.error_string);
+ get_final_status(exec_ctx, c, set_status_value_directly,
+ &c->final_info.final_status, nullptr,
+ c->final_info.error_string);
c->final_info.stats.latency =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
@@ -548,7 +558,7 @@ static void destroy_call(void* call, grpc_error* error) {
unpack_received_status(gpr_atm_acq_load(&c->status[i])).error);
}
- grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
+ grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info,
GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
grpc_schedule_on_exec_ctx));
GPR_TIMER_END("destroy_call", 0);
@@ -560,7 +570,7 @@ void grpc_call_unref(grpc_call* c) {
if (!gpr_unref(&c->ext_ref)) return;
child_call* cc = c->child;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_call_unref", 0);
GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
@@ -577,7 +587,7 @@ void grpc_call_unref(grpc_call* c) {
cc->sibling_prev->child->sibling_next = cc->sibling_next;
cc->sibling_next->child->sibling_prev = cc->sibling_prev;
gpr_mu_unlock(&pc->child_list_mu);
- GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
+ GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child");
}
GPR_ASSERT(!c->destroy_called);
@@ -585,49 +595,53 @@ void grpc_call_unref(grpc_call* c) {
bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
gpr_atm_acq_load(&c->received_final_op_atm) == 0;
if (cancel) {
- cancel_with_error(c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
+ cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
+ GRPC_ERROR_CANCELLED);
} else {
// Unset the call combiner cancellation closure. This has the
// effect of scheduling the previously set cancellation closure, if
// any, so that it can release any internal references it may be
// holding to the call stack.
- grpc_call_combiner_set_notify_on_cancel(&c->call_combiner, nullptr);
+ grpc_call_combiner_set_notify_on_cancel(&exec_ctx, &c->call_combiner,
+ nullptr);
}
- GRPC_CALL_INTERNAL_UNREF(c, "destroy");
-
+ GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_call_unref", 0);
}
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
GPR_ASSERT(!reserved);
- grpc_core::ExecCtx exec_ctx;
- cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
-
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ cancel_with_error(&exec_ctx, call, STATUS_FROM_API_OVERRIDE,
+ GRPC_ERROR_CANCELLED);
+ grpc_exec_ctx_finish(&exec_ctx);
return GRPC_CALL_OK;
}
// This is called via the call combiner to start sending a batch down
// the filter stack.
-static void execute_batch_in_call_combiner(void* arg, grpc_error* ignored) {
+static void execute_batch_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* ignored) {
grpc_transport_stream_op_batch* batch = (grpc_transport_stream_op_batch*)arg;
grpc_call* call = (grpc_call*)batch->handler_private.extra_arg;
GPR_TIMER_BEGIN("execute_batch", 0);
grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
- elem->filter->start_transport_stream_op_batch(elem, batch);
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
GPR_TIMER_END("execute_batch", 0);
}
// start_batch_closure points to a caller-allocated closure to be used
// for entering the call combiner.
-static void execute_batch(grpc_call* call,
+static void execute_batch(grpc_exec_ctx* exec_ctx, grpc_call* call,
grpc_transport_stream_op_batch* batch,
grpc_closure* start_batch_closure) {
batch->handler_private.extra_arg = call;
GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
+ GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure,
GRPC_ERROR_NONE, "executing batch");
}
@@ -651,14 +665,15 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
grpc_status_code status,
const char* description,
void* reserved) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
"grpc_call_cancel_with_status("
"c=%p, status=%d, description=%s, reserved=%p)",
4, (c, (int)status, description, reserved));
GPR_ASSERT(reserved == nullptr);
- cancel_with_status(c, STATUS_FROM_API_OVERRIDE, status, description);
-
+ cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status,
+ description);
+ grpc_exec_ctx_finish(&exec_ctx);
return GRPC_CALL_OK;
}
@@ -670,23 +685,24 @@ typedef struct {
// The on_complete callback used when sending a cancel_stream batch down
// the filter stack. Yields the call combiner when the batch is done.
-static void done_termination(void* arg, grpc_error* error) {
+static void done_termination(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
cancel_state* state = (cancel_state*)arg;
- GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner,
"on_complete for cancel_stream op");
- GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination");
gpr_free(state);
}
-static void cancel_with_error(grpc_call* c, status_source source,
- grpc_error* error) {
+static void cancel_with_error(grpc_exec_ctx* exec_ctx, grpc_call* c,
+ status_source source, grpc_error* error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
// Inform the call combiner of the cancellation, so that it can cancel
// any in-flight asynchronous actions that may be holding the call
// combiner. This ensures that the cancel_stream batch can be sent
// down the filter stack in a timely manner.
- grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error));
- set_status_from_error(c, source, GRPC_ERROR_REF(error));
+ grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error));
+ set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
cancel_state* state = (cancel_state*)gpr_malloc(sizeof(*state));
state->call = c;
GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
@@ -695,7 +711,7 @@ static void cancel_with_error(grpc_call* c, status_source source,
grpc_make_transport_stream_op(&state->finish_batch);
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
- execute_batch(c, op, &state->start_batch);
+ execute_batch(exec_ctx, c, op, &state->start_batch);
}
static grpc_error* error_from_status(grpc_status_code status,
@@ -709,10 +725,11 @@ static grpc_error* error_from_status(grpc_status_code status,
GRPC_ERROR_INT_GRPC_STATUS, status);
}
-static void cancel_with_status(grpc_call* c, status_source source,
- grpc_status_code status,
+static void cancel_with_status(grpc_exec_ctx* exec_ctx, grpc_call* c,
+ status_source source, grpc_status_code status,
const char* description) {
- cancel_with_error(c, source, error_from_status(status, description));
+ cancel_with_error(exec_ctx, c, source,
+ error_from_status(status, description));
}
/*******************************************************************************
@@ -720,13 +737,14 @@ static void cancel_with_status(grpc_call* c, status_source source,
*/
static bool get_final_status_from(
- grpc_call* call, grpc_error* error, bool allow_ok_status,
+ grpc_exec_ctx* exec_ctx, grpc_call* call, grpc_error* error,
+ bool allow_ok_status,
void (*set_value)(grpc_status_code code, void* user_data),
void* set_value_user_data, grpc_slice* details, const char** error_string) {
grpc_status_code code;
grpc_slice slice = grpc_empty_slice();
- grpc_error_get_status(error, call->send_deadline, &code, &slice, nullptr,
- error_string);
+ grpc_error_get_status(exec_ctx, error, call->send_deadline, &code, &slice,
+ nullptr, error_string);
if (code == GRPC_STATUS_OK && !allow_ok_status) {
return false;
}
@@ -738,9 +756,11 @@ static bool get_final_status_from(
return true;
}
-static void get_final_status(
- grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
- void* set_value_user_data, grpc_slice* details, const char** error_string) {
+static void get_final_status(grpc_exec_ctx* exec_ctx, grpc_call* call,
+ void (*set_value)(grpc_status_code code,
+ void* user_data),
+ void* set_value_user_data, grpc_slice* details,
+ const char** error_string) {
int i;
received_status status[STATUS_SOURCE_COUNT];
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
@@ -762,9 +782,9 @@ static void get_final_status(
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (status[i].is_set &&
grpc_error_has_clear_grpc_status(status[i].error)) {
- if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
- set_value, set_value_user_data, details,
- error_string)) {
+ if (get_final_status_from(exec_ctx, call, status[i].error,
+ allow_ok_status != 0, set_value,
+ set_value_user_data, details, error_string)) {
return;
}
}
@@ -772,9 +792,9 @@ static void get_final_status(
/* If no clearly defined status exists, search for 'anything' */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (status[i].is_set) {
- if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
- set_value, set_value_user_data, details,
- error_string)) {
+ if (get_final_status_from(exec_ctx, call, status[i].error,
+ allow_ok_status != 0, set_value,
+ set_value_user_data, details, error_string)) {
return;
}
}
@@ -788,8 +808,8 @@ static void get_final_status(
}
}
-static void set_status_from_error(grpc_call* call, status_source source,
- grpc_error* error) {
+static void set_status_from_error(grpc_exec_ctx* exec_ctx, grpc_call* call,
+ status_source source, grpc_error* error) {
if (!gpr_atm_rel_cas(&call->status[source],
pack_received_status({false, GRPC_ERROR_NONE}),
pack_received_status({true, error}))) {
@@ -841,7 +861,8 @@ uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
static void destroy_encodings_accepted_by_peer(void* p) { return; }
-static void set_encodings_accepted_by_peer(grpc_call* call, grpc_mdelem mdel) {
+static void set_encodings_accepted_by_peer(grpc_exec_ctx* exec_ctx,
+ grpc_call* call, grpc_mdelem mdel) {
size_t i;
grpc_compression_algorithm algorithm;
grpc_slice_buffer accept_encoding_parts;
@@ -879,14 +900,15 @@ static void set_encodings_accepted_by_peer(grpc_call* call, grpc_mdelem mdel) {
}
}
- grpc_slice_buffer_destroy_internal(&accept_encoding_parts);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts);
grpc_mdelem_set_user_data(
mdel, destroy_encodings_accepted_by_peer,
(void*)(((uintptr_t)call->encodings_accepted_by_peer) + 1));
}
-static void set_stream_encodings_accepted_by_peer(grpc_call* call,
+static void set_stream_encodings_accepted_by_peer(grpc_exec_ctx* exec_ctx,
+ grpc_call* call,
grpc_mdelem mdel) {
size_t i;
grpc_stream_compression_algorithm algorithm;
@@ -924,7 +946,7 @@ static void set_stream_encodings_accepted_by_peer(grpc_call* call,
}
}
- grpc_slice_buffer_destroy_internal(&accept_encoding_parts);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts);
grpc_mdelem_set_user_data(
mdel, destroy_encodings_accepted_by_peer,
@@ -962,12 +984,10 @@ static grpc_metadata* get_md_elem(grpc_metadata* metadata,
return res;
}
-static int prepare_application_metadata(grpc_call* call, int count,
- grpc_metadata* metadata,
- int is_trailing,
- int prepend_extra_metadata,
- grpc_metadata* additional_metadata,
- int additional_metadata_count) {
+static int prepare_application_metadata(
+ grpc_exec_ctx* exec_ctx, grpc_call* call, int count,
+ grpc_metadata* metadata, int is_trailing, int prepend_extra_metadata,
+ grpc_metadata* additional_metadata, int additional_metadata_count) {
int total_count = count + additional_metadata_count;
int i;
grpc_metadata_batch* batch =
@@ -986,14 +1006,14 @@ static int prepare_application_metadata(grpc_call* call, int count,
grpc_validate_header_nonbin_value_is_legal(md->value))) {
break;
}
- l->md = grpc_mdelem_from_grpc_metadata((grpc_metadata*)md);
+ l->md = grpc_mdelem_from_grpc_metadata(exec_ctx, (grpc_metadata*)md);
}
if (i != total_count) {
for (int j = 0; j < i; j++) {
const grpc_metadata* md =
get_md_elem(metadata, additional_metadata, j, count);
grpc_linked_mdelem* l = linked_from_md(md);
- GRPC_MDELEM_UNREF(l->md);
+ GRPC_MDELEM_UNREF(exec_ctx, l->md);
}
return 0;
}
@@ -1004,16 +1024,16 @@ static int prepare_application_metadata(grpc_call* call, int count,
for (i = 0; i < call->send_extra_metadata_count; i++) {
GRPC_LOG_IF_ERROR("prepare_application_metadata",
grpc_metadata_batch_link_tail(
- batch, &call->send_extra_metadata[i]));
+ exec_ctx, batch, &call->send_extra_metadata[i]));
}
}
}
for (i = 0; i < total_count; i++) {
grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
grpc_linked_mdelem* l = linked_from_md(md);
- grpc_error* error = grpc_metadata_batch_link_tail(batch, l);
+ grpc_error* error = grpc_metadata_batch_link_tail(exec_ctx, batch, l);
if (error != GRPC_ERROR_NONE) {
- GRPC_MDELEM_UNREF(l->md);
+ GRPC_MDELEM_UNREF(exec_ctx, l->md);
}
GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
}
@@ -1100,43 +1120,46 @@ static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
GPR_TIMER_END("publish_app_metadata", 0);
}
-static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
+static void recv_initial_filter(grpc_exec_ctx* exec_ctx, grpc_call* call,
+ grpc_metadata_batch* b) {
if (b->idx.named.content_encoding != nullptr) {
if (b->idx.named.grpc_encoding != nullptr) {
gpr_log(GPR_ERROR,
"Received both content-encoding and grpc-encoding header. "
"Ignoring grpc-encoding.");
- grpc_metadata_batch_remove(b, b->idx.named.grpc_encoding);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
}
GPR_TIMER_BEGIN("incoming_stream_compression_algorithm", 0);
set_incoming_stream_compression_algorithm(
call, decode_stream_compression(b->idx.named.content_encoding->md));
GPR_TIMER_END("incoming_stream_compression_algorithm", 0);
- grpc_metadata_batch_remove(b, b->idx.named.content_encoding);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_encoding);
} else if (b->idx.named.grpc_encoding != nullptr) {
GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
set_incoming_compression_algorithm(
call, decode_compression(b->idx.named.grpc_encoding->md));
GPR_TIMER_END("incoming_compression_algorithm", 0);
- grpc_metadata_batch_remove(b, b->idx.named.grpc_encoding);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
}
if (b->idx.named.grpc_accept_encoding != nullptr) {
GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
- set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md);
- grpc_metadata_batch_remove(b, b->idx.named.grpc_accept_encoding);
+ set_encodings_accepted_by_peer(exec_ctx, call,
+ b->idx.named.grpc_accept_encoding->md);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding);
GPR_TIMER_END("encodings_accepted_by_peer", 0);
}
if (b->idx.named.accept_encoding != nullptr) {
GPR_TIMER_BEGIN("stream_encodings_accepted_by_peer", 0);
- set_stream_encodings_accepted_by_peer(call,
+ set_stream_encodings_accepted_by_peer(exec_ctx, call,
b->idx.named.accept_encoding->md);
- grpc_metadata_batch_remove(b, b->idx.named.accept_encoding);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.accept_encoding);
GPR_TIMER_END("stream_encodings_accepted_by_peer", 0);
}
publish_app_metadata(call, b, false);
}
-static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
+static void recv_trailing_filter(grpc_exec_ctx* exec_ctx, void* args,
+ grpc_metadata_batch* b) {
grpc_call* call = (grpc_call*)args;
if (b->idx.named.grpc_status != nullptr) {
uint32_t status_code = decode_status(b->idx.named.grpc_status->md);
@@ -1151,13 +1174,13 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
error = grpc_error_set_str(
error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
- grpc_metadata_batch_remove(b, b->idx.named.grpc_message);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message);
} else if (error != GRPC_ERROR_NONE) {
error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_empty_slice());
}
- set_status_from_error(call, STATUS_FROM_WIRE, error);
- grpc_metadata_batch_remove(b, b->idx.named.grpc_status);
+ set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status);
}
publish_app_metadata(call, b, true);
}
@@ -1234,12 +1257,12 @@ static batch_control* allocate_batch_control(grpc_call* call,
return bctl;
}
-static void finish_batch_completion(void* user_data,
+static void finish_batch_completion(grpc_exec_ctx* exec_ctx, void* user_data,
grpc_cq_completion* storage) {
batch_control* bctl = (batch_control*)user_data;
grpc_call* call = bctl->call;
bctl->call = nullptr;
- GRPC_CALL_INTERNAL_UNREF(call, "completion");
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
}
static grpc_error* consolidate_batch_errors(batch_control* bctl) {
@@ -1263,13 +1286,15 @@ static grpc_error* consolidate_batch_errors(batch_control* bctl) {
}
}
-static void post_batch_completion(batch_control* bctl) {
+static void post_batch_completion(grpc_exec_ctx* exec_ctx,
+ batch_control* bctl) {
grpc_call* next_child_call;
grpc_call* call = bctl->call;
grpc_error* error = consolidate_batch_errors(bctl);
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
+ exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->op.send_message) {
@@ -1277,12 +1302,13 @@ static void post_batch_completion(batch_control* bctl) {
}
if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy(
+ exec_ctx,
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->op.recv_trailing_metadata) {
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- recv_trailing_filter(call, md);
+ recv_trailing_filter(exec_ctx, call, md);
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
@@ -1296,9 +1322,9 @@ static void post_batch_completion(batch_control* bctl) {
next_child_call = child->child->sibling_next;
if (child->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
- cancel_with_error(child, STATUS_FROM_API_OVERRIDE,
+ cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE,
GRPC_ERROR_CANCELLED);
- GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, child, "propagate_cancel");
}
child = next_child_call;
} while (child != pc->first_child);
@@ -1307,12 +1333,12 @@ static void post_batch_completion(batch_control* bctl) {
}
if (call->is_client) {
- get_final_status(call, set_status_value_directly,
+ get_final_status(exec_ctx, call, set_status_value_directly,
call->final_op.client.status,
call->final_op.client.status_details,
call->final_op.client.error_string);
} else {
- get_final_status(call, set_cancelled_value,
+ get_final_status(exec_ctx, call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr);
}
@@ -1328,24 +1354,25 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs bctl->error */
bctl->call = nullptr;
- GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag,
- error);
- GRPC_CALL_INTERNAL_UNREF(call, "completion");
+ GRPC_CLOSURE_RUN(
+ exec_ctx, (grpc_closure*)bctl->completion_data.notify_tag.tag, error);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
/* unrefs bctl->error */
- grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
- finish_batch_completion, bctl,
- &bctl->completion_data.cq_completion);
+ grpc_cq_end_op(
+ exec_ctx, bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
+ finish_batch_completion, bctl, &bctl->completion_data.cq_completion);
}
}
-static void finish_batch_step(batch_control* bctl) {
+static void finish_batch_step(grpc_exec_ctx* exec_ctx, batch_control* bctl) {
if (gpr_unref(&bctl->steps_to_complete)) {
- post_batch_completion(bctl);
+ post_batch_completion(exec_ctx, bctl);
}
}
-static void continue_receiving_slices(batch_control* bctl) {
+static void continue_receiving_slices(grpc_exec_ctx* exec_ctx,
+ batch_control* bctl) {
grpc_error* error;
grpc_call* call = bctl->call;
for (;;) {
@@ -1353,25 +1380,25 @@ static void continue_receiving_slices(batch_control* bctl) {
(*call->receiving_buffer)->data.raw.slice_buffer.length;
if (remaining == 0) {
call->receiving_message = 0;
- grpc_byte_stream_destroy(call->receiving_stream);
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = nullptr;
- finish_batch_step(bctl);
+ finish_batch_step(exec_ctx, bctl);
return;
}
- if (grpc_byte_stream_next(call->receiving_stream, remaining,
+ if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining,
&call->receiving_slice_ready)) {
- error =
- grpc_byte_stream_pull(call->receiving_stream, &call->receiving_slice);
+ error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream,
+ &call->receiving_slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
call->receiving_slice);
} else {
- grpc_byte_stream_destroy(call->receiving_stream);
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = nullptr;
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
call->receiving_message = 0;
- finish_batch_step(bctl);
+ finish_batch_step(exec_ctx, bctl);
return;
}
} else {
@@ -1380,7 +1407,8 @@ static void continue_receiving_slices(batch_control* bctl) {
}
}
-static void receiving_slice_ready(void* bctlp, grpc_error* error) {
+static void receiving_slice_ready(grpc_exec_ctx* exec_ctx, void* bctlp,
+ grpc_error* error) {
batch_control* bctl = (batch_control*)bctlp;
grpc_call* call = bctl->call;
grpc_byte_stream* bs = call->receiving_stream;
@@ -1388,11 +1416,11 @@ static void receiving_slice_ready(void* bctlp, grpc_error* error) {
if (error == GRPC_ERROR_NONE) {
grpc_slice slice;
- error = grpc_byte_stream_pull(bs, &slice);
+ error = grpc_byte_stream_pull(exec_ctx, bs, &slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
slice);
- continue_receiving_slices(bctl);
+ continue_receiving_slices(exec_ctx, bctl);
} else {
/* Error returned by grpc_byte_stream_pull needs to be released manually
*/
@@ -1404,24 +1432,25 @@ static void receiving_slice_ready(void* bctlp, grpc_error* error) {
if (grpc_trace_operation_failures.enabled()) {
GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
}
- grpc_byte_stream_destroy(call->receiving_stream);
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = nullptr;
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
call->receiving_message = 0;
- finish_batch_step(bctl);
+ finish_batch_step(exec_ctx, bctl);
if (release_error) {
GRPC_ERROR_UNREF(error);
}
}
}
-static void process_data_after_md(batch_control* bctl) {
+static void process_data_after_md(grpc_exec_ctx* exec_ctx,
+ batch_control* bctl) {
grpc_call* call = bctl->call;
if (call->receiving_stream == nullptr) {
*call->receiving_buffer = nullptr;
call->receiving_message = 0;
- finish_batch_step(bctl);
+ finish_batch_step(exec_ctx, bctl);
} else {
call->test_only_last_message_flags = call->receiving_stream->flags;
if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
@@ -1433,42 +1462,46 @@ static void process_data_after_md(batch_control* bctl) {
}
GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
grpc_schedule_on_exec_ctx);
- continue_receiving_slices(bctl);
+ continue_receiving_slices(exec_ctx, bctl);
}
}
-static void receiving_stream_ready(void* bctlp, grpc_error* error) {
+static void receiving_stream_ready(grpc_exec_ctx* exec_ctx, void* bctlp,
+ grpc_error* error) {
batch_control* bctl = (batch_control*)bctlp;
grpc_call* call = bctl->call;
if (error != GRPC_ERROR_NONE) {
if (call->receiving_stream != nullptr) {
- grpc_byte_stream_destroy(call->receiving_stream);
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = nullptr;
}
- add_batch_error(bctl, GRPC_ERROR_REF(error), true);
- cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
+ add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), true);
+ cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
+ GRPC_ERROR_REF(error));
}
/* If recv_state is RECV_NONE, we will save the batch_control
* object with rel_cas, and will not use it after the cas. Its corresponding
* acq_load is in receiving_initial_metadata_ready() */
if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
!gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
- process_data_after_md(bctl);
+ process_data_after_md(exec_ctx, bctl);
}
}
// The recv_message_ready callback used when sending a batch containing
// a recv_message op down the filter stack. Yields the call combiner
// before processing the received message.
-static void receiving_stream_ready_in_call_combiner(void* bctlp,
+static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx* exec_ctx,
+ void* bctlp,
grpc_error* error) {
batch_control* bctl = (batch_control*)bctlp;
grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
- receiving_stream_ready(bctlp, error);
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready");
+ receiving_stream_ready(exec_ctx, bctlp, error);
}
-static void validate_filtered_metadata(batch_control* bctl) {
+static void validate_filtered_metadata(grpc_exec_ctx* exec_ctx,
+ batch_control* bctl) {
grpc_call* call = bctl->call;
/* validate compression algorithms */
if (call->incoming_stream_compression_algorithm !=
@@ -1482,8 +1515,8 @@ static void validate_filtered_metadata(batch_control* bctl) {
gpr_asprintf(&error_msg,
"Invalid stream compression algorithm value '%d'.", algo);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
- error_msg);
+ cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
+ GRPC_STATUS_UNIMPLEMENTED, error_msg);
} else if (grpc_compression_options_is_stream_compression_algorithm_enabled(
&compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */
@@ -1492,8 +1525,8 @@ static void validate_filtered_metadata(batch_control* bctl) {
gpr_asprintf(&error_msg, "Stream compression algorithm '%s' is disabled.",
algo_name);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
- error_msg);
+ cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
+ GRPC_STATUS_UNIMPLEMENTED, error_msg);
}
gpr_free(error_msg);
@@ -1523,8 +1556,8 @@ static void validate_filtered_metadata(batch_control* bctl) {
gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
algo);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
- error_msg);
+ cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
+ GRPC_STATUS_UNIMPLEMENTED, error_msg);
} else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */
@@ -1533,8 +1566,8 @@ static void validate_filtered_metadata(batch_control* bctl) {
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
- error_msg);
+ cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
+ GRPC_STATUS_UNIMPLEMENTED, error_msg);
} else {
call->incoming_compression_algorithm = algo;
}
@@ -1557,31 +1590,34 @@ static void validate_filtered_metadata(batch_control* bctl) {
}
}
-static void add_batch_error(batch_control* bctl, grpc_error* error,
- bool has_cancelled) {
+static void add_batch_error(grpc_exec_ctx* exec_ctx, batch_control* bctl,
+ grpc_error* error, bool has_cancelled) {
if (error == GRPC_ERROR_NONE) return;
int idx = (int)gpr_atm_full_fetch_add(&bctl->num_errors, 1);
if (idx == 0 && !has_cancelled) {
- cancel_with_error(bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error));
+ cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
+ GRPC_ERROR_REF(error));
}
bctl->errors[idx] = error;
}
-static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
+static void receiving_initial_metadata_ready(grpc_exec_ctx* exec_ctx,
+ void* bctlp, grpc_error* error) {
batch_control* bctl = (batch_control*)bctlp;
grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner,
+ "recv_initial_metadata_ready");
- add_batch_error(bctl, GRPC_ERROR_REF(error), false);
+ add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- recv_initial_filter(call, md);
+ recv_initial_filter(exec_ctx, call, md);
/* TODO(ctiller): this could be moved into recv_initial_filter now */
GPR_TIMER_BEGIN("validate_filtered_metadata", 0);
- validate_filtered_metadata(bctl);
+ validate_filtered_metadata(exec_ctx, bctl);
GPR_TIMER_END("validate_filtered_metadata", 0);
if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
@@ -1614,25 +1650,28 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
}
}
if (saved_rsr_closure != nullptr) {
- GRPC_CLOSURE_RUN(saved_rsr_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
}
- finish_batch_step(bctl);
+ finish_batch_step(exec_ctx, bctl);
}
-static void finish_batch(void* bctlp, grpc_error* error) {
+static void finish_batch(grpc_exec_ctx* exec_ctx, void* bctlp,
+ grpc_error* error) {
batch_control* bctl = (batch_control*)bctlp;
grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
- add_batch_error(bctl, GRPC_ERROR_REF(error), false);
- finish_batch_step(bctl);
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete");
+ add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
+ finish_batch_step(exec_ctx, bctl);
}
-static void free_no_op_completion(void* p, grpc_cq_completion* completion) {
+static void free_no_op_completion(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_cq_completion* completion) {
gpr_free(completion);
}
-static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
+static grpc_call_error call_start_batch(grpc_exec_ctx* exec_ctx,
+ grpc_call* call, const grpc_op* ops,
size_t nops, void* notify_tag,
int is_notify_tag_closure) {
size_t i;
@@ -1650,10 +1689,11 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
grpc_cq_end_op(
- call->cq, notify_tag, GRPC_ERROR_NONE, free_no_op_completion, nullptr,
+ exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
+ free_no_op_completion, nullptr,
(grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion)));
} else {
- GRPC_CLOSURE_SCHED((grpc_closure*)notify_tag, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure*)notify_tag, GRPC_ERROR_NONE);
}
error = GRPC_CALL_OK;
goto done;
@@ -1753,7 +1793,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op->send_initial_metadata = true;
call->sent_initial_metadata = true;
if (!prepare_application_metadata(
- call, (int)op->data.send_initial_metadata.count,
+ exec_ctx, call, (int)op->data.send_initial_metadata.count,
op->data.send_initial_metadata.metadata, 0, call->is_client,
&call->compression_md, (int)additional_metadata_count)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
@@ -1847,7 +1887,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
GPR_ASSERT(call->send_extra_metadata_count == 0);
call->send_extra_metadata_count = 1;
call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
- call->channel, op->data.send_status_from_server.status);
+ exec_ctx, call->channel, op->data.send_status_from_server.status);
{
grpc_error* override_error = GRPC_ERROR_NONE;
if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
@@ -1856,7 +1896,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
}
if (op->data.send_status_from_server.status_details != nullptr) {
call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
- GRPC_MDSTR_GRPC_MESSAGE,
+ exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
grpc_slice_ref_internal(
*op->data.send_status_from_server.status_details));
call->send_extra_metadata_count++;
@@ -1867,15 +1907,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
grpc_slice_from_copied_string(msg));
gpr_free(msg);
}
- set_status_from_error(call, STATUS_FROM_API_OVERRIDE, override_error);
+ set_status_from_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE,
+ override_error);
}
if (!prepare_application_metadata(
- call,
+ exec_ctx, call,
(int)op->data.send_status_from_server.trailing_metadata_count,
op->data.send_status_from_server.trailing_metadata, 1, 1,
nullptr, 0)) {
for (int n = 0; n < call->send_extra_metadata_count; n++) {
- GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
+ GRPC_MDELEM_UNREF(exec_ctx, call->send_extra_metadata[n].md);
}
call->send_extra_metadata_count = 0;
error = GRPC_CALL_ERROR_INVALID_METADATA;
@@ -2004,7 +2045,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
- execute_batch(call, stream_op, &bctl->start_batch);
+ execute_batch(exec_ctx, call, stream_op, &bctl->start_batch);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);
@@ -2014,15 +2055,15 @@ done_with_error:
/* reverse any mutations that occured */
if (stream_op->send_initial_metadata) {
call->sent_initial_metadata = false;
- grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
+ grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]);
}
if (stream_op->send_message) {
call->sending_message = false;
- grpc_byte_stream_destroy(&call->sending_stream.base);
+ grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base);
}
if (stream_op->send_trailing_metadata) {
call->sent_final_op = false;
- grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
+ grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]);
}
if (stream_op->recv_initial_metadata) {
call->received_initial_metadata = false;
@@ -2038,7 +2079,7 @@ done_with_error:
grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
size_t nops, void* tag, void* reserved) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_call_error err;
GRPC_API_TRACE(
@@ -2049,17 +2090,19 @@ grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
if (reserved != nullptr) {
err = GRPC_CALL_ERROR;
} else {
- err = call_start_batch(call, ops, nops, tag, 0);
+ err = call_start_batch(&exec_ctx, call, ops, nops, tag, 0);
}
+ grpc_exec_ctx_finish(&exec_ctx);
return err;
}
-grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
+grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx* exec_ctx,
+ grpc_call* call,
const grpc_op* ops,
size_t nops,
grpc_closure* closure) {
- return call_start_batch(call, ops, nops, closure, 1);
+ return call_start_batch(exec_ctx, call, ops, nops, closure, 1);
}
void grpc_call_context_set(grpc_call* call, grpc_context_index elem,