diff options
Diffstat (limited to 'src/core/lib/surface/call.cc')
-rw-r--r-- | src/core/lib/surface/call.cc | 417 |
1 files changed, 188 insertions, 229 deletions
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 8216aa0ec8..26002056ea 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -271,29 +271,26 @@ grpc_tracer_flag grpc_compression_trace = #define CALL_FROM_TOP_ELEM(top_elem) \ CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) -static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op_batch *op, +static void execute_batch(grpc_call *call, grpc_transport_stream_op_batch *op, grpc_closure *start_batch_closure); -static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, - status_source source, grpc_status_code status, +static void cancel_with_status(grpc_call *c, status_source source, + grpc_status_code status, const char *description); -static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, - status_source source, grpc_error *error); -static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, - grpc_error *error); -static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - grpc_error *error); -static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call, +static void cancel_with_error(grpc_call *c, status_source source, + grpc_error *error); +static void destroy_call(void *call_stack, grpc_error *error); +static void receiving_slice_ready(void *bctlp, grpc_error *error); +static void get_final_status(grpc_call *call, void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details); static void set_status_value_directly(grpc_status_code status, void *dest); -static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, - status_source source, grpc_error *error); -static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl); -static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl); -static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, - grpc_error *error, bool has_cancelled); +static void set_status_from_error(grpc_call *call, status_source source, + grpc_error *error); +static void process_data_after_md(batch_control *bctl); +static void post_batch_completion(batch_control *bctl); +static void add_batch_error(batch_control *bctl, grpc_error *error, + bool has_cancelled); static void add_init_error(grpc_error **composite, grpc_error *new_err) { if (new_err == GRPC_ERROR_NONE) return; @@ -323,8 +320,7 @@ static parent_call *get_parent_call(grpc_call *call) { return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); } -grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, - const grpc_call_create_args *args, +grpc_error *grpc_call_create(const grpc_call_create_args *args, grpc_call **out_call) { size_t i, j; grpc_error *error = GRPC_ERROR_NONE; @@ -333,7 +329,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, grpc_call *call; GPR_TIMER_BEGIN("grpc_call_create", 0); size_t initial_size = grpc_channel_get_call_size_estimate(args->channel); - GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, initial_size); + GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size); gpr_arena *arena = gpr_arena_create(initial_size); call = (grpc_call *)gpr_arena_alloc( arena, sizeof(grpc_call) + channel_stack->call_stack_size); @@ -348,9 +344,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = args->server_transport_data == NULL; if (call->is_client) { - GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx); + GRPC_STATS_INC_CLIENT_CALLS_CREATED(); } else { - GRPC_STATS_INC_SERVER_CALLS_CREATED(exec_ctx); + GRPC_STATS_INC_SERVER_CALLS_CREATED(); } call->stream_op_payload.context = call->context; grpc_slice path = grpc_empty_slice(); @@ -445,15 +441,13 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, send_deadline, call->arena, &call->call_combiner}; - add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1, - destroy_call, call, &call_args)); + add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call, + call, &call_args)); if (error != GRPC_ERROR_NONE) { - cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, - GRPC_ERROR_REF(error)); + cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } if (immediately_cancel) { - cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_CANCELLED); + cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); } if (args->cq != NULL) { GPR_ASSERT( @@ -468,17 +462,17 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, args->pollset_set_alternative); } if (!grpc_polling_entity_is_empty(&call->pollent)) { - grpc_call_stack_set_pollset_or_pollset_set( - exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); + grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call), + &call->pollent); } - grpc_slice_unref_internal(exec_ctx, path); + grpc_slice_unref_internal(path); GPR_TIMER_END("grpc_call_create", 0); return error; } -void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, +void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq) { GPR_ASSERT(cq); @@ -489,8 +483,8 @@ void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, call->cq = cq; GRPC_CQ_INTERNAL_REF(cq, "bind"); call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)); - grpc_call_stack_set_pollset_or_pollset_set( - exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); + grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call), + &call->pollent); } #ifndef NDEBUG @@ -503,40 +497,38 @@ void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, void grpc_call_internal_ref(grpc_call *c REF_ARG) { GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON); } -void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { - GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); +void grpc_call_internal_unref(grpc_call *c REF_ARG) { + GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON); } -static void release_call(grpc_exec_ctx *exec_ctx, void *call, - grpc_error *error) { +static void release_call(void *call, grpc_error *error) { grpc_call *c = (grpc_call *)call; grpc_channel *channel = c->channel; grpc_call_combiner_destroy(&c->call_combiner); gpr_free((char *)c->peer_string); grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); + GRPC_CHANNEL_INTERNAL_UNREF(channel, "call"); } static void set_status_value_directly(grpc_status_code status, void *dest); -static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, - grpc_error *error) { +static void destroy_call(void *call, grpc_error *error) { size_t i; int ii; grpc_call *c = (grpc_call *)call; GPR_TIMER_BEGIN("destroy_call", 0); for (i = 0; i < 2; i++) { grpc_metadata_batch_destroy( - exec_ctx, &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]); + &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]); } if (c->receiving_stream != NULL) { - grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); + grpc_byte_stream_destroy(c->receiving_stream); } parent_call *pc = get_parent_call(c); if (pc != NULL) { gpr_mu_destroy(&pc->child_list_mu); } for (ii = 0; ii < c->send_extra_metadata_count; ii++) { - GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md); + GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md); } for (i = 0; i < GRPC_CONTEXT_COUNT; i++) { if (c->context[i].destroy) { @@ -544,11 +536,11 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, } } if (c->cq) { - GRPC_CQ_INTERNAL_UNREF(exec_ctx, c->cq, "bind"); + GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } - get_final_status(exec_ctx, c, set_status_value_directly, - &c->final_info.final_status, NULL); + get_final_status(c, set_status_value_directly, &c->final_info.final_status, + NULL); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); @@ -557,7 +549,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, unpack_received_status(gpr_atm_acq_load(&c->status[i])).error); } - grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, + grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info, GRPC_CLOSURE_INIT(&c->release_call, release_call, c, grpc_schedule_on_exec_ctx)); GPR_TIMER_END("destroy_call", 0); @@ -569,7 +561,7 @@ void grpc_call_unref(grpc_call *c) { if (!gpr_unref(&c->ext_ref)) return; child_call *cc = c->child; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; GPR_TIMER_BEGIN("grpc_call_unref", 0); GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c)); @@ -586,7 +578,7 @@ void grpc_call_unref(grpc_call *c) { cc->sibling_prev->child->sibling_next = cc->sibling_next; cc->sibling_next->child->sibling_prev = cc->sibling_prev; gpr_mu_unlock(&pc->child_list_mu); - GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child"); + GRPC_CALL_INTERNAL_UNREF(cc->parent, "child"); } GPR_ASSERT(!c->destroy_called); @@ -594,52 +586,49 @@ void grpc_call_unref(grpc_call *c) { bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 && gpr_atm_acq_load(&c->received_final_op_atm) == 0; if (cancel) { - cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_CANCELLED); + cancel_with_error(c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); } else { // Unset the call combiner cancellation closure. This has the // effect of scheduling the previously set cancellation closure, if // any, so that it can release any internal references it may be // holding to the call stack. - grpc_call_combiner_set_notify_on_cancel(&exec_ctx, &c->call_combiner, NULL); + grpc_call_combiner_set_notify_on_cancel(&c->call_combiner, NULL); } - GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); + GRPC_CALL_INTERNAL_UNREF(c, "destroy"); + grpc_exec_ctx_finish(); GPR_TIMER_END("grpc_call_unref", 0); } grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); GPR_ASSERT(!reserved); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - cancel_with_error(&exec_ctx, call, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_CANCELLED); - grpc_exec_ctx_finish(&exec_ctx); + ExecCtx _local_exec_ctx; + cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); + grpc_exec_ctx_finish(); return GRPC_CALL_OK; } // This is called via the call combiner to start sending a batch down // the filter stack. -static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *ignored) { +static void execute_batch_in_call_combiner(void *arg, grpc_error *ignored) { grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg; grpc_call *call = (grpc_call *)batch->handler_private.extra_arg; GPR_TIMER_BEGIN("execute_batch", 0); grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); GRPC_CALL_LOG_OP(GPR_INFO, elem, batch); - elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch); + elem->filter->start_transport_stream_op_batch(elem, batch); GPR_TIMER_END("execute_batch", 0); } // start_batch_closure points to a caller-allocated closure to be used // for entering the call combiner. -static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, +static void execute_batch(grpc_call *call, grpc_transport_stream_op_batch *batch, grpc_closure *start_batch_closure) { batch->handler_private.extra_arg = call; GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); - GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure, + GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure, GRPC_ERROR_NONE, "executing batch"); } @@ -663,15 +652,14 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_status_code status, const char *description, void *reserved) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; GRPC_API_TRACE( "grpc_call_cancel_with_status(" "c=%p, status=%d, description=%s, reserved=%p)", 4, (c, (int)status, description, reserved)); GPR_ASSERT(reserved == NULL); - cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status, - description); - grpc_exec_ctx_finish(&exec_ctx); + cancel_with_status(c, STATUS_FROM_API_OVERRIDE, status, description); + grpc_exec_ctx_finish(); return GRPC_CALL_OK; } @@ -683,24 +671,23 @@ typedef struct { // The on_complete callback used when sending a cancel_stream batch down // the filter stack. Yields the call combiner when the batch is done. -static void done_termination(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void done_termination(void *arg, grpc_error *error) { cancel_state *state = (cancel_state *)arg; - GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner, + GRPC_CALL_COMBINER_STOP(&state->call->call_combiner, "on_complete for cancel_stream op"); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination"); + GRPC_CALL_INTERNAL_UNREF(state->call, "termination"); gpr_free(state); } -static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, - status_source source, grpc_error *error) { +static void cancel_with_error(grpc_call *c, status_source source, + grpc_error *error) { GRPC_CALL_INTERNAL_REF(c, "termination"); // Inform the call combiner of the cancellation, so that it can cancel // any in-flight asynchronous actions that may be holding the call // combiner. This ensures that the cancel_stream batch can be sent // down the filter stack in a timely manner. - grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error)); - set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); + grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error)); + set_status_from_error(c, source, GRPC_ERROR_REF(error)); cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state)); state->call = c; GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state, @@ -709,7 +696,7 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_make_transport_stream_op(&state->finish_batch); op->cancel_stream = true; op->payload->cancel_stream.cancel_error = error; - execute_batch(exec_ctx, c, op, &state->start_batch); + execute_batch(c, op, &state->start_batch); } static grpc_error *error_from_status(grpc_status_code status, @@ -723,27 +710,23 @@ static grpc_error *error_from_status(grpc_status_code status, GRPC_ERROR_INT_GRPC_STATUS, status); } -static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, - status_source source, grpc_status_code status, +static void cancel_with_status(grpc_call *c, status_source source, + grpc_status_code status, const char *description) { - cancel_with_error(exec_ctx, c, source, - error_from_status(status, description)); + cancel_with_error(c, source, error_from_status(status, description)); } /******************************************************************************* * FINAL STATUS CODE MANIPULATION */ -static bool get_final_status_from(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_error *error, bool allow_ok_status, - void (*set_value)(grpc_status_code code, - void *user_data), - void *set_value_user_data, - grpc_slice *details) { +static bool get_final_status_from( + grpc_call *call, grpc_error *error, bool allow_ok_status, + void (*set_value)(grpc_status_code code, void *user_data), + void *set_value_user_data, grpc_slice *details) { grpc_status_code code; grpc_slice slice = grpc_empty_slice(); - grpc_error_get_status(exec_ctx, error, call->send_deadline, &code, &slice, - NULL); + grpc_error_get_status(error, call->send_deadline, &code, &slice, NULL); if (code == GRPC_STATUS_OK && !allow_ok_status) { return false; } @@ -755,7 +738,7 @@ static bool get_final_status_from(grpc_exec_ctx *exec_ctx, grpc_call *call, return true; } -static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call, +static void get_final_status(grpc_call *call, void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details) { @@ -780,9 +763,8 @@ static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call, for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (status[i].is_set && grpc_error_has_clear_grpc_status(status[i].error)) { - if (get_final_status_from(exec_ctx, call, status[i].error, - allow_ok_status != 0, set_value, - set_value_user_data, details)) { + if (get_final_status_from(call, status[i].error, allow_ok_status != 0, + set_value, set_value_user_data, details)) { return; } } @@ -790,9 +772,8 @@ static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call, /* If no clearly defined status exists, search for 'anything' */ for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (status[i].is_set) { - if (get_final_status_from(exec_ctx, call, status[i].error, - allow_ok_status != 0, set_value, - set_value_user_data, details)) { + if (get_final_status_from(call, status[i].error, allow_ok_status != 0, + set_value, set_value_user_data, details)) { return; } } @@ -806,8 +787,8 @@ static void get_final_status(grpc_exec_ctx *exec_ctx, grpc_call *call, } } -static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, - status_source source, grpc_error *error) { +static void set_status_from_error(grpc_call *call, status_source source, + grpc_error *error) { if (!gpr_atm_rel_cas(&call->status[source], pack_received_status({false, GRPC_ERROR_NONE}), pack_received_status({true, error}))) { @@ -859,8 +840,7 @@ uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) { static void destroy_encodings_accepted_by_peer(void *p) { return; } -static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, - grpc_call *call, grpc_mdelem mdel) { +static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem mdel) { size_t i; grpc_compression_algorithm algorithm; grpc_slice_buffer accept_encoding_parts; @@ -898,15 +878,14 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, } } - grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts); + grpc_slice_buffer_destroy_internal(&accept_encoding_parts); grpc_mdelem_set_user_data( mdel, destroy_encodings_accepted_by_peer, (void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1)); } -static void set_stream_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, - grpc_call *call, +static void set_stream_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem mdel) { size_t i; grpc_stream_compression_algorithm algorithm; @@ -944,7 +923,7 @@ static void set_stream_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, } } - grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts); + grpc_slice_buffer_destroy_internal(&accept_encoding_parts); grpc_mdelem_set_user_data( mdel, destroy_encodings_accepted_by_peer, @@ -982,10 +961,12 @@ static grpc_metadata *get_md_elem(grpc_metadata *metadata, return res; } -static int prepare_application_metadata( - grpc_exec_ctx *exec_ctx, grpc_call *call, int count, - grpc_metadata *metadata, int is_trailing, int prepend_extra_metadata, - grpc_metadata *additional_metadata, int additional_metadata_count) { +static int prepare_application_metadata(grpc_call *call, int count, + grpc_metadata *metadata, + int is_trailing, + int prepend_extra_metadata, + grpc_metadata *additional_metadata, + int additional_metadata_count) { int total_count = count + additional_metadata_count; int i; grpc_metadata_batch *batch = @@ -1004,14 +985,14 @@ static int prepare_application_metadata( grpc_validate_header_nonbin_value_is_legal(md->value))) { break; } - l->md = grpc_mdelem_from_grpc_metadata(exec_ctx, (grpc_metadata *)md); + l->md = grpc_mdelem_from_grpc_metadata((grpc_metadata *)md); } if (i != total_count) { for (int j = 0; j < i; j++) { const grpc_metadata *md = get_md_elem(metadata, additional_metadata, j, count); grpc_linked_mdelem *l = linked_from_md(md); - GRPC_MDELEM_UNREF(exec_ctx, l->md); + GRPC_MDELEM_UNREF(l->md); } return 0; } @@ -1022,16 +1003,16 @@ static int prepare_application_metadata( for (i = 0; i < call->send_extra_metadata_count; i++) { GRPC_LOG_IF_ERROR("prepare_application_metadata", grpc_metadata_batch_link_tail( - exec_ctx, batch, &call->send_extra_metadata[i])); + batch, &call->send_extra_metadata[i])); } } } for (i = 0; i < total_count; i++) { grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count); grpc_linked_mdelem *l = linked_from_md(md); - grpc_error *error = grpc_metadata_batch_link_tail(exec_ctx, batch, l); + grpc_error *error = grpc_metadata_batch_link_tail(batch, l); if (error != GRPC_ERROR_NONE) { - GRPC_MDELEM_UNREF(exec_ctx, l->md); + GRPC_MDELEM_UNREF(l->md); } GRPC_LOG_IF_ERROR("prepare_application_metadata", error); } @@ -1118,46 +1099,43 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, GPR_TIMER_END("publish_app_metadata", 0); } -static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_metadata_batch *b) { +static void recv_initial_filter(grpc_call *call, grpc_metadata_batch *b) { if (b->idx.named.content_encoding != NULL) { if (b->idx.named.grpc_encoding != NULL) { gpr_log(GPR_ERROR, "Received both content-encoding and grpc-encoding header. " "Ignoring grpc-encoding."); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding); + grpc_metadata_batch_remove(b, b->idx.named.grpc_encoding); } GPR_TIMER_BEGIN("incoming_stream_compression_algorithm", 0); set_incoming_stream_compression_algorithm( call, decode_stream_compression(b->idx.named.content_encoding->md)); GPR_TIMER_END("incoming_stream_compression_algorithm", 0); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_encoding); + grpc_metadata_batch_remove(b, b->idx.named.content_encoding); } else if (b->idx.named.grpc_encoding != NULL) { GPR_TIMER_BEGIN("incoming_compression_algorithm", 0); set_incoming_compression_algorithm( call, decode_compression(b->idx.named.grpc_encoding->md)); GPR_TIMER_END("incoming_compression_algorithm", 0); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding); + grpc_metadata_batch_remove(b, b->idx.named.grpc_encoding); } if (b->idx.named.grpc_accept_encoding != NULL) { GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); - set_encodings_accepted_by_peer(exec_ctx, call, - b->idx.named.grpc_accept_encoding->md); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding); + set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md); + grpc_metadata_batch_remove(b, b->idx.named.grpc_accept_encoding); GPR_TIMER_END("encodings_accepted_by_peer", 0); } if (b->idx.named.accept_encoding != NULL) { GPR_TIMER_BEGIN("stream_encodings_accepted_by_peer", 0); - set_stream_encodings_accepted_by_peer(exec_ctx, call, + set_stream_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.accept_encoding); + grpc_metadata_batch_remove(b, b->idx.named.accept_encoding); GPR_TIMER_END("stream_encodings_accepted_by_peer", 0); } publish_app_metadata(call, b, false); } -static void recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args, - grpc_metadata_batch *b) { +static void recv_trailing_filter(void *args, grpc_metadata_batch *b) { grpc_call *call = (grpc_call *)args; if (b->idx.named.grpc_status != NULL) { uint32_t status_code = decode_status(b->idx.named.grpc_status->md); @@ -1172,13 +1150,13 @@ static void recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args, error = grpc_error_set_str( error, GRPC_ERROR_STR_GRPC_MESSAGE, grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md))); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message); + grpc_metadata_batch_remove(b, b->idx.named.grpc_message); } else if (error != GRPC_ERROR_NONE) { error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, grpc_empty_slice()); } - set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status); + set_status_from_error(call, STATUS_FROM_WIRE, error); + grpc_metadata_batch_remove(b, b->idx.named.grpc_status); } publish_app_metadata(call, b, true); } @@ -1255,12 +1233,12 @@ static batch_control *allocate_batch_control(grpc_call *call, return bctl; } -static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, +static void finish_batch_completion(void *user_data, grpc_cq_completion *storage) { batch_control *bctl = (batch_control *)user_data; grpc_call *call = bctl->call; bctl->call = NULL; - GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); + GRPC_CALL_INTERNAL_UNREF(call, "completion"); } static grpc_error *consolidate_batch_errors(batch_control *bctl) { @@ -1284,15 +1262,13 @@ static grpc_error *consolidate_batch_errors(batch_control *bctl) { } } -static void post_batch_completion(grpc_exec_ctx *exec_ctx, - batch_control *bctl) { +static void post_batch_completion(batch_control *bctl) { grpc_call *next_child_call; grpc_call *call = bctl->call; grpc_error *error = consolidate_batch_errors(bctl); if (bctl->op.send_initial_metadata) { grpc_metadata_batch_destroy( - exec_ctx, &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); } if (bctl->op.send_message) { @@ -1300,13 +1276,12 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, } if (bctl->op.send_trailing_metadata) { grpc_metadata_batch_destroy( - exec_ctx, &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } if (bctl->op.recv_trailing_metadata) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - recv_trailing_filter(exec_ctx, call, md); + recv_trailing_filter(call, md); /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); @@ -1320,9 +1295,9 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, next_child_call = child->child->sibling_next; if (child->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child, "propagate_cancel"); - cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE, + cancel_with_error(child, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, child, "propagate_cancel"); + GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel"); } child = next_child_call; } while (child != pc->first_child); @@ -1331,11 +1306,11 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, } if (call->is_client) { - get_final_status(exec_ctx, call, set_status_value_directly, + get_final_status(call, set_status_value_directly, call->final_op.client.status, call->final_op.client.status_details); } else { - get_final_status(exec_ctx, call, set_cancelled_value, + get_final_status(call, set_cancelled_value, call->final_op.server.cancelled, NULL); } @@ -1351,25 +1326,24 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, if (bctl->completion_data.notify_tag.is_closure) { /* unrefs bctl->error */ bctl->call = NULL; - GRPC_CLOSURE_RUN( - exec_ctx, (grpc_closure *)bctl->completion_data.notify_tag.tag, error); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); + GRPC_CLOSURE_RUN((grpc_closure *)bctl->completion_data.notify_tag.tag, + error); + GRPC_CALL_INTERNAL_UNREF(call, "completion"); } else { /* unrefs bctl->error */ - grpc_cq_end_op( - exec_ctx, bctl->call->cq, bctl->completion_data.notify_tag.tag, error, - finish_batch_completion, bctl, &bctl->completion_data.cq_completion); + grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error, + finish_batch_completion, bctl, + &bctl->completion_data.cq_completion); } } -static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) { +static void finish_batch_step(batch_control *bctl) { if (gpr_unref(&bctl->steps_to_complete)) { - post_batch_completion(exec_ctx, bctl); + post_batch_completion(bctl); } } -static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, - batch_control *bctl) { +static void continue_receiving_slices(batch_control *bctl) { grpc_error *error; grpc_call *call = bctl->call; for (;;) { @@ -1377,25 +1351,25 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, (*call->receiving_buffer)->data.raw.slice_buffer.length; if (remaining == 0) { call->receiving_message = 0; - grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + grpc_byte_stream_destroy(call->receiving_stream); call->receiving_stream = NULL; - finish_batch_step(exec_ctx, bctl); + finish_batch_step(bctl); return; } - if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining, + if (grpc_byte_stream_next(call->receiving_stream, remaining, &call->receiving_slice_ready)) { - error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream, - &call->receiving_slice); + error = + grpc_byte_stream_pull(call->receiving_stream, &call->receiving_slice); if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, call->receiving_slice); } else { - grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + grpc_byte_stream_destroy(call->receiving_stream); call->receiving_stream = NULL; grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = NULL; call->receiving_message = 0; - finish_batch_step(exec_ctx, bctl); + finish_batch_step(bctl); return; } } else { @@ -1404,8 +1378,7 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, } } -static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - grpc_error *error) { +static void receiving_slice_ready(void *bctlp, grpc_error *error) { batch_control *bctl = (batch_control *)bctlp; grpc_call *call = bctl->call; grpc_byte_stream *bs = call->receiving_stream; @@ -1413,11 +1386,11 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, if (error == GRPC_ERROR_NONE) { grpc_slice slice; - error = grpc_byte_stream_pull(exec_ctx, bs, &slice); + error = grpc_byte_stream_pull(bs, &slice); if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, slice); - continue_receiving_slices(exec_ctx, bctl); + continue_receiving_slices(bctl); } else { /* Error returned by grpc_byte_stream_pull needs to be released manually */ @@ -1429,25 +1402,24 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, if (GRPC_TRACER_ON(grpc_trace_operation_failures)) { GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error)); } - grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + grpc_byte_stream_destroy(call->receiving_stream); call->receiving_stream = NULL; grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = NULL; call->receiving_message = 0; - finish_batch_step(exec_ctx, bctl); + finish_batch_step(bctl); if (release_error) { GRPC_ERROR_UNREF(error); } } } -static void process_data_after_md(grpc_exec_ctx *exec_ctx, - batch_control *bctl) { +static void process_data_after_md(batch_control *bctl) { grpc_call *call = bctl->call; if (call->receiving_stream == NULL) { *call->receiving_buffer = NULL; call->receiving_message = 0; - finish_batch_step(exec_ctx, bctl); + finish_batch_step(bctl); } else { call->test_only_last_message_flags = call->receiving_stream->flags; if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && @@ -1459,46 +1431,42 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, } GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl, grpc_schedule_on_exec_ctx); - continue_receiving_slices(exec_ctx, bctl); + continue_receiving_slices(bctl); } } -static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - grpc_error *error) { +static void receiving_stream_ready(void *bctlp, grpc_error *error) { batch_control *bctl = (batch_control *)bctlp; grpc_call *call = bctl->call; if (error != GRPC_ERROR_NONE) { if (call->receiving_stream != NULL) { - grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + grpc_byte_stream_destroy(call->receiving_stream); call->receiving_stream = NULL; } - add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), true); - cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, - GRPC_ERROR_REF(error)); + add_batch_error(bctl, GRPC_ERROR_REF(error), true); + cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } /* If recv_state is RECV_NONE, we will save the batch_control * object with rel_cas, and will not use it after the cas. Its corresponding * acq_load is in receiving_initial_metadata_ready() */ if (error != GRPC_ERROR_NONE || call->receiving_stream == NULL || !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) { - process_data_after_md(exec_ctx, bctl); + process_data_after_md(bctl); } } // The recv_message_ready callback used when sending a batch containing // a recv_message op down the filter stack. Yields the call combiner // before processing the received message. -static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx, - void *bctlp, +static void receiving_stream_ready_in_call_combiner(void *bctlp, grpc_error *error) { batch_control *bctl = (batch_control *)bctlp; grpc_call *call = bctl->call; - GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready"); - receiving_stream_ready(exec_ctx, bctlp, error); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready"); + receiving_stream_ready(bctlp, error); } -static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, - batch_control *bctl) { +static void validate_filtered_metadata(batch_control *bctl) { grpc_call *call = bctl->call; /* validate compression algorithms */ if (call->incoming_stream_compression_algorithm != @@ -1512,8 +1480,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, gpr_asprintf(&error_msg, "Invalid stream compression algorithm value '%d'.", algo); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, - GRPC_STATUS_UNIMPLEMENTED, error_msg); + cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED, + error_msg); } else if (grpc_compression_options_is_stream_compression_algorithm_enabled( &compression_options, algo) == 0) { /* check if algorithm is supported by current channel config */ @@ -1522,8 +1490,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, gpr_asprintf(&error_msg, "Stream compression algorithm '%s' is disabled.", algo_name); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, - GRPC_STATUS_UNIMPLEMENTED, error_msg); + cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED, + error_msg); } gpr_free(error_msg); @@ -1553,8 +1521,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", algo); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, - GRPC_STATUS_UNIMPLEMENTED, error_msg); + cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED, + error_msg); } else if (grpc_compression_options_is_algorithm_enabled( &compression_options, algo) == 0) { /* check if algorithm is supported by current channel config */ @@ -1563,8 +1531,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", algo_name); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, - GRPC_STATUS_UNIMPLEMENTED, error_msg); + cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED, + error_msg); } else { call->incoming_compression_algorithm = algo; } @@ -1587,34 +1555,31 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, } } -static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, - grpc_error *error, bool has_cancelled) { +static void add_batch_error(batch_control *bctl, grpc_error *error, + bool has_cancelled) { if (error == GRPC_ERROR_NONE) return; int idx = (int)gpr_atm_full_fetch_add(&bctl->num_errors, 1); if (idx == 0 && !has_cancelled) { - cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE, - GRPC_ERROR_REF(error)); + cancel_with_error(bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error)); } bctl->errors[idx] = error; } -static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, - void *bctlp, grpc_error *error) { +static void receiving_initial_metadata_ready(void *bctlp, grpc_error *error) { batch_control *bctl = (batch_control *)bctlp; grpc_call *call = bctl->call; - GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, - "recv_initial_metadata_ready"); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready"); - add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); + add_batch_error(bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - recv_initial_filter(exec_ctx, call, md); + recv_initial_filter(call, md); /* TODO(ctiller): this could be moved into recv_initial_filter now */ GPR_TIMER_BEGIN("validate_filtered_metadata", 0); - validate_filtered_metadata(exec_ctx, bctl); + validate_filtered_metadata(bctl); GPR_TIMER_END("validate_filtered_metadata", 0); if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) { @@ -1647,28 +1612,25 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, } } if (saved_rsr_closure != NULL) { - GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(saved_rsr_closure, GRPC_ERROR_REF(error)); } - finish_batch_step(exec_ctx, bctl); + finish_batch_step(bctl); } -static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, - grpc_error *error) { +static void finish_batch(void *bctlp, grpc_error *error) { batch_control *bctl = (batch_control *)bctlp; grpc_call *call = bctl->call; - GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete"); - add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); - finish_batch_step(exec_ctx, bctl); + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete"); + add_batch_error(bctl, GRPC_ERROR_REF(error), false); + finish_batch_step(bctl); } -static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p, - grpc_cq_completion *completion) { +static void free_no_op_completion(void *p, grpc_cq_completion *completion) { gpr_free(completion); } -static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, - grpc_call *call, const grpc_op *ops, +static grpc_call_error call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *notify_tag, int is_notify_tag_closure) { size_t i; @@ -1686,11 +1648,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (!is_notify_tag_closure) { GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); grpc_cq_end_op( - exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE, - free_no_op_completion, NULL, + call->cq, notify_tag, GRPC_ERROR_NONE, free_no_op_completion, NULL, (grpc_cq_completion *)gpr_malloc(sizeof(grpc_cq_completion))); } else { - GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure *)notify_tag, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED((grpc_closure *)notify_tag, GRPC_ERROR_NONE); } error = GRPC_CALL_OK; goto done; @@ -1790,7 +1751,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op->send_initial_metadata = true; call->sent_initial_metadata = true; if (!prepare_application_metadata( - exec_ctx, call, (int)op->data.send_initial_metadata.count, + call, (int)op->data.send_initial_metadata.count, op->data.send_initial_metadata.metadata, 0, call->is_client, &call->compression_md, (int)additional_metadata_count)) { error = GRPC_CALL_ERROR_INVALID_METADATA; @@ -1884,7 +1845,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GPR_ASSERT(call->send_extra_metadata_count == 0); call->send_extra_metadata_count = 1; call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( - exec_ctx, call->channel, op->data.send_status_from_server.status); + call->channel, op->data.send_status_from_server.status); { grpc_error *override_error = GRPC_ERROR_NONE; if (op->data.send_status_from_server.status != GRPC_STATUS_OK) { @@ -1893,7 +1854,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } if (op->data.send_status_from_server.status_details != NULL) { call->send_extra_metadata[1].md = grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + GRPC_MDSTR_GRPC_MESSAGE, grpc_slice_ref_internal( *op->data.send_status_from_server.status_details)); call->send_extra_metadata_count++; @@ -1904,16 +1865,15 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_slice_from_copied_string(msg)); gpr_free(msg); } - set_status_from_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, - override_error); + set_status_from_error(call, STATUS_FROM_API_OVERRIDE, override_error); } if (!prepare_application_metadata( - exec_ctx, call, + call, (int)op->data.send_status_from_server.trailing_metadata_count, op->data.send_status_from_server.trailing_metadata, 1, 1, NULL, 0)) { for (int n = 0; n < call->send_extra_metadata_count; n++) { - GRPC_MDELEM_UNREF(exec_ctx, call->send_extra_metadata[n].md); + GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md); } call->send_extra_metadata_count = 0; error = GRPC_CALL_ERROR_INVALID_METADATA; @@ -2040,7 +2000,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op->on_complete = &bctl->finish_batch; gpr_atm_rel_store(&call->any_ops_sent_atm, 1); - execute_batch(exec_ctx, call, stream_op, &bctl->start_batch); + execute_batch(call, stream_op, &bctl->start_batch); done: GPR_TIMER_END("grpc_call_start_batch", 0); @@ -2050,15 +2010,15 @@ done_with_error: /* reverse any mutations that occured */ if (stream_op->send_initial_metadata) { call->sent_initial_metadata = false; - grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]); + grpc_metadata_batch_clear(&call->metadata_batch[0][0]); } if (stream_op->send_message) { call->sending_message = false; - grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base); + grpc_byte_stream_destroy(&call->sending_stream.base); } if (stream_op->send_trailing_metadata) { call->sent_final_op = false; - grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]); + grpc_metadata_batch_clear(&call->metadata_batch[0][1]); } if (stream_op->recv_initial_metadata) { call->received_initial_metadata = false; @@ -2074,7 +2034,7 @@ done_with_error: grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag, void *reserved) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; grpc_call_error err; GRPC_API_TRACE( @@ -2085,19 +2045,18 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, if (reserved != NULL) { err = GRPC_CALL_ERROR; } else { - err = call_start_batch(&exec_ctx, call, ops, nops, tag, 0); + err = call_start_batch(call, ops, nops, tag, 0); } - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_finish(); return err; } -grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx, - grpc_call *call, +grpc_call_error grpc_call_start_batch_and_execute(grpc_call *call, const grpc_op *ops, size_t nops, grpc_closure *closure) { - return call_start_batch(exec_ctx, call, ops, nops, closure, 1); + return call_start_batch(call, ops, nops, closure, 1); } void grpc_call_context_set(grpc_call *call, grpc_context_index elem, |