aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/alarm.cc25
-rw-r--r--src/core/lib/surface/byte_buffer.cc5
-rw-r--r--src/core/lib/surface/byte_buffer_reader.cc16
-rw-r--r--src/core/lib/surface/call.cc429
-rw-r--r--src/core/lib/surface/call.h24
-rw-r--r--src/core/lib/surface/call_details.cc7
-rw-r--r--src/core/lib/surface/channel.cc115
-rw-r--r--src/core/lib/surface/channel.h27
-rw-r--r--src/core/lib/surface/channel_init.cc5
-rw-r--r--src/core/lib/surface/channel_init.h6
-rw-r--r--src/core/lib/surface/channel_ping.cc14
-rw-r--r--src/core/lib/surface/completion_queue.cc328
-rw-r--r--src/core/lib/surface/completion_queue.h21
-rw-r--r--src/core/lib/surface/init.cc51
-rw-r--r--src/core/lib/surface/init_secure.cc4
-rw-r--r--src/core/lib/surface/lame_client.cc53
-rw-r--r--src/core/lib/surface/server.cc322
-rw-r--r--src/core/lib/surface/server.h15
18 files changed, 657 insertions, 810 deletions
diff --git a/src/core/lib/surface/alarm.cc b/src/core/lib/surface/alarm.cc
index b1c9f7b164..f6ea016c33 100644
--- a/src/core/lib/surface/alarm.cc
+++ b/src/core/lib/surface/alarm.cc
@@ -45,11 +45,11 @@ static void alarm_ref(grpc_alarm* alarm) { gpr_ref(&alarm->refs); }
static void alarm_unref(grpc_alarm* alarm) {
if (gpr_unref(&alarm->refs)) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
if (alarm->cq != nullptr) {
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm");
+ GRPC_CQ_INTERNAL_UNREF(alarm->cq, "alarm");
}
- grpc_exec_ctx_finish(&exec_ctx);
+
gpr_free(alarm);
}
}
@@ -80,20 +80,19 @@ static void alarm_unref_dbg(grpc_alarm* alarm, const char* reason,
}
#endif
-static void alarm_end_completion(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_cq_completion* c) {
+static void alarm_end_completion(void* arg, grpc_cq_completion* c) {
grpc_alarm* alarm = (grpc_alarm*)arg;
GRPC_ALARM_UNREF(alarm, "dequeue-end-op");
}
-static void alarm_cb(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+static void alarm_cb(void* arg, grpc_error* error) {
grpc_alarm* alarm = (grpc_alarm*)arg;
/* We are queuing an op on completion queue. This means, the alarm's structure
cannot be destroyed until the op is dequeued. Adding an extra ref
here and unref'ing when the op is dequeued will achieve this */
GRPC_ALARM_REF(alarm, "queue-end-op");
- grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, error, alarm_end_completion,
+ grpc_cq_end_op(alarm->cq, alarm->tag, error, alarm_end_completion,
(void*)alarm, &alarm->completion);
}
@@ -116,22 +115,20 @@ grpc_alarm* grpc_alarm_create(void* reserved) {
void grpc_alarm_set(grpc_alarm* alarm, grpc_completion_queue* cq,
gpr_timespec deadline, void* tag, void* reserved) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
GRPC_CQ_INTERNAL_REF(cq, "alarm");
alarm->cq = cq;
alarm->tag = tag;
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
- grpc_timer_init(&exec_ctx, &alarm->alarm,
- grpc_timespec_to_millis_round_up(deadline), &alarm->on_alarm);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_timer_init(&alarm->alarm, grpc_timespec_to_millis_round_up(deadline),
+ &alarm->on_alarm);
}
void grpc_alarm_cancel(grpc_alarm* alarm, void* reserved) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_timer_cancel(&exec_ctx, &alarm->alarm);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_core::ExecCtx exec_ctx;
+ grpc_timer_cancel(&alarm->alarm);
}
void grpc_alarm_destroy(grpc_alarm* alarm, void* reserved) {
diff --git a/src/core/lib/surface/byte_buffer.cc b/src/core/lib/surface/byte_buffer.cc
index 9e0636b4ce..e4c2a4a4c2 100644
--- a/src/core/lib/surface/byte_buffer.cc
+++ b/src/core/lib/surface/byte_buffer.cc
@@ -71,14 +71,13 @@ grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) {
void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
if (!bb) return;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
switch (bb->type) {
case GRPC_BB_RAW:
- grpc_slice_buffer_destroy_internal(&exec_ctx, &bb->data.raw.slice_buffer);
+ grpc_slice_buffer_destroy_internal(&bb->data.raw.slice_buffer);
break;
}
gpr_free(bb);
- grpc_exec_ctx_finish(&exec_ctx);
}
size_t grpc_byte_buffer_length(grpc_byte_buffer* bb) {
diff --git a/src/core/lib/surface/byte_buffer_reader.cc b/src/core/lib/surface/byte_buffer_reader.cc
index 001227a2aa..81a48e95fc 100644
--- a/src/core/lib/surface/byte_buffer_reader.cc
+++ b/src/core/lib/surface/byte_buffer_reader.cc
@@ -42,15 +42,14 @@ static int is_compressed(grpc_byte_buffer* buffer) {
int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
grpc_slice_buffer decompressed_slices_buffer;
reader->buffer_in = buffer;
switch (reader->buffer_in->type) {
case GRPC_BB_RAW:
grpc_slice_buffer_init(&decompressed_slices_buffer);
if (is_compressed(reader->buffer_in)) {
- if (grpc_msg_decompress(&exec_ctx,
- reader->buffer_in->data.raw.compression,
+ if (grpc_msg_decompress(reader->buffer_in->data.raw.compression,
&reader->buffer_in->data.raw.slice_buffer,
&decompressed_slices_buffer) == 0) {
gpr_log(GPR_ERROR,
@@ -64,15 +63,14 @@ int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
decompressed_slices_buffer.count);
}
- grpc_slice_buffer_destroy_internal(&exec_ctx,
- &decompressed_slices_buffer);
+ grpc_slice_buffer_destroy_internal(&decompressed_slices_buffer);
} else { /* not compressed, use the input buffer as output */
reader->buffer_out = reader->buffer_in;
}
reader->current.index = 0;
break;
}
- grpc_exec_ctx_finish(&exec_ctx);
+
return 1;
}
@@ -112,14 +110,14 @@ grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader* reader) {
grpc_slice out_slice = GRPC_SLICE_MALLOC(input_size);
uint8_t* const outbuf = GRPC_SLICE_START_PTR(out_slice); /* just an alias */
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
while (grpc_byte_buffer_reader_next(reader, &in_slice) != 0) {
const size_t slice_length = GRPC_SLICE_LENGTH(in_slice);
memcpy(&(outbuf[bytes_read]), GRPC_SLICE_START_PTR(in_slice), slice_length);
bytes_read += slice_length;
- grpc_slice_unref_internal(&exec_ctx, in_slice);
+ grpc_slice_unref_internal(in_slice);
GPR_ASSERT(bytes_read <= input_size);
}
- grpc_exec_ctx_finish(&exec_ctx);
+
return out_slice;
}
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index a2eb02bd85..a457aaa7a2 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -270,30 +270,25 @@ grpc_core::TraceFlag grpc_compression_trace(false, "compression");
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void execute_batch(grpc_exec_ctx* exec_ctx, grpc_call* call,
- grpc_transport_stream_op_batch* op,
+static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
grpc_closure* start_batch_closure);
-static void cancel_with_status(grpc_exec_ctx* exec_ctx, grpc_call* c,
- status_source source, grpc_status_code status,
+static void cancel_with_status(grpc_call* c, status_source source,
+ grpc_status_code status,
const char* description);
-static void cancel_with_error(grpc_exec_ctx* exec_ctx, grpc_call* c,
- status_source source, grpc_error* error);
-static void destroy_call(grpc_exec_ctx* exec_ctx, void* call_stack,
- grpc_error* error);
-static void receiving_slice_ready(grpc_exec_ctx* exec_ctx, void* bctlp,
- grpc_error* error);
-static void get_final_status(grpc_exec_ctx* exec_ctx, grpc_call* call,
- void (*set_value)(grpc_status_code code,
- void* user_data),
- void* set_value_user_data, grpc_slice* details,
- const char** error_string);
+static void cancel_with_error(grpc_call* c, status_source source,
+ grpc_error* error);
+static void destroy_call(void* call_stack, grpc_error* error);
+static void receiving_slice_ready(void* bctlp, grpc_error* error);
+static void get_final_status(
+ grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
+ void* set_value_user_data, grpc_slice* details, const char** error_string);
static void set_status_value_directly(grpc_status_code status, void* dest);
-static void set_status_from_error(grpc_exec_ctx* exec_ctx, grpc_call* call,
- status_source source, grpc_error* error);
-static void process_data_after_md(grpc_exec_ctx* exec_ctx, batch_control* bctl);
-static void post_batch_completion(grpc_exec_ctx* exec_ctx, batch_control* bctl);
-static void add_batch_error(grpc_exec_ctx* exec_ctx, batch_control* bctl,
- grpc_error* error, bool has_cancelled);
+static void set_status_from_error(grpc_call* call, status_source source,
+ grpc_error* error);
+static void process_data_after_md(batch_control* bctl);
+static void post_batch_completion(batch_control* bctl);
+static void add_batch_error(batch_control* bctl, grpc_error* error,
+ bool has_cancelled);
static void add_init_error(grpc_error** composite, grpc_error* new_err) {
if (new_err == GRPC_ERROR_NONE) return;
@@ -311,7 +306,8 @@ static parent_call* get_or_create_parent_call(grpc_call* call) {
if (p == nullptr) {
p = (parent_call*)gpr_arena_alloc(call->arena, sizeof(*p));
gpr_mu_init(&p->child_list_mu);
- if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) {
+ if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm) nullptr,
+ (gpr_atm)p)) {
gpr_mu_destroy(&p->child_list_mu);
p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
}
@@ -323,8 +319,7 @@ static parent_call* get_parent_call(grpc_call* call) {
return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
}
-grpc_error* grpc_call_create(grpc_exec_ctx* exec_ctx,
- const grpc_call_create_args* args,
+grpc_error* grpc_call_create(const grpc_call_create_args* args,
grpc_call** out_call) {
size_t i, j;
grpc_error* error = GRPC_ERROR_NONE;
@@ -333,7 +328,7 @@ grpc_error* grpc_call_create(grpc_exec_ctx* exec_ctx,
grpc_call* call;
GPR_TIMER_BEGIN("grpc_call_create", 0);
size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
- GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, initial_size);
+ GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
gpr_arena* arena = gpr_arena_create(initial_size);
call = (grpc_call*)gpr_arena_alloc(
arena, sizeof(grpc_call) + channel_stack->call_stack_size);
@@ -348,9 +343,9 @@ grpc_error* grpc_call_create(grpc_exec_ctx* exec_ctx,
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
call->is_client = args->server_transport_data == nullptr;
if (call->is_client) {
- GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx);
+ GRPC_STATS_INC_CLIENT_CALLS_CREATED();
} else {
- GRPC_STATS_INC_SERVER_CALLS_CREATED(exec_ctx);
+ GRPC_STATS_INC_SERVER_CALLS_CREATED();
}
call->stream_op_payload.context = call->context;
grpc_slice path = grpc_empty_slice();
@@ -445,15 +440,13 @@ grpc_error* grpc_call_create(grpc_exec_ctx* exec_ctx,
send_deadline,
call->arena,
&call->call_combiner};
- add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
- destroy_call, call, &call_args));
+ add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
+ call, &call_args));
if (error != GRPC_ERROR_NONE) {
- cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
- GRPC_ERROR_REF(error));
+ cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
}
if (immediately_cancel) {
- cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE,
- GRPC_ERROR_CANCELLED);
+ cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
}
if (args->cq != nullptr) {
GPR_ASSERT(
@@ -468,17 +461,17 @@ grpc_error* grpc_call_create(grpc_exec_ctx* exec_ctx,
args->pollset_set_alternative);
}
if (!grpc_polling_entity_is_empty(&call->pollent)) {
- grpc_call_stack_set_pollset_or_pollset_set(
- exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
+ grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
+ &call->pollent);
}
- grpc_slice_unref_internal(exec_ctx, path);
+ grpc_slice_unref_internal(path);
GPR_TIMER_END("grpc_call_create", 0);
return error;
}
-void grpc_call_set_completion_queue(grpc_exec_ctx* exec_ctx, grpc_call* call,
+void grpc_call_set_completion_queue(grpc_call* call,
grpc_completion_queue* cq) {
GPR_ASSERT(cq);
@@ -489,8 +482,8 @@ void grpc_call_set_completion_queue(grpc_exec_ctx* exec_ctx, grpc_call* call,
call->cq = cq;
GRPC_CQ_INTERNAL_REF(cq, "bind");
call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
- grpc_call_stack_set_pollset_or_pollset_set(
- exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
+ grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
+ &call->pollent);
}
#ifndef NDEBUG
@@ -503,40 +496,38 @@ void grpc_call_set_completion_queue(grpc_exec_ctx* exec_ctx, grpc_call* call,
void grpc_call_internal_ref(grpc_call* c REF_ARG) {
GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
-void grpc_call_internal_unref(grpc_exec_ctx* exec_ctx, grpc_call* c REF_ARG) {
- GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
+void grpc_call_internal_unref(grpc_call* c REF_ARG) {
+ GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
-static void release_call(grpc_exec_ctx* exec_ctx, void* call,
- grpc_error* error) {
+static void release_call(void* call, grpc_error* error) {
grpc_call* c = (grpc_call*)call;
grpc_channel* channel = c->channel;
grpc_call_combiner_destroy(&c->call_combiner);
gpr_free((char*)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
+ GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
}
static void set_status_value_directly(grpc_status_code status, void* dest);
-static void destroy_call(grpc_exec_ctx* exec_ctx, void* call,
- grpc_error* error) {
+static void destroy_call(void* call, grpc_error* error) {
size_t i;
int ii;
grpc_call* c = (grpc_call*)call;
GPR_TIMER_BEGIN("destroy_call", 0);
for (i = 0; i < 2; i++) {
grpc_metadata_batch_destroy(
- exec_ctx, &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
+ &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
}
if (c->receiving_stream != nullptr) {
- grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
+ grpc_byte_stream_destroy(c->receiving_stream);
}
parent_call* pc = get_parent_call(c);
if (pc != nullptr) {
gpr_mu_destroy(&pc->child_list_mu);
}
for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
- GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md);
+ GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
}
for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
if (c->context[i].destroy) {
@@ -544,12 +535,11 @@ static void destroy_call(grpc_exec_ctx* exec_ctx, void* call,
}
}
if (c->cq) {
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, c->cq, "bind");
+ GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
- get_final_status(exec_ctx, c, set_status_value_directly,
- &c->final_info.final_status, nullptr,
- c->final_info.error_string);
+ get_final_status(c, set_status_value_directly, &c->final_info.final_status,
+ nullptr, c->final_info.error_string);
c->final_info.stats.latency =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
@@ -558,7 +548,7 @@ static void destroy_call(grpc_exec_ctx* exec_ctx, void* call,
unpack_received_status(gpr_atm_acq_load(&c->status[i])).error);
}
- grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info,
+ grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
grpc_schedule_on_exec_ctx));
GPR_TIMER_END("destroy_call", 0);
@@ -570,7 +560,7 @@ void grpc_call_unref(grpc_call* c) {
if (!gpr_unref(&c->ext_ref)) return;
child_call* cc = c->child;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
GPR_TIMER_BEGIN("grpc_call_unref", 0);
GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
@@ -587,7 +577,7 @@ void grpc_call_unref(grpc_call* c) {
cc->sibling_prev->child->sibling_next = cc->sibling_next;
cc->sibling_next->child->sibling_prev = cc->sibling_prev;
gpr_mu_unlock(&pc->child_list_mu);
- GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child");
+ GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
}
GPR_ASSERT(!c->destroy_called);
@@ -595,53 +585,49 @@ void grpc_call_unref(grpc_call* c) {
bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
gpr_atm_acq_load(&c->received_final_op_atm) == 0;
if (cancel) {
- cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
- GRPC_ERROR_CANCELLED);
+ cancel_with_error(c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
} else {
// Unset the call combiner cancellation closure. This has the
// effect of scheduling the previously set cancellation closure, if
// any, so that it can release any internal references it may be
// holding to the call stack.
- grpc_call_combiner_set_notify_on_cancel(&exec_ctx, &c->call_combiner,
- nullptr);
+ grpc_call_combiner_set_notify_on_cancel(&c->call_combiner, nullptr);
}
- GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
- grpc_exec_ctx_finish(&exec_ctx);
+ GRPC_CALL_INTERNAL_UNREF(c, "destroy");
+
GPR_TIMER_END("grpc_call_unref", 0);
}
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
GPR_ASSERT(!reserved);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- cancel_with_error(&exec_ctx, call, STATUS_FROM_API_OVERRIDE,
- GRPC_ERROR_CANCELLED);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_core::ExecCtx exec_ctx;
+ cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
+
return GRPC_CALL_OK;
}
// This is called via the call combiner to start sending a batch down
// the filter stack.
-static void execute_batch_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* ignored) {
+static void execute_batch_in_call_combiner(void* arg, grpc_error* ignored) {
grpc_transport_stream_op_batch* batch = (grpc_transport_stream_op_batch*)arg;
grpc_call* call = (grpc_call*)batch->handler_private.extra_arg;
GPR_TIMER_BEGIN("execute_batch", 0);
grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
+ elem->filter->start_transport_stream_op_batch(elem, batch);
GPR_TIMER_END("execute_batch", 0);
}
// start_batch_closure points to a caller-allocated closure to be used
// for entering the call combiner.
-static void execute_batch(grpc_exec_ctx* exec_ctx, grpc_call* call,
+static void execute_batch(grpc_call* call,
grpc_transport_stream_op_batch* batch,
grpc_closure* start_batch_closure) {
batch->handler_private.extra_arg = call;
GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure,
+ GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
GRPC_ERROR_NONE, "executing batch");
}
@@ -665,15 +651,14 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
grpc_status_code status,
const char* description,
void* reserved) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE(
"grpc_call_cancel_with_status("
"c=%p, status=%d, description=%s, reserved=%p)",
4, (c, (int)status, description, reserved));
GPR_ASSERT(reserved == nullptr);
- cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status,
- description);
- grpc_exec_ctx_finish(&exec_ctx);
+ cancel_with_status(c, STATUS_FROM_API_OVERRIDE, status, description);
+
return GRPC_CALL_OK;
}
@@ -685,24 +670,23 @@ typedef struct {
// The on_complete callback used when sending a cancel_stream batch down
// the filter stack. Yields the call combiner when the batch is done.
-static void done_termination(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void done_termination(void* arg, grpc_error* error) {
cancel_state* state = (cancel_state*)arg;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner,
+ GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
"on_complete for cancel_stream op");
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination");
+ GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
gpr_free(state);
}
-static void cancel_with_error(grpc_exec_ctx* exec_ctx, grpc_call* c,
- status_source source, grpc_error* error) {
+static void cancel_with_error(grpc_call* c, status_source source,
+ grpc_error* error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
// Inform the call combiner of the cancellation, so that it can cancel
// any in-flight asynchronous actions that may be holding the call
// combiner. This ensures that the cancel_stream batch can be sent
// down the filter stack in a timely manner.
- grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error));
- set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
+ grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error));
+ set_status_from_error(c, source, GRPC_ERROR_REF(error));
cancel_state* state = (cancel_state*)gpr_malloc(sizeof(*state));
state->call = c;
GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
@@ -711,7 +695,7 @@ static void cancel_with_error(grpc_exec_ctx* exec_ctx, grpc_call* c,
grpc_make_transport_stream_op(&state->finish_batch);
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
- execute_batch(exec_ctx, c, op, &state->start_batch);
+ execute_batch(c, op, &state->start_batch);
}
static grpc_error* error_from_status(grpc_status_code status,
@@ -725,11 +709,10 @@ static grpc_error* error_from_status(grpc_status_code status,
GRPC_ERROR_INT_GRPC_STATUS, status);
}
-static void cancel_with_status(grpc_exec_ctx* exec_ctx, grpc_call* c,
- status_source source, grpc_status_code status,
+static void cancel_with_status(grpc_call* c, status_source source,
+ grpc_status_code status,
const char* description) {
- cancel_with_error(exec_ctx, c, source,
- error_from_status(status, description));
+ cancel_with_error(c, source, error_from_status(status, description));
}
/*******************************************************************************
@@ -737,14 +720,13 @@ static void cancel_with_status(grpc_exec_ctx* exec_ctx, grpc_call* c,
*/
static bool get_final_status_from(
- grpc_exec_ctx* exec_ctx, grpc_call* call, grpc_error* error,
- bool allow_ok_status,
+ grpc_call* call, grpc_error* error, bool allow_ok_status,
void (*set_value)(grpc_status_code code, void* user_data),
void* set_value_user_data, grpc_slice* details, const char** error_string) {
grpc_status_code code;
grpc_slice slice = grpc_empty_slice();
- grpc_error_get_status(exec_ctx, error, call->send_deadline, &code, &slice,
- nullptr, error_string);
+ grpc_error_get_status(error, call->send_deadline, &code, &slice, nullptr,
+ error_string);
if (code == GRPC_STATUS_OK && !allow_ok_status) {
return false;
}
@@ -756,11 +738,9 @@ static bool get_final_status_from(
return true;
}
-static void get_final_status(grpc_exec_ctx* exec_ctx, grpc_call* call,
- void (*set_value)(grpc_status_code code,
- void* user_data),
- void* set_value_user_data, grpc_slice* details,
- const char** error_string) {
+static void get_final_status(
+ grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
+ void* set_value_user_data, grpc_slice* details, const char** error_string) {
int i;
received_status status[STATUS_SOURCE_COUNT];
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
@@ -782,9 +762,9 @@ static void get_final_status(grpc_exec_ctx* exec_ctx, grpc_call* call,
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (status[i].is_set &&
grpc_error_has_clear_grpc_status(status[i].error)) {
- if (get_final_status_from(exec_ctx, call, status[i].error,
- allow_ok_status != 0, set_value,
- set_value_user_data, details, error_string)) {
+ if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
+ set_value, set_value_user_data, details,
+ error_string)) {
return;
}
}
@@ -792,9 +772,9 @@ static void get_final_status(grpc_exec_ctx* exec_ctx, grpc_call* call,
/* If no clearly defined status exists, search for 'anything' */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (status[i].is_set) {
- if (get_final_status_from(exec_ctx, call, status[i].error,
- allow_ok_status != 0, set_value,
- set_value_user_data, details, error_string)) {
+ if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
+ set_value, set_value_user_data, details,
+ error_string)) {
return;
}
}
@@ -808,8 +788,8 @@ static void get_final_status(grpc_exec_ctx* exec_ctx, grpc_call* call,
}
}
-static void set_status_from_error(grpc_exec_ctx* exec_ctx, grpc_call* call,
- status_source source, grpc_error* error) {
+static void set_status_from_error(grpc_call* call, status_source source,
+ grpc_error* error) {
if (!gpr_atm_rel_cas(&call->status[source],
pack_received_status({false, GRPC_ERROR_NONE}),
pack_received_status({true, error}))) {
@@ -861,8 +841,7 @@ uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
static void destroy_encodings_accepted_by_peer(void* p) { return; }
-static void set_encodings_accepted_by_peer(grpc_exec_ctx* exec_ctx,
- grpc_call* call, grpc_mdelem mdel) {
+static void set_encodings_accepted_by_peer(grpc_call* call, grpc_mdelem mdel) {
size_t i;
grpc_compression_algorithm algorithm;
grpc_slice_buffer accept_encoding_parts;
@@ -900,15 +879,14 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx* exec_ctx,
}
}
- grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts);
+ grpc_slice_buffer_destroy_internal(&accept_encoding_parts);
grpc_mdelem_set_user_data(
mdel, destroy_encodings_accepted_by_peer,
(void*)(((uintptr_t)call->encodings_accepted_by_peer) + 1));
}
-static void set_stream_encodings_accepted_by_peer(grpc_exec_ctx* exec_ctx,
- grpc_call* call,
+static void set_stream_encodings_accepted_by_peer(grpc_call* call,
grpc_mdelem mdel) {
size_t i;
grpc_stream_compression_algorithm algorithm;
@@ -946,7 +924,7 @@ static void set_stream_encodings_accepted_by_peer(grpc_exec_ctx* exec_ctx,
}
}
- grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts);
+ grpc_slice_buffer_destroy_internal(&accept_encoding_parts);
grpc_mdelem_set_user_data(
mdel, destroy_encodings_accepted_by_peer,
@@ -984,10 +962,12 @@ static grpc_metadata* get_md_elem(grpc_metadata* metadata,
return res;
}
-static int prepare_application_metadata(
- grpc_exec_ctx* exec_ctx, grpc_call* call, int count,
- grpc_metadata* metadata, int is_trailing, int prepend_extra_metadata,
- grpc_metadata* additional_metadata, int additional_metadata_count) {
+static int prepare_application_metadata(grpc_call* call, int count,
+ grpc_metadata* metadata,
+ int is_trailing,
+ int prepend_extra_metadata,
+ grpc_metadata* additional_metadata,
+ int additional_metadata_count) {
int total_count = count + additional_metadata_count;
int i;
grpc_metadata_batch* batch =
@@ -1006,14 +986,14 @@ static int prepare_application_metadata(
grpc_validate_header_nonbin_value_is_legal(md->value))) {
break;
}
- l->md = grpc_mdelem_from_grpc_metadata(exec_ctx, (grpc_metadata*)md);
+ l->md = grpc_mdelem_from_grpc_metadata((grpc_metadata*)md);
}
if (i != total_count) {
for (int j = 0; j < i; j++) {
const grpc_metadata* md =
get_md_elem(metadata, additional_metadata, j, count);
grpc_linked_mdelem* l = linked_from_md(md);
- GRPC_MDELEM_UNREF(exec_ctx, l->md);
+ GRPC_MDELEM_UNREF(l->md);
}
return 0;
}
@@ -1024,16 +1004,16 @@ static int prepare_application_metadata(
for (i = 0; i < call->send_extra_metadata_count; i++) {
GRPC_LOG_IF_ERROR("prepare_application_metadata",
grpc_metadata_batch_link_tail(
- exec_ctx, batch, &call->send_extra_metadata[i]));
+ batch, &call->send_extra_metadata[i]));
}
}
}
for (i = 0; i < total_count; i++) {
grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
grpc_linked_mdelem* l = linked_from_md(md);
- grpc_error* error = grpc_metadata_batch_link_tail(exec_ctx, batch, l);
+ grpc_error* error = grpc_metadata_batch_link_tail(batch, l);
if (error != GRPC_ERROR_NONE) {
- GRPC_MDELEM_UNREF(exec_ctx, l->md);
+ GRPC_MDELEM_UNREF(l->md);
}
GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
}
@@ -1120,46 +1100,43 @@ static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
GPR_TIMER_END("publish_app_metadata", 0);
}
-static void recv_initial_filter(grpc_exec_ctx* exec_ctx, grpc_call* call,
- grpc_metadata_batch* b) {
+static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
if (b->idx.named.content_encoding != nullptr) {
if (b->idx.named.grpc_encoding != nullptr) {
gpr_log(GPR_ERROR,
"Received both content-encoding and grpc-encoding header. "
"Ignoring grpc-encoding.");
- grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
+ grpc_metadata_batch_remove(b, b->idx.named.grpc_encoding);
}
GPR_TIMER_BEGIN("incoming_stream_compression_algorithm", 0);
set_incoming_stream_compression_algorithm(
call, decode_stream_compression(b->idx.named.content_encoding->md));
GPR_TIMER_END("incoming_stream_compression_algorithm", 0);
- grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_encoding);
+ grpc_metadata_batch_remove(b, b->idx.named.content_encoding);
} else if (b->idx.named.grpc_encoding != nullptr) {
GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
set_incoming_compression_algorithm(
call, decode_compression(b->idx.named.grpc_encoding->md));
GPR_TIMER_END("incoming_compression_algorithm", 0);
- grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
+ grpc_metadata_batch_remove(b, b->idx.named.grpc_encoding);
}
if (b->idx.named.grpc_accept_encoding != nullptr) {
GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
- set_encodings_accepted_by_peer(exec_ctx, call,
- b->idx.named.grpc_accept_encoding->md);
- grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding);
+ set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md);
+ grpc_metadata_batch_remove(b, b->idx.named.grpc_accept_encoding);
GPR_TIMER_END("encodings_accepted_by_peer", 0);
}
if (b->idx.named.accept_encoding != nullptr) {
GPR_TIMER_BEGIN("stream_encodings_accepted_by_peer", 0);
- set_stream_encodings_accepted_by_peer(exec_ctx, call,
+ set_stream_encodings_accepted_by_peer(call,
b->idx.named.accept_encoding->md);
- grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.accept_encoding);
+ grpc_metadata_batch_remove(b, b->idx.named.accept_encoding);
GPR_TIMER_END("stream_encodings_accepted_by_peer", 0);
}
publish_app_metadata(call, b, false);
}
-static void recv_trailing_filter(grpc_exec_ctx* exec_ctx, void* args,
- grpc_metadata_batch* b) {
+static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
grpc_call* call = (grpc_call*)args;
if (b->idx.named.grpc_status != nullptr) {
uint32_t status_code = decode_status(b->idx.named.grpc_status->md);
@@ -1174,13 +1151,13 @@ static void recv_trailing_filter(grpc_exec_ctx* exec_ctx, void* args,
error = grpc_error_set_str(
error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
- grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message);
+ grpc_metadata_batch_remove(b, b->idx.named.grpc_message);
} else if (error != GRPC_ERROR_NONE) {
error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_empty_slice());
}
- set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error);
- grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status);
+ set_status_from_error(call, STATUS_FROM_WIRE, error);
+ grpc_metadata_batch_remove(b, b->idx.named.grpc_status);
}
publish_app_metadata(call, b, true);
}
@@ -1257,12 +1234,12 @@ static batch_control* allocate_batch_control(grpc_call* call,
return bctl;
}
-static void finish_batch_completion(grpc_exec_ctx* exec_ctx, void* user_data,
+static void finish_batch_completion(void* user_data,
grpc_cq_completion* storage) {
batch_control* bctl = (batch_control*)user_data;
grpc_call* call = bctl->call;
bctl->call = nullptr;
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
+ GRPC_CALL_INTERNAL_UNREF(call, "completion");
}
static grpc_error* consolidate_batch_errors(batch_control* bctl) {
@@ -1286,15 +1263,13 @@ static grpc_error* consolidate_batch_errors(batch_control* bctl) {
}
}
-static void post_batch_completion(grpc_exec_ctx* exec_ctx,
- batch_control* bctl) {
+static void post_batch_completion(batch_control* bctl) {
grpc_call* next_child_call;
grpc_call* call = bctl->call;
grpc_error* error = consolidate_batch_errors(bctl);
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
- exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->op.send_message) {
@@ -1302,13 +1277,12 @@ static void post_batch_completion(grpc_exec_ctx* exec_ctx,
}
if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy(
- exec_ctx,
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->op.recv_trailing_metadata) {
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- recv_trailing_filter(exec_ctx, call, md);
+ recv_trailing_filter(call, md);
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
@@ -1322,9 +1296,9 @@ static void post_batch_completion(grpc_exec_ctx* exec_ctx,
next_child_call = child->child->sibling_next;
if (child->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
- cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE,
+ cancel_with_error(child, STATUS_FROM_API_OVERRIDE,
GRPC_ERROR_CANCELLED);
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, child, "propagate_cancel");
+ GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
}
child = next_child_call;
} while (child != pc->first_child);
@@ -1333,12 +1307,12 @@ static void post_batch_completion(grpc_exec_ctx* exec_ctx,
}
if (call->is_client) {
- get_final_status(exec_ctx, call, set_status_value_directly,
+ get_final_status(call, set_status_value_directly,
call->final_op.client.status,
call->final_op.client.status_details,
call->final_op.client.error_string);
} else {
- get_final_status(exec_ctx, call, set_cancelled_value,
+ get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr);
}
@@ -1354,25 +1328,24 @@ static void post_batch_completion(grpc_exec_ctx* exec_ctx,
if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs bctl->error */
bctl->call = nullptr;
- GRPC_CLOSURE_RUN(
- exec_ctx, (grpc_closure*)bctl->completion_data.notify_tag.tag, error);
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
+ GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag,
+ error);
+ GRPC_CALL_INTERNAL_UNREF(call, "completion");
} else {
/* unrefs bctl->error */
- grpc_cq_end_op(
- exec_ctx, bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
- finish_batch_completion, bctl, &bctl->completion_data.cq_completion);
+ grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
+ finish_batch_completion, bctl,
+ &bctl->completion_data.cq_completion);
}
}
-static void finish_batch_step(grpc_exec_ctx* exec_ctx, batch_control* bctl) {
+static void finish_batch_step(batch_control* bctl) {
if (gpr_unref(&bctl->steps_to_complete)) {
- post_batch_completion(exec_ctx, bctl);
+ post_batch_completion(bctl);
}
}
-static void continue_receiving_slices(grpc_exec_ctx* exec_ctx,
- batch_control* bctl) {
+static void continue_receiving_slices(batch_control* bctl) {
grpc_error* error;
grpc_call* call = bctl->call;
for (;;) {
@@ -1380,25 +1353,25 @@ static void continue_receiving_slices(grpc_exec_ctx* exec_ctx,
(*call->receiving_buffer)->data.raw.slice_buffer.length;
if (remaining == 0) {
call->receiving_message = 0;
- grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+ grpc_byte_stream_destroy(call->receiving_stream);
call->receiving_stream = nullptr;
- finish_batch_step(exec_ctx, bctl);
+ finish_batch_step(bctl);
return;
}
- if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining,
+ if (grpc_byte_stream_next(call->receiving_stream, remaining,
&call->receiving_slice_ready)) {
- error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream,
- &call->receiving_slice);
+ error =
+ grpc_byte_stream_pull(call->receiving_stream, &call->receiving_slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
call->receiving_slice);
} else {
- grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+ grpc_byte_stream_destroy(call->receiving_stream);
call->receiving_stream = nullptr;
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
call->receiving_message = 0;
- finish_batch_step(exec_ctx, bctl);
+ finish_batch_step(bctl);
return;
}
} else {
@@ -1407,8 +1380,7 @@ static void continue_receiving_slices(grpc_exec_ctx* exec_ctx,
}
}
-static void receiving_slice_ready(grpc_exec_ctx* exec_ctx, void* bctlp,
- grpc_error* error) {
+static void receiving_slice_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = (batch_control*)bctlp;
grpc_call* call = bctl->call;
grpc_byte_stream* bs = call->receiving_stream;
@@ -1416,11 +1388,11 @@ static void receiving_slice_ready(grpc_exec_ctx* exec_ctx, void* bctlp,
if (error == GRPC_ERROR_NONE) {
grpc_slice slice;
- error = grpc_byte_stream_pull(exec_ctx, bs, &slice);
+ error = grpc_byte_stream_pull(bs, &slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
slice);
- continue_receiving_slices(exec_ctx, bctl);
+ continue_receiving_slices(bctl);
} else {
/* Error returned by grpc_byte_stream_pull needs to be released manually
*/
@@ -1432,25 +1404,24 @@ static void receiving_slice_ready(grpc_exec_ctx* exec_ctx, void* bctlp,
if (grpc_trace_operation_failures.enabled()) {
GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
}
- grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+ grpc_byte_stream_destroy(call->receiving_stream);
call->receiving_stream = nullptr;
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
call->receiving_message = 0;
- finish_batch_step(exec_ctx, bctl);
+ finish_batch_step(bctl);
if (release_error) {
GRPC_ERROR_UNREF(error);
}
}
}
-static void process_data_after_md(grpc_exec_ctx* exec_ctx,
- batch_control* bctl) {
+static void process_data_after_md(batch_control* bctl) {
grpc_call* call = bctl->call;
if (call->receiving_stream == nullptr) {
*call->receiving_buffer = nullptr;
call->receiving_message = 0;
- finish_batch_step(exec_ctx, bctl);
+ finish_batch_step(bctl);
} else {
call->test_only_last_message_flags = call->receiving_stream->flags;
if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
@@ -1462,46 +1433,42 @@ static void process_data_after_md(grpc_exec_ctx* exec_ctx,
}
GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
grpc_schedule_on_exec_ctx);
- continue_receiving_slices(exec_ctx, bctl);
+ continue_receiving_slices(bctl);
}
}
-static void receiving_stream_ready(grpc_exec_ctx* exec_ctx, void* bctlp,
- grpc_error* error) {
+static void receiving_stream_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = (batch_control*)bctlp;
grpc_call* call = bctl->call;
if (error != GRPC_ERROR_NONE) {
if (call->receiving_stream != nullptr) {
- grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+ grpc_byte_stream_destroy(call->receiving_stream);
call->receiving_stream = nullptr;
}
- add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), true);
- cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
- GRPC_ERROR_REF(error));
+ add_batch_error(bctl, GRPC_ERROR_REF(error), true);
+ cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
}
/* If recv_state is RECV_NONE, we will save the batch_control
* object with rel_cas, and will not use it after the cas. Its corresponding
* acq_load is in receiving_initial_metadata_ready() */
if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
!gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
- process_data_after_md(exec_ctx, bctl);
+ process_data_after_md(bctl);
}
}
// The recv_message_ready callback used when sending a batch containing
// a recv_message op down the filter stack. Yields the call combiner
// before processing the received message.
-static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx* exec_ctx,
- void* bctlp,
+static void receiving_stream_ready_in_call_combiner(void* bctlp,
grpc_error* error) {
batch_control* bctl = (batch_control*)bctlp;
grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready");
- receiving_stream_ready(exec_ctx, bctlp, error);
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
+ receiving_stream_ready(bctlp, error);
}
-static void validate_filtered_metadata(grpc_exec_ctx* exec_ctx,
- batch_control* bctl) {
+static void validate_filtered_metadata(batch_control* bctl) {
grpc_call* call = bctl->call;
/* validate compression algorithms */
if (call->incoming_stream_compression_algorithm !=
@@ -1515,8 +1482,8 @@ static void validate_filtered_metadata(grpc_exec_ctx* exec_ctx,
gpr_asprintf(&error_msg,
"Invalid stream compression algorithm value '%d'.", algo);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
- GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
+ error_msg);
} else if (grpc_compression_options_is_stream_compression_algorithm_enabled(
&compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */
@@ -1525,8 +1492,8 @@ static void validate_filtered_metadata(grpc_exec_ctx* exec_ctx,
gpr_asprintf(&error_msg, "Stream compression algorithm '%s' is disabled.",
algo_name);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
- GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
+ error_msg);
}
gpr_free(error_msg);
@@ -1556,8 +1523,8 @@ static void validate_filtered_metadata(grpc_exec_ctx* exec_ctx,
gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
algo);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
- GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
+ error_msg);
} else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */
@@ -1566,8 +1533,8 @@ static void validate_filtered_metadata(grpc_exec_ctx* exec_ctx,
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
- GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
+ error_msg);
} else {
call->incoming_compression_algorithm = algo;
}
@@ -1590,34 +1557,31 @@ static void validate_filtered_metadata(grpc_exec_ctx* exec_ctx,
}
}
-static void add_batch_error(grpc_exec_ctx* exec_ctx, batch_control* bctl,
- grpc_error* error, bool has_cancelled) {
+static void add_batch_error(batch_control* bctl, grpc_error* error,
+ bool has_cancelled) {
if (error == GRPC_ERROR_NONE) return;
int idx = (int)gpr_atm_full_fetch_add(&bctl->num_errors, 1);
if (idx == 0 && !has_cancelled) {
- cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
- GRPC_ERROR_REF(error));
+ cancel_with_error(bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error));
}
bctl->errors[idx] = error;
}
-static void receiving_initial_metadata_ready(grpc_exec_ctx* exec_ctx,
- void* bctlp, grpc_error* error) {
+static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = (batch_control*)bctlp;
grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner,
- "recv_initial_metadata_ready");
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
- add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
+ add_batch_error(bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- recv_initial_filter(exec_ctx, call, md);
+ recv_initial_filter(call, md);
/* TODO(ctiller): this could be moved into recv_initial_filter now */
GPR_TIMER_BEGIN("validate_filtered_metadata", 0);
- validate_filtered_metadata(exec_ctx, bctl);
+ validate_filtered_metadata(bctl);
GPR_TIMER_END("validate_filtered_metadata", 0);
if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
@@ -1650,28 +1614,25 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx* exec_ctx,
}
}
if (saved_rsr_closure != nullptr) {
- GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(saved_rsr_closure, GRPC_ERROR_REF(error));
}
- finish_batch_step(exec_ctx, bctl);
+ finish_batch_step(bctl);
}
-static void finish_batch(grpc_exec_ctx* exec_ctx, void* bctlp,
- grpc_error* error) {
+static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = (batch_control*)bctlp;
grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete");
- add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
- finish_batch_step(exec_ctx, bctl);
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
+ add_batch_error(bctl, GRPC_ERROR_REF(error), false);
+ finish_batch_step(bctl);
}
-static void free_no_op_completion(grpc_exec_ctx* exec_ctx, void* p,
- grpc_cq_completion* completion) {
+static void free_no_op_completion(void* p, grpc_cq_completion* completion) {
gpr_free(completion);
}
-static grpc_call_error call_start_batch(grpc_exec_ctx* exec_ctx,
- grpc_call* call, const grpc_op* ops,
+static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
size_t nops, void* notify_tag,
int is_notify_tag_closure) {
size_t i;
@@ -1689,11 +1650,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx* exec_ctx,
if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
grpc_cq_end_op(
- exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
- free_no_op_completion, nullptr,
+ call->cq, notify_tag, GRPC_ERROR_NONE, free_no_op_completion, nullptr,
(grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion)));
} else {
- GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure*)notify_tag, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED((grpc_closure*)notify_tag, GRPC_ERROR_NONE);
}
error = GRPC_CALL_OK;
goto done;
@@ -1793,7 +1753,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx* exec_ctx,
stream_op->send_initial_metadata = true;
call->sent_initial_metadata = true;
if (!prepare_application_metadata(
- exec_ctx, call, (int)op->data.send_initial_metadata.count,
+ call, (int)op->data.send_initial_metadata.count,
op->data.send_initial_metadata.metadata, 0, call->is_client,
&call->compression_md, (int)additional_metadata_count)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
@@ -1887,7 +1847,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx* exec_ctx,
GPR_ASSERT(call->send_extra_metadata_count == 0);
call->send_extra_metadata_count = 1;
call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
- exec_ctx, call->channel, op->data.send_status_from_server.status);
+ call->channel, op->data.send_status_from_server.status);
{
grpc_error* override_error = GRPC_ERROR_NONE;
if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
@@ -1896,7 +1856,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx* exec_ctx,
}
if (op->data.send_status_from_server.status_details != nullptr) {
call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
- exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
+ GRPC_MDSTR_GRPC_MESSAGE,
grpc_slice_ref_internal(
*op->data.send_status_from_server.status_details));
call->send_extra_metadata_count++;
@@ -1907,16 +1867,15 @@ static grpc_call_error call_start_batch(grpc_exec_ctx* exec_ctx,
grpc_slice_from_copied_string(msg));
gpr_free(msg);
}
- set_status_from_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE,
- override_error);
+ set_status_from_error(call, STATUS_FROM_API_OVERRIDE, override_error);
}
if (!prepare_application_metadata(
- exec_ctx, call,
+ call,
(int)op->data.send_status_from_server.trailing_metadata_count,
op->data.send_status_from_server.trailing_metadata, 1, 1,
nullptr, 0)) {
for (int n = 0; n < call->send_extra_metadata_count; n++) {
- GRPC_MDELEM_UNREF(exec_ctx, call->send_extra_metadata[n].md);
+ GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
}
call->send_extra_metadata_count = 0;
error = GRPC_CALL_ERROR_INVALID_METADATA;
@@ -2045,7 +2004,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx* exec_ctx,
stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
- execute_batch(exec_ctx, call, stream_op, &bctl->start_batch);
+ execute_batch(call, stream_op, &bctl->start_batch);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);
@@ -2055,15 +2014,15 @@ done_with_error:
/* reverse any mutations that occured */
if (stream_op->send_initial_metadata) {
call->sent_initial_metadata = false;
- grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]);
+ grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
}
if (stream_op->send_message) {
call->sending_message = false;
- grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base);
+ grpc_byte_stream_destroy(&call->sending_stream.base);
}
if (stream_op->send_trailing_metadata) {
call->sent_final_op = false;
- grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]);
+ grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
}
if (stream_op->recv_initial_metadata) {
call->received_initial_metadata = false;
@@ -2079,7 +2038,7 @@ done_with_error:
grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
size_t nops, void* tag, void* reserved) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
grpc_call_error err;
GRPC_API_TRACE(
@@ -2090,19 +2049,17 @@ grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
if (reserved != nullptr) {
err = GRPC_CALL_ERROR;
} else {
- err = call_start_batch(&exec_ctx, call, ops, nops, tag, 0);
+ err = call_start_batch(call, ops, nops, tag, 0);
}
- grpc_exec_ctx_finish(&exec_ctx);
return err;
}
-grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx* exec_ctx,
- grpc_call* call,
+grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
const grpc_op* ops,
size_t nops,
grpc_closure* closure) {
- return call_start_batch(exec_ctx, call, ops, nops, closure, 1);
+ return call_start_batch(call, ops, nops, closure, 1);
}
void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index 1d2e266717..189329ccc4 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -26,8 +26,7 @@
#include <grpc/grpc.h>
#include <grpc/impl/codegen/compression_types.h>
-typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx* exec_ctx,
- grpc_call* call, int success,
+typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success,
void* user_data);
typedef struct grpc_call_create_args {
@@ -51,33 +50,28 @@ typedef struct grpc_call_create_args {
/* Create a new call based on \a args.
Regardless of success or failure, always returns a valid new call into *call
*/
-grpc_error* grpc_call_create(grpc_exec_ctx* exec_ctx,
- const grpc_call_create_args* args,
+grpc_error* grpc_call_create(const grpc_call_create_args* args,
grpc_call** call);
-void grpc_call_set_completion_queue(grpc_exec_ctx* exec_ctx, grpc_call* call,
- grpc_completion_queue* cq);
+void grpc_call_set_completion_queue(grpc_call* call, grpc_completion_queue* cq);
#ifndef NDEBUG
void grpc_call_internal_ref(grpc_call* call, const char* reason);
-void grpc_call_internal_unref(grpc_exec_ctx* exec_ctx, grpc_call* call,
- const char* reason);
+void grpc_call_internal_unref(grpc_call* call, const char* reason);
#define GRPC_CALL_INTERNAL_REF(call, reason) \
grpc_call_internal_ref(call, reason)
-#define GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, reason) \
- grpc_call_internal_unref(exec_ctx, call, reason)
+#define GRPC_CALL_INTERNAL_UNREF(call, reason) \
+ grpc_call_internal_unref(call, reason)
#else
void grpc_call_internal_ref(grpc_call* call);
-void grpc_call_internal_unref(grpc_exec_ctx* exec_ctx, grpc_call* call);
+void grpc_call_internal_unref(grpc_call* call);
#define GRPC_CALL_INTERNAL_REF(call, reason) grpc_call_internal_ref(call)
-#define GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, reason) \
- grpc_call_internal_unref(exec_ctx, call)
+#define GRPC_CALL_INTERNAL_UNREF(call, reason) grpc_call_internal_unref(call)
#endif
grpc_call_stack* grpc_call_get_call_stack(grpc_call* call);
-grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx* exec_ctx,
- grpc_call* call,
+grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
const grpc_op* ops,
size_t nops,
grpc_closure* closure);
diff --git a/src/core/lib/surface/call_details.cc b/src/core/lib/surface/call_details.cc
index ea9208c7e3..cd0b14586a 100644
--- a/src/core/lib/surface/call_details.cc
+++ b/src/core/lib/surface/call_details.cc
@@ -34,8 +34,7 @@ void grpc_call_details_init(grpc_call_details* cd) {
void grpc_call_details_destroy(grpc_call_details* cd) {
GRPC_API_TRACE("grpc_call_details_destroy(cd=%p)", 1, (cd));
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_slice_unref_internal(&exec_ctx, cd->method);
- grpc_slice_unref_internal(&exec_ctx, cd->host);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_core::ExecCtx exec_ctx;
+ grpc_slice_unref_internal(cd->method);
+ grpc_slice_unref_internal(cd->host);
}
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index 1be734cdb7..cf5e8c2150 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -69,23 +69,22 @@ struct grpc_channel {
#define CHANNEL_FROM_TOP_ELEM(top_elem) \
CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
-static void destroy_channel(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error);
+static void destroy_channel(void* arg, grpc_error* error);
grpc_channel* grpc_channel_create_with_builder(
- grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder,
+ grpc_channel_stack_builder* builder,
grpc_channel_stack_type channel_stack_type) {
char* target = gpr_strdup(grpc_channel_stack_builder_get_target(builder));
grpc_channel_args* args = grpc_channel_args_copy(
grpc_channel_stack_builder_get_channel_arguments(builder));
grpc_channel* channel;
if (channel_stack_type == GRPC_SERVER_CHANNEL) {
- GRPC_STATS_INC_SERVER_CHANNELS_CREATED(exec_ctx);
+ GRPC_STATS_INC_SERVER_CHANNELS_CREATED();
} else {
- GRPC_STATS_INC_CLIENT_CHANNELS_CREATED(exec_ctx);
+ GRPC_STATS_INC_CLIENT_CHANNELS_CREATED();
}
grpc_error* error = grpc_channel_stack_builder_finish(
- exec_ctx, builder, sizeof(grpc_channel), 1, destroy_channel, nullptr,
+ builder, sizeof(grpc_channel), 1, destroy_channel, nullptr,
(void**)&channel);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "channel stack builder failed: %s",
@@ -114,10 +113,10 @@ grpc_channel* grpc_channel_create_with_builder(
} else {
if (!GRPC_MDISNULL(channel->default_authority)) {
/* setting this takes precedence over anything else */
- GRPC_MDELEM_UNREF(exec_ctx, channel->default_authority);
+ GRPC_MDELEM_UNREF(channel->default_authority);
}
channel->default_authority = grpc_mdelem_from_slices(
- exec_ctx, GRPC_MDSTR_AUTHORITY,
+ GRPC_MDSTR_AUTHORITY,
grpc_slice_intern(
grpc_slice_from_static_string(args->args[i].value.string)));
}
@@ -134,7 +133,7 @@ grpc_channel* grpc_channel_create_with_builder(
GRPC_SSL_TARGET_NAME_OVERRIDE_ARG);
} else {
channel->default_authority = grpc_mdelem_from_slices(
- exec_ctx, GRPC_MDSTR_AUTHORITY,
+ GRPC_MDSTR_AUTHORITY,
grpc_slice_intern(
grpc_slice_from_static_string(args->args[i].value.string)));
}
@@ -191,25 +190,23 @@ grpc_channel* grpc_channel_create_with_builder(
}
done:
- grpc_channel_args_destroy(exec_ctx, args);
+ grpc_channel_args_destroy(args);
return channel;
}
-grpc_channel* grpc_channel_create(grpc_exec_ctx* exec_ctx, const char* target,
+grpc_channel* grpc_channel_create(const char* target,
const grpc_channel_args* input_args,
grpc_channel_stack_type channel_stack_type,
grpc_transport* optional_transport) {
grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
- grpc_channel_stack_builder_set_channel_arguments(exec_ctx, builder,
- input_args);
+ grpc_channel_stack_builder_set_channel_arguments(builder, input_args);
grpc_channel_stack_builder_set_target(builder, target);
grpc_channel_stack_builder_set_transport(builder, optional_transport);
- if (!grpc_channel_init_create_stack(exec_ctx, builder, channel_stack_type)) {
- grpc_channel_stack_builder_destroy(exec_ctx, builder);
+ if (!grpc_channel_init_create_stack(builder, channel_stack_type)) {
+ grpc_channel_stack_builder_destroy(builder);
return nullptr;
}
- return grpc_channel_create_with_builder(exec_ctx, builder,
- channel_stack_type);
+ return grpc_channel_create_with_builder(builder, channel_stack_type);
}
size_t grpc_channel_get_call_size_estimate(grpc_channel* channel) {
@@ -251,18 +248,17 @@ char* grpc_channel_get_target(grpc_channel* channel) {
void grpc_channel_get_info(grpc_channel* channel,
const grpc_channel_info* channel_info) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
grpc_channel_element* elem =
grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
- elem->filter->get_channel_info(&exec_ctx, elem, channel_info);
- grpc_exec_ctx_finish(&exec_ctx);
+ elem->filter->get_channel_info(elem, channel_info);
}
static grpc_call* grpc_channel_create_call_internal(
- grpc_exec_ctx* exec_ctx, grpc_channel* channel, grpc_call* parent_call,
- uint32_t propagation_mask, grpc_completion_queue* cq,
- grpc_pollset_set* pollset_set_alternative, grpc_mdelem path_mdelem,
- grpc_mdelem authority_mdelem, grpc_millis deadline) {
+ grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
+ grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative,
+ grpc_mdelem path_mdelem, grpc_mdelem authority_mdelem,
+ grpc_millis deadline) {
grpc_mdelem send_metadata[2];
size_t num_metadata = 0;
@@ -289,7 +285,7 @@ static grpc_call* grpc_channel_create_call_internal(
args.send_deadline = deadline;
grpc_call* call;
- GRPC_LOG_IF_ERROR("call_create", grpc_call_create(exec_ctx, &args, &call));
+ GRPC_LOG_IF_ERROR("call_create", grpc_call_create(&args, &call));
return call;
}
@@ -300,29 +296,27 @@ grpc_call* grpc_channel_create_call(grpc_channel* channel,
grpc_slice method, const grpc_slice* host,
gpr_timespec deadline, void* reserved) {
GPR_ASSERT(!reserved);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
grpc_call* call = grpc_channel_create_call_internal(
- &exec_ctx, channel, parent_call, propagation_mask, cq, nullptr,
- grpc_mdelem_from_slices(&exec_ctx, GRPC_MDSTR_PATH,
- grpc_slice_ref_internal(method)),
- host != nullptr ? grpc_mdelem_from_slices(&exec_ctx, GRPC_MDSTR_AUTHORITY,
+ channel, parent_call, propagation_mask, cq, nullptr,
+ grpc_mdelem_from_slices(GRPC_MDSTR_PATH, grpc_slice_ref_internal(method)),
+ host != nullptr ? grpc_mdelem_from_slices(GRPC_MDSTR_AUTHORITY,
grpc_slice_ref_internal(*host))
: GRPC_MDNULL,
grpc_timespec_to_millis_round_up(deadline));
- grpc_exec_ctx_finish(&exec_ctx);
+
return call;
}
grpc_call* grpc_channel_create_pollset_set_call(
- grpc_exec_ctx* exec_ctx, grpc_channel* channel, grpc_call* parent_call,
- uint32_t propagation_mask, grpc_pollset_set* pollset_set, grpc_slice method,
- const grpc_slice* host, grpc_millis deadline, void* reserved) {
+ grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
+ grpc_pollset_set* pollset_set, grpc_slice method, const grpc_slice* host,
+ grpc_millis deadline, void* reserved) {
GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
- exec_ctx, channel, parent_call, propagation_mask, nullptr, pollset_set,
- grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_PATH,
- grpc_slice_ref_internal(method)),
- host != nullptr ? grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_AUTHORITY,
+ channel, parent_call, propagation_mask, nullptr, pollset_set,
+ grpc_mdelem_from_slices(GRPC_MDSTR_PATH, grpc_slice_ref_internal(method)),
+ host != nullptr ? grpc_mdelem_from_slices(GRPC_MDSTR_AUTHORITY,
grpc_slice_ref_internal(*host))
: GRPC_MDNULL,
deadline);
@@ -335,21 +329,21 @@ void* grpc_channel_register_call(grpc_channel* channel, const char* method,
"grpc_channel_register_call(channel=%p, method=%s, host=%s, reserved=%p)",
4, (channel, method, host, reserved));
GPR_ASSERT(!reserved);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
rc->path = grpc_mdelem_from_slices(
- &exec_ctx, GRPC_MDSTR_PATH,
+ GRPC_MDSTR_PATH,
grpc_slice_intern(grpc_slice_from_static_string(method)));
rc->authority =
host ? grpc_mdelem_from_slices(
- &exec_ctx, GRPC_MDSTR_AUTHORITY,
+ GRPC_MDSTR_AUTHORITY,
grpc_slice_intern(grpc_slice_from_static_string(host)))
: GRPC_MDNULL;
gpr_mu_lock(&channel->registered_call_mu);
rc->next = channel->registered_calls;
channel->registered_calls = rc;
gpr_mu_unlock(&channel->registered_call_mu);
- grpc_exec_ctx_finish(&exec_ctx);
+
return rc;
}
@@ -370,12 +364,12 @@ grpc_call* grpc_channel_create_registered_call(
registered_call_handle, deadline.tv_sec, deadline.tv_nsec,
(int)deadline.clock_type, reserved));
GPR_ASSERT(!reserved);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
grpc_call* call = grpc_channel_create_call_internal(
- &exec_ctx, channel, parent_call, propagation_mask, completion_queue,
- nullptr, GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority),
+ channel, parent_call, propagation_mask, completion_queue, nullptr,
+ GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority),
grpc_timespec_to_millis_round_up(deadline));
- grpc_exec_ctx_finish(&exec_ctx);
+
return call;
}
@@ -390,23 +384,21 @@ void grpc_channel_internal_ref(grpc_channel* c REF_ARG) {
GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CHANNEL(c), REF_REASON);
}
-void grpc_channel_internal_unref(grpc_exec_ctx* exec_ctx,
- grpc_channel* c REF_ARG) {
- GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(c), REF_REASON);
+void grpc_channel_internal_unref(grpc_channel* c REF_ARG) {
+ GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CHANNEL(c), REF_REASON);
}
-static void destroy_channel(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void destroy_channel(void* arg, grpc_error* error) {
grpc_channel* channel = (grpc_channel*)arg;
- grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel));
+ grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
while (channel->registered_calls) {
registered_call* rc = channel->registered_calls;
channel->registered_calls = rc->next;
- GRPC_MDELEM_UNREF(exec_ctx, rc->path);
- GRPC_MDELEM_UNREF(exec_ctx, rc->authority);
+ GRPC_MDELEM_UNREF(rc->path);
+ GRPC_MDELEM_UNREF(rc->authority);
gpr_free(rc);
}
- GRPC_MDELEM_UNREF(exec_ctx, channel->default_authority);
+ GRPC_MDELEM_UNREF(channel->default_authority);
gpr_mu_destroy(&channel->registered_call_mu);
gpr_free(channel->target);
gpr_free(channel);
@@ -415,16 +407,14 @@ static void destroy_channel(grpc_exec_ctx* exec_ctx, void* arg,
void grpc_channel_destroy(grpc_channel* channel) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
grpc_channel_element* elem;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel));
op->disconnect_with_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Destroyed");
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
- elem->filter->start_transport_op(&exec_ctx, elem, op);
-
- GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "channel");
+ elem->filter->start_transport_op(elem, op);
- grpc_exec_ctx_finish(&exec_ctx);
+ GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel");
}
grpc_channel_stack* grpc_channel_get_channel_stack(grpc_channel* channel) {
@@ -436,8 +426,7 @@ grpc_compression_options grpc_channel_compression_options(
return channel->compression_options;
}
-grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel* channel, int i) {
+grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_channel* channel, int i) {
char tmp[GPR_LTOA_MIN_BUFSIZE];
switch (i) {
case 0:
@@ -448,6 +437,6 @@ grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_exec_ctx* exec_ctx,
return GRPC_MDELEM_GRPC_STATUS_2;
}
gpr_ltoa(i, tmp);
- return grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_STATUS,
+ return grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_STATUS,
grpc_slice_from_copied_string(tmp));
}
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index a2e53c777d..26d8fceb2f 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -23,13 +23,13 @@
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/surface/channel_stack_type.h"
-grpc_channel* grpc_channel_create(grpc_exec_ctx* exec_ctx, const char* target,
+grpc_channel* grpc_channel_create(const char* target,
const grpc_channel_args* args,
grpc_channel_stack_type channel_stack_type,
grpc_transport* optional_transport);
grpc_channel* grpc_channel_create_with_builder(
- grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder,
+ grpc_channel_stack_builder* builder,
grpc_channel_stack_type channel_stack_type);
/** Create a call given a grpc_channel, in order to call \a method.
@@ -41,9 +41,9 @@ grpc_channel* grpc_channel_create_with_builder(
properties from the server call to this new client call, depending on the
value of \a propagation_mask (see propagation_bits.h for possible values) */
grpc_call* grpc_channel_create_pollset_set_call(
- grpc_exec_ctx* exec_ctx, grpc_channel* channel, grpc_call* parent_call,
- uint32_t propagation_mask, grpc_pollset_set* pollset_set, grpc_slice method,
- const grpc_slice* host, grpc_millis deadline, void* reserved);
+ grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
+ grpc_pollset_set* pollset_set, grpc_slice method, const grpc_slice* host,
+ grpc_millis deadline, void* reserved);
/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack* grpc_channel_get_channel_stack(grpc_channel* channel);
@@ -52,8 +52,7 @@ grpc_channel_stack* grpc_channel_get_channel_stack(grpc_channel* channel);
status_code.
The returned elem is owned by the caller. */
-grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel* channel,
+grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_channel* channel,
int status_code);
size_t grpc_channel_get_call_size_estimate(grpc_channel* channel);
@@ -61,20 +60,18 @@ void grpc_channel_update_call_size_estimate(grpc_channel* channel, size_t size);
#ifndef NDEBUG
void grpc_channel_internal_ref(grpc_channel* channel, const char* reason);
-void grpc_channel_internal_unref(grpc_exec_ctx* exec_ctx, grpc_channel* channel,
- const char* reason);
+void grpc_channel_internal_unref(grpc_channel* channel, const char* reason);
#define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \
grpc_channel_internal_ref(channel, reason)
-#define GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, reason) \
- grpc_channel_internal_unref(exec_ctx, channel, reason)
+#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \
+ grpc_channel_internal_unref(channel, reason)
#else
void grpc_channel_internal_ref(grpc_channel* channel);
-void grpc_channel_internal_unref(grpc_exec_ctx* exec_ctx,
- grpc_channel* channel);
+void grpc_channel_internal_unref(grpc_channel* channel);
#define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \
grpc_channel_internal_ref(channel)
-#define GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, reason) \
- grpc_channel_internal_unref(exec_ctx, channel)
+#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \
+ grpc_channel_internal_unref(channel)
#endif
/** Return the channel's compression options. */
diff --git a/src/core/lib/surface/channel_init.cc b/src/core/lib/surface/channel_init.cc
index b563537f35..95cbbbd037 100644
--- a/src/core/lib/surface/channel_init.cc
+++ b/src/core/lib/surface/channel_init.cc
@@ -89,8 +89,7 @@ void grpc_channel_init_shutdown(void) {
}
}
-bool grpc_channel_init_create_stack(grpc_exec_ctx* exec_ctx,
- grpc_channel_stack_builder* builder,
+bool grpc_channel_init_create_stack(grpc_channel_stack_builder* builder,
grpc_channel_stack_type type) {
GPR_ASSERT(g_finalized);
@@ -99,7 +98,7 @@ bool grpc_channel_init_create_stack(grpc_exec_ctx* exec_ctx,
for (size_t i = 0; i < g_slots[type].num_slots; i++) {
const stage_slot* slot = &g_slots[type].slots[i];
- if (!slot->fn(exec_ctx, builder, slot->arg)) {
+ if (!slot->fn(builder, slot->arg)) {
return false;
}
}
diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h
index 556ecc4147..d702f0f325 100644
--- a/src/core/lib/surface/channel_init.h
+++ b/src/core/lib/surface/channel_init.h
@@ -32,8 +32,7 @@
/// One stage of mutation: call functions against \a builder to influence the
/// finally constructed channel stack
-typedef bool (*grpc_channel_init_stage)(grpc_exec_ctx* exec_ctx,
- grpc_channel_stack_builder* builder,
+typedef bool (*grpc_channel_init_stage)(grpc_channel_stack_builder* builder,
void* arg);
/// Global initialization of the system
@@ -66,8 +65,7 @@ void grpc_channel_init_shutdown(void);
/// \a optional_transport is either NULL or a constructed transport object
/// Returns a pointer to the base of the memory allocated (the actual channel
/// stack object will be prefix_bytes past that pointer)
-bool grpc_channel_init_create_stack(grpc_exec_ctx* exec_ctx,
- grpc_channel_stack_builder* builder,
+bool grpc_channel_init_create_stack(grpc_channel_stack_builder* builder,
grpc_channel_stack_type type);
#endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_INIT_H */
diff --git a/src/core/lib/surface/channel_ping.cc b/src/core/lib/surface/channel_ping.cc
index e8f47f01cf..616ba9e0ac 100644
--- a/src/core/lib/surface/channel_ping.cc
+++ b/src/core/lib/surface/channel_ping.cc
@@ -33,15 +33,14 @@ typedef struct {
grpc_cq_completion completion_storage;
} ping_result;
-static void ping_destroy(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_cq_completion* storage) {
+static void ping_destroy(void* arg, grpc_cq_completion* storage) {
gpr_free(arg);
}
-static void ping_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+static void ping_done(void* arg, grpc_error* error) {
ping_result* pr = (ping_result*)arg;
- grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, GRPC_ERROR_REF(error), ping_destroy,
- pr, &pr->completion_storage);
+ grpc_cq_end_op(pr->cq, pr->tag, GRPC_ERROR_REF(error), ping_destroy, pr,
+ &pr->completion_storage);
}
void grpc_channel_ping(grpc_channel* channel, grpc_completion_queue* cq,
@@ -52,7 +51,7 @@ void grpc_channel_ping(grpc_channel* channel, grpc_completion_queue* cq,
ping_result* pr = (ping_result*)gpr_malloc(sizeof(*pr));
grpc_channel_element* top_elem =
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
GPR_ASSERT(reserved == nullptr);
pr->tag = tag;
pr->cq = cq;
@@ -60,6 +59,5 @@ void grpc_channel_ping(grpc_channel* channel, grpc_completion_queue* cq,
op->send_ping = &pr->closure;
op->bind_pollset = grpc_cq_pollset(cq);
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
- top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
- grpc_exec_ctx_finish(&exec_ctx);
+ top_elem->filter->start_transport_op(top_elem, op);
}
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 98d7e35943..12385b7130 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -62,13 +62,12 @@ typedef struct {
bool can_listen;
size_t (*size)(void);
void (*init)(grpc_pollset* pollset, gpr_mu** mu);
- grpc_error* (*kick)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
+ grpc_error* (*kick)(grpc_pollset* pollset,
grpc_pollset_worker* specific_worker);
- grpc_error* (*work)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
- grpc_pollset_worker** worker, grpc_millis deadline);
- void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
- grpc_closure* closure);
- void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset);
+ grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
+ grpc_millis deadline);
+ void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
+ void (*destroy)(grpc_pollset* pollset);
} cq_poller_vtable;
typedef struct non_polling_worker {
@@ -94,14 +93,12 @@ static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
*mu = &npp->mu;
}
-static void non_polling_poller_destroy(grpc_exec_ctx* exec_ctx,
- grpc_pollset* pollset) {
+static void non_polling_poller_destroy(grpc_pollset* pollset) {
non_polling_poller* npp = (non_polling_poller*)pollset;
gpr_mu_destroy(&npp->mu);
}
-static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx,
- grpc_pollset* pollset,
+static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
grpc_pollset_worker** worker,
grpc_millis deadline) {
non_polling_poller* npp = (non_polling_poller*)pollset;
@@ -122,12 +119,12 @@ static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx,
while (!npp->shutdown && !w.kicked &&
!gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
;
- grpc_exec_ctx_invalidate_now(exec_ctx);
+ grpc_core::ExecCtx::Get()->InvalidateNow();
if (&w == npp->root) {
npp->root = w.next;
if (&w == npp->root) {
if (npp->shutdown) {
- GRPC_CLOSURE_SCHED(exec_ctx, npp->shutdown, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE);
}
npp->root = nullptr;
}
@@ -140,8 +137,7 @@ static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx,
}
static grpc_error* non_polling_poller_kick(
- grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
- grpc_pollset_worker* specific_worker) {
+ grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
non_polling_poller* p = (non_polling_poller*)pollset;
if (specific_worker == nullptr)
specific_worker = (grpc_pollset_worker*)p->root;
@@ -155,14 +151,13 @@ static grpc_error* non_polling_poller_kick(
return GRPC_ERROR_NONE;
}
-static void non_polling_poller_shutdown(grpc_exec_ctx* exec_ctx,
- grpc_pollset* pollset,
+static void non_polling_poller_shutdown(grpc_pollset* pollset,
grpc_closure* closure) {
non_polling_poller* p = (non_polling_poller*)pollset;
GPR_ASSERT(closure != nullptr);
p->shutdown = closure;
if (p->root == nullptr) {
- GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
} else {
non_polling_worker* w = p->root;
do {
@@ -189,13 +184,11 @@ typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
void (*init)(void* data);
- void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq);
+ void (*shutdown)(grpc_completion_queue* cq);
void (*destroy)(void* data);
bool (*begin_op)(grpc_completion_queue* cq, void* tag);
- void (*end_op)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq, void* tag,
- grpc_error* error,
- void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg,
- grpc_cq_completion* storage),
+ void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
+ void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
void* reserved);
@@ -280,31 +273,23 @@ struct grpc_completion_queue {
};
/* Forward declarations */
-static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq);
-static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq);
-static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq);
-static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq);
+static void cq_finish_shutdown_next(grpc_completion_queue* cq);
+static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
+static void cq_shutdown_next(grpc_completion_queue* cq);
+static void cq_shutdown_pluck(grpc_completion_queue* cq);
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
-static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq, void* tag,
+static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
grpc_error* error,
- void (*done)(grpc_exec_ctx* exec_ctx,
- void* done_arg,
+ void (*done)(void* done_arg,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
-static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq, void* tag,
+static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
grpc_error* error,
- void (*done)(grpc_exec_ctx* exec_ctx,
- void* done_arg,
+ void (*done)(void* done_arg,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
@@ -346,8 +331,7 @@ grpc_core::TraceFlag grpc_cq_event_timeout_trace(true, "queue_timeout");
gpr_free(_ev); \
}
-static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* cq,
- grpc_error* error);
+static void on_pollset_shutdown_done(void* cq, grpc_error* error);
void grpc_cq_global_init() {
gpr_tls_init(&g_cached_event);
@@ -369,19 +353,18 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
if (storage != nullptr &&
(grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
*tag = storage->tag;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
*ok = (storage->next & (uintptr_t)(1)) == 1;
- storage->done(&exec_ctx, storage->done_arg, storage);
+ storage->done(storage->done_arg, storage);
ret = 1;
cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
- cq_finish_shutdown_next(&exec_ctx, cq);
+ cq_finish_shutdown_next(cq);
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
- grpc_exec_ctx_finish(&exec_ctx);
}
gpr_tls_set(&g_cached_event, (intptr_t)0);
gpr_tls_set(&g_cached_cq, (intptr_t)0);
@@ -406,24 +389,22 @@ static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) {
static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) {
grpc_cq_completion* c = nullptr;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
if (gpr_spinlock_trylock(&q->queue_lock)) {
- GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx);
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();
bool is_empty = false;
c = (grpc_cq_completion*)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
gpr_spinlock_unlock(&q->queue_lock);
if (c == nullptr && !is_empty) {
- GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx);
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES();
}
} else {
- GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx);
+ GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES();
}
- grpc_exec_ctx_finish(&exec_ctx);
-
if (c) {
gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
}
@@ -453,9 +434,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
const cq_poller_vtable* poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GRPC_STATS_INC_CQS_CREATED(&exec_ctx);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_core::ExecCtx exec_ctx;
+ GRPC_STATS_INC_CQS_CREATED();
cq = (grpc_completion_queue*)gpr_zalloc(sizeof(grpc_completion_queue) +
vtable->data_size +
@@ -537,15 +517,14 @@ void grpc_cq_internal_ref(grpc_completion_queue* cq) {
gpr_ref(&cq->owning_refs);
}
-static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void on_pollset_shutdown_done(void* arg, grpc_error* error) {
grpc_completion_queue* cq = (grpc_completion_queue*)arg;
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy");
+ GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
}
#ifndef NDEBUG
-void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq,
- const char* reason, const char* file, int line) {
+void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
+ const char* file, int line) {
if (grpc_trace_cq_refcount.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -553,12 +532,11 @@ void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq,
reason);
}
#else
-void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq) {
+void grpc_cq_internal_unref(grpc_completion_queue* cq) {
#endif
if (gpr_unref(&cq->owning_refs)) {
cq->vtable->destroy(DATA_FROM_CQ(cq));
- cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq));
+ cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
#ifndef NDEBUG
gpr_free(cq->outstanding_tags);
#endif
@@ -639,11 +617,9 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_NEXT) */
-static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq, void* tag,
+static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
grpc_error* error,
- void (*done)(grpc_exec_ctx* exec_ctx,
- void* done_arg,
+ void (*done)(void* done_arg,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage) {
GPR_TIMER_BEGIN("cq_end_op_for_next", 0);
@@ -652,9 +628,9 @@ static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
(grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
const char* errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
+ "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
+ 6, (cq, tag, errmsg, done, done_arg, storage));
if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
@@ -689,7 +665,7 @@ static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
if (is_first) {
gpr_mu_lock(cq->mu);
grpc_error* kick_error =
- cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), nullptr);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
gpr_mu_unlock(cq->mu);
if (kick_error != GRPC_ERROR_NONE) {
@@ -701,17 +677,17 @@ static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
- cq_finish_shutdown_next(exec_ctx, cq);
+ cq_finish_shutdown_next(cq);
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
} else {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_atm_rel_store(&cqd->pending_events, 0);
gpr_mu_lock(cq->mu);
- cq_finish_shutdown_next(exec_ctx, cq);
+ cq_finish_shutdown_next(cq);
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
}
@@ -723,11 +699,9 @@ static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_PLUCK) */
-static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq, void* tag,
+static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
grpc_error* error,
- void (*done)(grpc_exec_ctx* exec_ctx,
- void* done_arg,
+ void (*done)(void* done_arg,
grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage) {
cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
@@ -739,9 +713,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
(grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
const char* errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
+ "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
+ 6, (cq, tag, errmsg, done, done_arg, storage));
if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
@@ -762,7 +736,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
cqd->completed_tail = storage;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
- cq_finish_shutdown_pluck(exec_ctx, cq);
+ cq_finish_shutdown_pluck(cq);
gpr_mu_unlock(cq->mu);
} else {
grpc_pollset_worker* pluck_worker = nullptr;
@@ -774,7 +748,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
}
grpc_error* kick_error =
- cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), pluck_worker);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
gpr_mu_unlock(cq->mu);
@@ -791,12 +765,10 @@ static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
GRPC_ERROR_UNREF(error);
}
-void grpc_cq_end_op(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq,
- void* tag, grpc_error* error,
- void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg,
- grpc_cq_completion* storage),
+void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
+ void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage) {
- cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage);
+ cq->vtable->end_op(cq, tag, error, done, done_arg, storage);
}
typedef struct {
@@ -808,31 +780,40 @@ typedef struct {
bool first_loop;
} cq_is_finished_arg;
-static bool cq_is_next_finished(grpc_exec_ctx* exec_ctx, void* arg) {
- cq_is_finished_arg* a = (cq_is_finished_arg*)arg;
- grpc_completion_queue* cq = a->cq;
- cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
- GPR_ASSERT(a->stolen_completion == nullptr);
+class ExecCtxNext : public grpc_core::ExecCtx {
+ public:
+ ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
- gpr_atm current_last_seen_things_queued_ever =
- gpr_atm_no_barrier_load(&cqd->things_queued_ever);
+ bool CheckReadyToFinish() override {
+ cq_is_finished_arg* a = (cq_is_finished_arg*)check_ready_to_finish_arg_;
+ grpc_completion_queue* cq = a->cq;
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
+ GPR_ASSERT(a->stolen_completion == nullptr);
- if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
- a->last_seen_things_queued_ever =
+ gpr_atm current_last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
- /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
- * might return NULL in some cases even if the queue is not empty; but
- * that
- * is ok and doesn't affect correctness. Might effect the tail latencies a
- * bit) */
- a->stolen_completion = cq_event_queue_pop(&cqd->queue);
- if (a->stolen_completion != nullptr) {
- return true;
+ if (current_last_seen_things_queued_ever !=
+ a->last_seen_things_queued_ever) {
+ a->last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cqd->things_queued_ever);
+
+ /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
+ * might return NULL in some cases even if the queue is not empty; but
+ * that
+ * is ok and doesn't affect correctness. Might effect the tail latencies a
+ * bit) */
+ a->stolen_completion = cq_event_queue_pop(&cqd->queue);
+ if (a->stolen_completion != nullptr) {
+ return true;
+ }
}
+ return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
}
- return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
-}
+
+ private:
+ void* check_ready_to_finish_arg_;
+};
#ifndef NDEBUG
static void dump_pending_tags(grpc_completion_queue* cq) {
@@ -887,8 +868,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
nullptr,
nullptr,
true};
- grpc_exec_ctx exec_ctx =
- GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg);
+ ExecCtxNext exec_ctx(&is_finished_arg);
for (;;) {
grpc_millis iteration_deadline = deadline_millis;
@@ -898,7 +878,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(&exec_ctx, c->done_arg, c);
+ c->done(c->done_arg, c);
break;
}
@@ -908,7 +888,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(&exec_ctx, c->done_arg, c);
+ c->done(c->done_arg, c);
break;
} else {
/* If c == NULL it means either the queue is empty OR in an transient
@@ -939,7 +919,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
}
if (!is_finished_arg.first_loop &&
- grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
+ grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
dump_pending_tags(cq);
@@ -949,8 +929,8 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
/* The main polling work happens in grpc_pollset_work */
gpr_mu_lock(cq->mu);
cq->num_polls++;
- grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
- nullptr, iteration_deadline);
+ grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr,
+ iteration_deadline);
gpr_mu_unlock(cq->mu);
if (err != GRPC_ERROR_NONE) {
@@ -969,13 +949,13 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
if (cq_event_queue_num_items(&cqd->queue) > 0 &&
gpr_atm_acq_load(&cqd->pending_events) > 0) {
gpr_mu_lock(cq->mu);
- cq->poller_vtable->kick(&exec_ctx, POLLSET_FROM_CQ(cq), nullptr);
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
gpr_mu_unlock(cq->mu);
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
- grpc_exec_ctx_finish(&exec_ctx);
+ GRPC_CQ_INTERNAL_UNREF(cq, "next");
+
GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
GPR_TIMER_END("grpc_completion_queue_next", 0);
@@ -989,19 +969,16 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
- Must be called only once in completion queue's lifetime
- grpc_completion_queue_shutdown() MUST have been called before calling
this function */
-static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq) {
+static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
- cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
- &cq->pollset_shutdown_done);
+ cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}
-static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq) {
+static void cq_shutdown_next(grpc_completion_queue* cq) {
cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
@@ -1014,7 +991,7 @@ static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
return;
}
cqd->shutdown_called = true;
@@ -1022,10 +999,10 @@ static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
* cq_begin_op_for_next and and cq_end_op_for_next functions which read/write
* on this counter without necessarily holding a lock on cq */
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
- cq_finish_shutdown_next(exec_ctx, cq);
+ cq_finish_shutdown_next(cq);
}
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
@@ -1058,37 +1035,46 @@ static void del_plucker(grpc_completion_queue* cq, void* tag,
GPR_UNREACHABLE_CODE(return );
}
-static bool cq_is_pluck_finished(grpc_exec_ctx* exec_ctx, void* arg) {
- cq_is_finished_arg* a = (cq_is_finished_arg*)arg;
- grpc_completion_queue* cq = a->cq;
- cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
+class ExecCtxPluck : public grpc_core::ExecCtx {
+ public:
+ ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
- GPR_ASSERT(a->stolen_completion == nullptr);
- gpr_atm current_last_seen_things_queued_ever =
- gpr_atm_no_barrier_load(&cqd->things_queued_ever);
- if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
- gpr_mu_lock(cq->mu);
- a->last_seen_things_queued_ever =
+ bool CheckReadyToFinish() override {
+ cq_is_finished_arg* a = (cq_is_finished_arg*)check_ready_to_finish_arg_;
+ grpc_completion_queue* cq = a->cq;
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
+
+ GPR_ASSERT(a->stolen_completion == nullptr);
+ gpr_atm current_last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
- grpc_cq_completion* c;
- grpc_cq_completion* prev = &cqd->completed_head;
- while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) !=
- &cqd->completed_head) {
- if (c->tag == a->tag) {
- prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
- if (c == cqd->completed_tail) {
- cqd->completed_tail = prev;
+ if (current_last_seen_things_queued_ever !=
+ a->last_seen_things_queued_ever) {
+ gpr_mu_lock(cq->mu);
+ a->last_seen_things_queued_ever =
+ gpr_atm_no_barrier_load(&cqd->things_queued_ever);
+ grpc_cq_completion* c;
+ grpc_cq_completion* prev = &cqd->completed_head;
+ while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) !=
+ &cqd->completed_head) {
+ if (c->tag == a->tag) {
+ prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
+ if (c == cqd->completed_tail) {
+ cqd->completed_tail = prev;
+ }
+ gpr_mu_unlock(cq->mu);
+ a->stolen_completion = c;
+ return true;
}
- gpr_mu_unlock(cq->mu);
- a->stolen_completion = c;
- return true;
+ prev = c;
}
- prev = c;
+ gpr_mu_unlock(cq->mu);
}
- gpr_mu_unlock(cq->mu);
+ return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
}
- return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
-}
+
+ private:
+ void* check_ready_to_finish_arg_;
+};
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline, void* reserved) {
@@ -1125,8 +1111,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
nullptr,
tag,
true};
- grpc_exec_ctx exec_ctx =
- GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
+ ExecCtxPluck exec_ctx(&is_finished_arg);
for (;;) {
if (is_finished_arg.stolen_completion != nullptr) {
gpr_mu_unlock(cq->mu);
@@ -1135,7 +1120,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(&exec_ctx, c->done_arg, c);
+ c->done(c->done_arg, c);
break;
}
prev = &cqd->completed_head;
@@ -1150,7 +1135,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
- c->done(&exec_ctx, c->done_arg, c);
+ c->done(c->done_arg, c);
goto done;
}
prev = c;
@@ -1174,7 +1159,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
break;
}
if (!is_finished_arg.first_loop &&
- grpc_exec_ctx_now(&exec_ctx) >= deadline_millis) {
+ grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
memset(&ret, 0, sizeof(ret));
@@ -1183,8 +1168,8 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
break;
}
cq->num_polls++;
- grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
- &worker, deadline_millis);
+ grpc_error* err =
+ cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
if (err != GRPC_ERROR_NONE) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
@@ -1202,8 +1187,8 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
}
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck");
- grpc_exec_ctx_finish(&exec_ctx);
+ GRPC_CQ_INTERNAL_UNREF(cq, "pluck");
+
GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
GPR_TIMER_END("grpc_completion_queue_pluck", 0);
@@ -1216,22 +1201,19 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
return cq->vtable->pluck(cq, tag, deadline, reserved);
}
-static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq) {
+static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
gpr_atm_no_barrier_store(&cqd->shutdown, 1);
- cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
- &cq->pollset_shutdown_done);
+ cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}
/* NOTE: This function is almost exactly identical to cq_shutdown_next() but
* merging them is a bit tricky and probably not worth it */
-static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx,
- grpc_completion_queue* cq) {
+static void cq_shutdown_pluck(grpc_completion_queue* cq) {
cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
@@ -1244,25 +1226,25 @@ static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx,
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
return;
}
cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
- cq_finish_shutdown_pluck(exec_ctx, cq);
+ cq_finish_shutdown_pluck(cq);
}
gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
- cq->vtable->shutdown(&exec_ctx, cq);
- grpc_exec_ctx_finish(&exec_ctx);
+ cq->vtable->shutdown(cq);
+
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
}
@@ -1271,9 +1253,9 @@ void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
grpc_completion_queue_shutdown(cq);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy");
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_core::ExecCtx exec_ctx;
+ GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
+
GPR_TIMER_END("grpc_completion_queue_destroy", 0);
}
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 13d3e5807d..aea47afaf5 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -40,8 +40,7 @@ typedef struct grpc_cq_completion {
void* tag;
/** done callback - called when this queue element is no longer
needed by the completion queue */
- void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg,
- struct grpc_cq_completion* c);
+ void (*done)(void* done_arg, struct grpc_cq_completion* c);
void* done_arg;
/** next pointer; low bit is used to indicate success or not */
uintptr_t next;
@@ -50,17 +49,17 @@ typedef struct grpc_cq_completion {
#ifndef NDEBUG
void grpc_cq_internal_ref(grpc_completion_queue* cc, const char* reason,
const char* file, int line);
-void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cc,
- const char* reason, const char* file, int line);
+void grpc_cq_internal_unref(grpc_completion_queue* cc, const char* reason,
+ const char* file, int line);
#define GRPC_CQ_INTERNAL_REF(cc, reason) \
grpc_cq_internal_ref(cc, reason, __FILE__, __LINE__)
-#define GRPC_CQ_INTERNAL_UNREF(ec, cc, reason) \
- grpc_cq_internal_unref(ec, cc, reason, __FILE__, __LINE__)
+#define GRPC_CQ_INTERNAL_UNREF(cc, reason) \
+ grpc_cq_internal_unref(cc, reason, __FILE__, __LINE__)
#else
void grpc_cq_internal_ref(grpc_completion_queue* cc);
-void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cc);
+void grpc_cq_internal_unref(grpc_completion_queue* cc);
#define GRPC_CQ_INTERNAL_REF(cc, reason) grpc_cq_internal_ref(cc)
-#define GRPC_CQ_INTERNAL_UNREF(ec, cc, reason) grpc_cq_internal_unref(ec, cc)
+#define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc)
#endif
/* Initializes global variables used by completion queues */
@@ -74,10 +73,8 @@ bool grpc_cq_begin_op(grpc_completion_queue* cc, void* tag);
/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
grpc_cq_begin_op */
-void grpc_cq_end_op(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cc,
- void* tag, grpc_error* error,
- void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg,
- grpc_cq_completion* storage),
+void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error,
+ void (*done)(void* done_arg, grpc_cq_completion* storage),
void* done_arg, grpc_cq_completion* storage);
grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc);
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index 8ee1383fb8..c6ce235da7 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -73,14 +73,12 @@ static void do_basic_init(void) {
grpc_fork_handlers_auto_register();
}
-static bool append_filter(grpc_exec_ctx* exec_ctx,
- grpc_channel_stack_builder* builder, void* arg) {
+static bool append_filter(grpc_channel_stack_builder* builder, void* arg) {
return grpc_channel_stack_builder_append_filter(
builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
}
-static bool prepend_filter(grpc_exec_ctx* exec_ctx,
- grpc_channel_stack_builder* builder, void* arg) {
+static bool prepend_filter(grpc_channel_stack_builder* builder, void* arg) {
return grpc_channel_stack_builder_prepend_filter(
builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
}
@@ -123,7 +121,6 @@ void grpc_init(void) {
int i;
gpr_once_init(&g_basic_init, do_basic_init);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
gpr_time_init();
@@ -133,7 +130,8 @@ void grpc_init(void) {
grpc_mdctx_global_init();
grpc_channel_init_init();
grpc_security_pre_init();
- grpc_iomgr_init(&exec_ctx);
+ grpc_core::ExecCtx::GlobalInit();
+ grpc_iomgr_init();
gpr_timers_global_init();
grpc_handshaker_factory_registry_init();
grpc_security_init();
@@ -149,37 +147,44 @@ void grpc_init(void) {
grpc_tracer_init("GRPC_TRACE");
/* no more changes to channel init pipelines */
grpc_channel_init_finalize();
- grpc_iomgr_start(&exec_ctx);
+ grpc_iomgr_start();
}
gpr_mu_unlock(&g_init_mu);
- grpc_exec_ctx_finish(&exec_ctx);
+
GRPC_API_TRACE("grpc_init(void)", 0, ());
}
void grpc_shutdown(void) {
int i;
GRPC_API_TRACE("grpc_shutdown(void)", 0, ());
- grpc_exec_ctx exec_ctx =
- GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, nullptr);
+ if (grpc_core::ExecCtx::Get()) {
+ grpc_core::ExecCtx::Get()->Flush();
+ }
gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) {
- grpc_executor_shutdown(&exec_ctx);
- grpc_timer_manager_set_threading(false); // shutdown timer_manager thread
- for (i = g_number_of_plugins; i >= 0; i--) {
- if (g_all_of_the_plugins[i].destroy != nullptr) {
- g_all_of_the_plugins[i].destroy();
+ {
+ grpc_core::ExecCtx exec_ctx(0);
+ {
+ grpc_executor_shutdown();
+ grpc_timer_manager_set_threading(
+ false); // shutdown timer_manager thread
+ for (i = g_number_of_plugins; i >= 0; i--) {
+ if (g_all_of_the_plugins[i].destroy != nullptr) {
+ g_all_of_the_plugins[i].destroy();
+ }
+ }
}
+ grpc_iomgr_shutdown();
+ gpr_timers_global_destroy();
+ grpc_tracer_shutdown();
+ grpc_mdctx_global_shutdown();
+ grpc_handshaker_factory_registry_shutdown();
+ grpc_slice_intern_shutdown();
+ grpc_stats_shutdown();
}
- grpc_iomgr_shutdown(&exec_ctx);
- gpr_timers_global_destroy();
- grpc_tracer_shutdown();
- grpc_mdctx_global_shutdown(&exec_ctx);
- grpc_handshaker_factory_registry_shutdown(&exec_ctx);
- grpc_slice_intern_shutdown();
- grpc_stats_shutdown();
+ grpc_core::ExecCtx::GlobalShutdown();
}
gpr_mu_unlock(&g_init_mu);
- grpc_exec_ctx_finish(&exec_ctx);
}
int grpc_is_initialized(void) {
diff --git a/src/core/lib/surface/init_secure.cc b/src/core/lib/surface/init_secure.cc
index 3eee570fc2..75ed9faef0 100644
--- a/src/core/lib/surface/init_secure.cc
+++ b/src/core/lib/surface/init_secure.cc
@@ -37,7 +37,7 @@
void grpc_security_pre_init(void) {}
static bool maybe_prepend_client_auth_filter(
- grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, void* arg) {
+ grpc_channel_stack_builder* builder, void* arg) {
const grpc_channel_args* args =
grpc_channel_stack_builder_get_channel_arguments(builder);
if (args) {
@@ -52,7 +52,7 @@ static bool maybe_prepend_client_auth_filter(
}
static bool maybe_prepend_server_auth_filter(
- grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, void* arg) {
+ grpc_channel_stack_builder* builder, void* arg) {
const grpc_channel_args* args =
grpc_channel_stack_builder_get_channel_arguments(builder);
if (args) {
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index c32c9af50e..29b4e3f0c7 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -49,8 +49,7 @@ struct ChannelData {
const char* error_message;
};
-static void fill_metadata(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_metadata_batch* mdb) {
+static void fill_metadata(grpc_call_element* elem, grpc_metadata_batch* mdb) {
CallData* calld = reinterpret_cast<CallData*>(elem->call_data);
bool expected = false;
if (!calld->filled_metadata.compare_exchange_strong(
@@ -62,9 +61,9 @@ static void fill_metadata(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
char tmp[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(chand->error_code, tmp);
calld->status.md = grpc_mdelem_from_slices(
- exec_ctx, GRPC_MDSTR_GRPC_STATUS, grpc_slice_from_copied_string(tmp));
+ GRPC_MDSTR_GRPC_STATUS, grpc_slice_from_copied_string(tmp));
calld->details.md = grpc_mdelem_from_slices(
- exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
+ GRPC_MDSTR_GRPC_MESSAGE,
grpc_slice_from_copied_string(chand->error_message));
calld->status.prev = calld->details.next = nullptr;
calld->status.next = &calld->details;
@@ -76,69 +75,61 @@ static void fill_metadata(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
}
static void lame_start_transport_stream_op_batch(
- grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_transport_stream_op_batch* op) {
+ grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
CallData* calld = reinterpret_cast<CallData*>(elem->call_data);
if (op->recv_initial_metadata) {
- fill_metadata(exec_ctx, elem,
+ fill_metadata(elem,
op->payload->recv_initial_metadata.recv_initial_metadata);
} else if (op->recv_trailing_metadata) {
- fill_metadata(exec_ctx, elem,
+ fill_metadata(elem,
op->payload->recv_trailing_metadata.recv_trailing_metadata);
}
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"),
+ op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"),
calld->call_combiner);
}
-static void lame_get_channel_info(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem,
+static void lame_get_channel_info(grpc_channel_element* elem,
const grpc_channel_info* channel_info) {}
-static void lame_start_transport_op(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem,
+static void lame_start_transport_op(grpc_channel_element* elem,
grpc_transport_op* op) {
if (op->on_connectivity_state_change) {
GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_SHUTDOWN);
*op->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
- GRPC_CLOSURE_SCHED(exec_ctx, op->on_connectivity_state_change,
- GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(op->on_connectivity_state_change, GRPC_ERROR_NONE);
}
if (op->send_ping != nullptr) {
- GRPC_CLOSURE_SCHED(
- exec_ctx, op->send_ping,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
+ GRPC_CLOSURE_SCHED(op->send_ping, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "lame client channel"));
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
if (op->on_consumed != nullptr) {
- GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}
}
-static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
+static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
CallData* calld = reinterpret_cast<CallData*>(elem->call_data);
calld->call_combiner = args->call_combiner;
return GRPC_ERROR_NONE;
}
-static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* then_schedule_closure) {
- GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}
-static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem,
+static grpc_error* init_channel_elem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(args->is_first);
GPR_ASSERT(args->is_last);
return GRPC_ERROR_NONE;
}
-static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem) {}
+static void destroy_channel_elem(grpc_channel_element* elem) {}
} // namespace
@@ -163,10 +154,10 @@ const grpc_channel_filter grpc_lame_filter = {
grpc_channel* grpc_lame_client_channel_create(const char* target,
grpc_status_code error_code,
const char* error_message) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
grpc_channel_element* elem;
- grpc_channel* channel = grpc_channel_create(
- &exec_ctx, target, nullptr, GRPC_CLIENT_LAME_CHANNEL, nullptr);
+ grpc_channel* channel =
+ grpc_channel_create(target, nullptr, GRPC_CLIENT_LAME_CHANNEL, nullptr);
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
GRPC_API_TRACE(
"grpc_lame_client_channel_create(target=%s, error_code=%d, "
@@ -176,6 +167,6 @@ grpc_channel* grpc_lame_client_channel_create(const char* target,
auto chand = reinterpret_cast<grpc_core::ChannelData*>(elem->channel_data);
chand->error_code = error_code;
chand->error_message = error_message;
- grpc_exec_ctx_finish(&exec_ctx);
+
return channel;
}
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 0f8a057f31..4f07183180 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -46,10 +46,9 @@
typedef struct listener {
void* arg;
- void (*start)(grpc_exec_ctx* exec_ctx, grpc_server* server, void* arg,
- grpc_pollset** pollsets, size_t pollset_count);
- void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_server* server, void* arg,
- grpc_closure* closure);
+ void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
+ size_t pollset_count);
+ void (*destroy)(grpc_server* server, void* arg, grpc_closure* closure);
struct listener* next;
grpc_closure destroy_done;
} listener;
@@ -224,13 +223,12 @@ struct grpc_server {
#define SERVER_FROM_CALL_ELEM(elem) \
(((channel_data*)(elem)->channel_data)->server)
-static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* calld,
- grpc_error* error);
-static void fail_call(grpc_exec_ctx* exec_ctx, grpc_server* server,
- size_t cq_idx, requested_call* rc, grpc_error* error);
+static void publish_new_rpc(void* calld, grpc_error* error);
+static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
+ grpc_error* error);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
hold mu_call */
-static void maybe_finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_server* server);
+static void maybe_finish_shutdown(grpc_server* server);
/*
* channel broadcaster
@@ -258,15 +256,14 @@ struct shutdown_cleanup_args {
grpc_slice slice;
};
-static void shutdown_cleanup(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void shutdown_cleanup(void* arg, grpc_error* error) {
struct shutdown_cleanup_args* a = (struct shutdown_cleanup_args*)arg;
- grpc_slice_unref_internal(exec_ctx, a->slice);
+ grpc_slice_unref_internal(a->slice);
gpr_free(a);
}
-static void send_shutdown(grpc_exec_ctx* exec_ctx, grpc_channel* channel,
- bool send_goaway, grpc_error* send_disconnect) {
+static void send_shutdown(grpc_channel* channel, bool send_goaway,
+ grpc_error* send_disconnect) {
struct shutdown_cleanup_args* sc =
(struct shutdown_cleanup_args*)gpr_malloc(sizeof(*sc));
GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,
@@ -284,19 +281,18 @@ static void send_shutdown(grpc_exec_ctx* exec_ctx, grpc_channel* channel,
op->disconnect_with_error = send_disconnect;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
- elem->filter->start_transport_op(exec_ctx, elem, op);
+ elem->filter->start_transport_op(elem, op);
}
-static void channel_broadcaster_shutdown(grpc_exec_ctx* exec_ctx,
- channel_broadcaster* cb,
+static void channel_broadcaster_shutdown(channel_broadcaster* cb,
bool send_goaway,
grpc_error* force_disconnect) {
size_t i;
for (i = 0; i < cb->num_channels; i++) {
- send_shutdown(exec_ctx, cb->channels[i], send_goaway,
+ send_shutdown(cb->channels[i], send_goaway,
GRPC_ERROR_REF(force_disconnect));
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, cb->channels[i], "broadcast");
+ GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
}
gpr_free(cb->channels);
GRPC_ERROR_UNREF(force_disconnect);
@@ -324,13 +320,11 @@ static void request_matcher_destroy(request_matcher* rm) {
gpr_free(rm->requests_per_cq);
}
-static void kill_zombie(grpc_exec_ctx* exec_ctx, void* elem,
- grpc_error* error) {
+static void kill_zombie(void* elem, grpc_error* error) {
grpc_call_unref(grpc_call_from_top_element((grpc_call_element*)elem));
}
-static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx* exec_ctx,
- request_matcher* rm) {
+static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
while (rm->pending_head) {
call_data* calld = rm->pending_head;
rm->pending_head = calld->pending_next;
@@ -339,19 +333,18 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx* exec_ctx,
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
}
}
-static void request_matcher_kill_requests(grpc_exec_ctx* exec_ctx,
- grpc_server* server,
+static void request_matcher_kill_requests(grpc_server* server,
request_matcher* rm,
grpc_error* error) {
requested_call* rc;
for (size_t i = 0; i < server->cq_count; i++) {
while ((rc = (requested_call*)gpr_locked_mpscq_pop(
&rm->requests_per_cq[i])) != nullptr) {
- fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error));
+ fail_call(server, i, rc, GRPC_ERROR_REF(error));
}
}
GRPC_ERROR_UNREF(error);
@@ -365,10 +358,10 @@ static void server_ref(grpc_server* server) {
gpr_ref(&server->internal_refcount);
}
-static void server_delete(grpc_exec_ctx* exec_ctx, grpc_server* server) {
+static void server_delete(grpc_server* server) {
registered_method* rm;
size_t i;
- grpc_channel_args_destroy(exec_ctx, server->channel_args);
+ grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
gpr_cv_destroy(&server->starting_cv);
@@ -385,7 +378,7 @@ static void server_delete(grpc_exec_ctx* exec_ctx, grpc_server* server) {
request_matcher_destroy(&server->unregistered_request_matcher);
}
for (i = 0; i < server->cq_count; i++) {
- GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server");
+ GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
}
gpr_free(server->cqs);
gpr_free(server->pollsets);
@@ -393,9 +386,9 @@ static void server_delete(grpc_exec_ctx* exec_ctx, grpc_server* server) {
gpr_free(server);
}
-static void server_unref(grpc_exec_ctx* exec_ctx, grpc_server* server) {
+static void server_unref(grpc_server* server) {
if (gpr_unref(&server->internal_refcount)) {
- server_delete(exec_ctx, server);
+ server_delete(server);
}
}
@@ -409,21 +402,19 @@ static void orphan_channel(channel_data* chand) {
chand->next = chand->prev = chand;
}
-static void finish_destroy_channel(grpc_exec_ctx* exec_ctx, void* cd,
- grpc_error* error) {
+static void finish_destroy_channel(void* cd, grpc_error* error) {
channel_data* chand = (channel_data*)cd;
grpc_server* server = chand->server;
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server");
- server_unref(exec_ctx, server);
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
+ server_unref(server);
}
-static void destroy_channel(grpc_exec_ctx* exec_ctx, channel_data* chand,
- grpc_error* error) {
+static void destroy_channel(channel_data* chand, grpc_error* error) {
if (is_channel_orphaned(chand)) return;
GPR_ASSERT(chand->server != nullptr);
orphan_channel(chand);
server_ref(chand->server);
- maybe_finish_shutdown(exec_ctx, chand->server);
+ maybe_finish_shutdown(chand->server);
GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,
finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);
@@ -436,20 +427,18 @@ static void destroy_channel(grpc_exec_ctx* exec_ctx, channel_data* chand,
grpc_transport_op* op =
grpc_make_transport_op(&chand->finish_destroy_channel_closure);
op->set_accept_stream = true;
- grpc_channel_next_op(exec_ctx,
- grpc_channel_stack_element(
+ grpc_channel_next_op(grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
op);
}
-static void done_request_event(grpc_exec_ctx* exec_ctx, void* req,
- grpc_cq_completion* c) {
+static void done_request_event(void* req, grpc_cq_completion* c) {
gpr_free(req);
}
-static void publish_call(grpc_exec_ctx* exec_ctx, grpc_server* server,
- call_data* calld, size_t cq_idx, requested_call* rc) {
- grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
+static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
+ requested_call* rc) {
+ grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
grpc_call* call = calld->call;
*rc->call = call;
calld->cq_new = server->cqs[cq_idx];
@@ -476,12 +465,11 @@ static void publish_call(grpc_exec_ctx* exec_ctx, grpc_server* server,
GPR_UNREACHABLE_CODE(return );
}
- grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE,
- done_request_event, rc, &rc->completion);
+ grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
+ rc, &rc->completion);
}
-static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
+static void publish_new_rpc(void* arg, grpc_error* error) {
grpc_call_element* call_elem = (grpc_call_element*)arg;
call_data* calld = (call_data*)call_elem->call_data;
channel_data* chand = (channel_data*)call_elem->channel_data;
@@ -494,8 +482,7 @@ static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg,
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure,
- GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_REF(error));
return;
}
@@ -506,15 +493,15 @@ static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg,
if (rc == nullptr) {
continue;
} else {
- GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i);
+ GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
- publish_call(exec_ctx, server, calld, cq_idx, rc);
+ publish_call(server, calld, cq_idx, rc);
return; /* early out */
}
}
/* no cq to take the request found: queue it on the slow list */
- GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx);
+ GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
gpr_mu_lock(&server->mu_call);
// We need to ensure that all the queues are empty. We do this under
@@ -529,9 +516,9 @@ static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg,
continue;
} else {
gpr_mu_unlock(&server->mu_call);
- GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i + server->cq_count);
+ GRPC_STATS_INC_SERVER_CQS_CHECKED(i + server->cq_count);
gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
- publish_call(exec_ctx, server, calld, cq_idx, rc);
+ publish_call(server, calld, cq_idx, rc);
return; /* early out */
}
}
@@ -548,8 +535,7 @@ static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg,
}
static void finish_start_new_rpc(
- grpc_exec_ctx* exec_ctx, grpc_server* server, grpc_call_element* elem,
- request_matcher* rm,
+ grpc_server* server, grpc_call_element* elem, request_matcher* rm,
grpc_server_register_method_payload_handling payload_handling) {
call_data* calld = (call_data*)elem->call_data;
@@ -557,7 +543,7 @@ static void finish_start_new_rpc(
gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
return;
}
@@ -565,7 +551,7 @@ static void finish_start_new_rpc(
switch (payload_handling) {
case GRPC_SRM_PAYLOAD_NONE:
- publish_new_rpc(exec_ctx, elem, GRPC_ERROR_NONE);
+ publish_new_rpc(elem, GRPC_ERROR_NONE);
break;
case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
grpc_op op;
@@ -574,14 +560,13 @@ static void finish_start_new_rpc(
op.data.recv_message.recv_message = &calld->payload;
GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem,
grpc_schedule_on_exec_ctx);
- grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1,
- &calld->publish);
+ grpc_call_start_batch_and_execute(calld->call, &op, 1, &calld->publish);
break;
}
}
}
-static void start_new_rpc(grpc_exec_ctx* exec_ctx, grpc_call_element* elem) {
+static void start_new_rpc(grpc_call_element* elem) {
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
grpc_server* server = chand->server;
@@ -606,8 +591,7 @@ static void start_new_rpc(grpc_exec_ctx* exec_ctx, grpc_call_element* elem) {
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
continue;
}
- finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->matcher,
+ finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
rm->server_registered_method->payload_handling);
return;
}
@@ -624,14 +608,12 @@ static void start_new_rpc(grpc_exec_ctx* exec_ctx, grpc_call_element* elem) {
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
continue;
}
- finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->matcher,
+ finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
rm->server_registered_method->payload_handling);
return;
}
}
- finish_start_new_rpc(exec_ctx, server, elem,
- &server->unregistered_request_matcher,
+ finish_start_new_rpc(server, elem, &server->unregistered_request_matcher,
GRPC_SRM_PAYLOAD_NONE);
}
@@ -644,9 +626,8 @@ static int num_listeners(grpc_server* server) {
return n;
}
-static void done_shutdown_event(grpc_exec_ctx* exec_ctx, void* server,
- grpc_cq_completion* completion) {
- server_unref(exec_ctx, (grpc_server*)server);
+static void done_shutdown_event(void* server, grpc_cq_completion* completion) {
+ server_unref((grpc_server*)server);
}
static int num_channels(grpc_server* server) {
@@ -659,34 +640,30 @@ static int num_channels(grpc_server* server) {
return n;
}
-static void kill_pending_work_locked(grpc_exec_ctx* exec_ctx,
- grpc_server* server, grpc_error* error) {
+static void kill_pending_work_locked(grpc_server* server, grpc_error* error) {
if (server->started) {
- request_matcher_kill_requests(exec_ctx, server,
- &server->unregistered_request_matcher,
+ request_matcher_kill_requests(server, &server->unregistered_request_matcher,
GRPC_ERROR_REF(error));
request_matcher_zombify_all_pending_calls(
- exec_ctx, &server->unregistered_request_matcher);
+ &server->unregistered_request_matcher);
for (registered_method* rm = server->registered_methods; rm;
rm = rm->next) {
- request_matcher_kill_requests(exec_ctx, server, &rm->matcher,
+ request_matcher_kill_requests(server, &rm->matcher,
GRPC_ERROR_REF(error));
- request_matcher_zombify_all_pending_calls(exec_ctx, &rm->matcher);
+ request_matcher_zombify_all_pending_calls(&rm->matcher);
}
}
GRPC_ERROR_UNREF(error);
}
-static void maybe_finish_shutdown(grpc_exec_ctx* exec_ctx,
- grpc_server* server) {
+static void maybe_finish_shutdown(grpc_server* server) {
size_t i;
if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
return;
}
kill_pending_work_locked(
- exec_ctx, server,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
+ server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
if (server->root_channel_data.next != &server->root_channel_data ||
server->listeners_destroyed < num_listeners(server)) {
@@ -706,15 +683,13 @@ static void maybe_finish_shutdown(grpc_exec_ctx* exec_ctx,
server->shutdown_published = 1;
for (i = 0; i < server->num_shutdown_tags; i++) {
server_ref(server);
- grpc_cq_end_op(exec_ctx, server->shutdown_tags[i].cq,
- server->shutdown_tags[i].tag, GRPC_ERROR_NONE,
- done_shutdown_event, server,
+ grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
+ GRPC_ERROR_NONE, done_shutdown_event, server,
&server->shutdown_tags[i].completion);
}
}
-static void server_on_recv_initial_metadata(grpc_exec_ctx* exec_ctx, void* ptr,
- grpc_error* error) {
+static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
grpc_call_element* elem = (grpc_call_element*)ptr;
call_data* calld = (call_data*)elem->call_data;
grpc_millis op_deadline;
@@ -728,10 +703,10 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx* exec_ctx, void* ptr,
GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md));
calld->path_set = true;
calld->host_set = true;
- grpc_metadata_batch_remove(exec_ctx, calld->recv_initial_metadata,
+ grpc_metadata_batch_remove(calld->recv_initial_metadata,
calld->recv_initial_metadata->idx.named.path);
grpc_metadata_batch_remove(
- exec_ctx, calld->recv_initial_metadata,
+ calld->recv_initial_metadata,
calld->recv_initial_metadata->idx.named.authority);
} else {
GRPC_ERROR_REF(error);
@@ -749,7 +724,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx* exec_ctx, void* ptr,
GRPC_ERROR_UNREF(src_error);
}
- GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_initial_metadata, error);
+ GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error);
}
static void server_mutate_op(grpc_call_element* elem,
@@ -770,24 +745,21 @@ static void server_mutate_op(grpc_call_element* elem,
}
static void server_start_transport_stream_op_batch(
- grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_transport_stream_op_batch* op) {
+ grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
server_mutate_op(elem, op);
- grpc_call_next_op(exec_ctx, elem, op);
+ grpc_call_next_op(elem, op);
}
-static void got_initial_metadata(grpc_exec_ctx* exec_ctx, void* ptr,
- grpc_error* error) {
+static void got_initial_metadata(void* ptr, grpc_error* error) {
grpc_call_element* elem = (grpc_call_element*)ptr;
call_data* calld = (call_data*)elem->call_data;
if (error == GRPC_ERROR_NONE) {
- start_new_rpc(exec_ctx, elem);
+ start_new_rpc(elem);
} else {
if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {
GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure,
- GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
} else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {
/* zombied call will be destroyed when it's removed from the pending
queue... later */
@@ -795,8 +767,7 @@ static void got_initial_metadata(grpc_exec_ctx* exec_ctx, void* ptr,
}
}
-static void accept_stream(grpc_exec_ctx* exec_ctx, void* cd,
- grpc_transport* transport,
+static void accept_stream(void* cd, grpc_transport* transport,
const void* transport_server_data) {
channel_data* chand = (channel_data*)cd;
/* create a call */
@@ -806,11 +777,11 @@ static void accept_stream(grpc_exec_ctx* exec_ctx, void* cd,
args.server_transport_data = transport_server_data;
args.send_deadline = GRPC_MILLIS_INF_FUTURE;
grpc_call* call;
- grpc_error* error = grpc_call_create(exec_ctx, &args, &call);
+ grpc_error* error = grpc_call_create(&args, &call);
grpc_call_element* elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
if (error != GRPC_ERROR_NONE) {
- got_initial_metadata(exec_ctx, elem, error);
+ got_initial_metadata(elem, error);
GRPC_ERROR_UNREF(error);
return;
}
@@ -822,32 +793,28 @@ static void accept_stream(grpc_exec_ctx* exec_ctx, void* cd,
&calld->initial_metadata;
GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem,
grpc_schedule_on_exec_ctx);
- grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1,
- &calld->got_initial_metadata);
+ grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata);
}
-static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* cd,
- grpc_error* error) {
+static void channel_connectivity_changed(void* cd, grpc_error* error) {
channel_data* chand = (channel_data*)cd;
grpc_server* server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->on_connectivity_state_change = &chand->channel_connectivity_changed;
op->connectivity_state = &chand->connectivity_state;
- grpc_channel_next_op(exec_ctx,
- grpc_channel_stack_element(
+ grpc_channel_next_op(grpc_channel_stack_element(
grpc_channel_get_channel_stack(chand->channel), 0),
op);
} else {
gpr_mu_lock(&server->mu_global);
- destroy_channel(exec_ctx, chand, GRPC_ERROR_REF(error));
+ destroy_channel(chand, GRPC_ERROR_REF(error));
gpr_mu_unlock(&server->mu_global);
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "connectivity");
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
}
}
-static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
+static grpc_error* init_call_elem(grpc_call_element* elem,
const grpc_call_element_args* args) {
call_data* calld = (call_data*)elem->call_data;
channel_data* chand = (channel_data*)elem->channel_data;
@@ -863,7 +830,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
return GRPC_ERROR_NONE;
}
-static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
channel_data* chand = (channel_data*)elem->channel_data;
@@ -872,19 +839,18 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
GPR_ASSERT(calld->state != PENDING);
if (calld->host_set) {
- grpc_slice_unref_internal(exec_ctx, calld->host);
+ grpc_slice_unref_internal(calld->host);
}
if (calld->path_set) {
- grpc_slice_unref_internal(exec_ctx, calld->path);
+ grpc_slice_unref_internal(calld->path);
}
grpc_metadata_array_destroy(&calld->initial_metadata);
grpc_byte_buffer_destroy(calld->payload);
- server_unref(exec_ctx, chand->server);
+ server_unref(chand->server);
}
-static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem,
+static grpc_error* init_channel_elem(grpc_channel_element* elem,
grpc_channel_element_args* args) {
channel_data* chand = (channel_data*)elem->channel_data;
GPR_ASSERT(args->is_first);
@@ -900,15 +866,14 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
return GRPC_ERROR_NONE;
}
-static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem) {
+static void destroy_channel_elem(grpc_channel_element* elem) {
size_t i;
channel_data* chand = (channel_data*)elem->channel_data;
if (chand->registered_methods) {
for (i = 0; i < chand->registered_method_slots; i++) {
- grpc_slice_unref_internal(exec_ctx, chand->registered_methods[i].method);
+ grpc_slice_unref_internal(chand->registered_methods[i].method);
if (chand->registered_methods[i].has_host) {
- grpc_slice_unref_internal(exec_ctx, chand->registered_methods[i].host);
+ grpc_slice_unref_internal(chand->registered_methods[i].host);
}
}
gpr_free(chand->registered_methods);
@@ -918,9 +883,9 @@ static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
chand->next->prev = chand->prev;
chand->prev->next = chand->next;
chand->next = chand->prev = chand;
- maybe_finish_shutdown(exec_ctx, chand->server);
+ maybe_finish_shutdown(chand->server);
gpr_mu_unlock(&chand->server->mu_global);
- server_unref(exec_ctx, chand->server);
+ server_unref(chand->server);
}
}
@@ -1034,11 +999,10 @@ void* grpc_server_register_method(
return m;
}
-static void start_listeners(grpc_exec_ctx* exec_ctx, void* s,
- grpc_error* error) {
+static void start_listeners(void* s, grpc_error* error) {
grpc_server* server = (grpc_server*)s;
for (listener* l = server->listeners; l; l = l->next) {
- l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count);
+ l->start(server, l->arg, server->pollsets, server->pollset_count);
}
gpr_mu_lock(&server->mu_global);
@@ -1046,12 +1010,12 @@ static void start_listeners(grpc_exec_ctx* exec_ctx, void* s,
gpr_cv_signal(&server->starting_cv);
gpr_mu_unlock(&server->mu_global);
- server_unref(exec_ctx, server);
+ server_unref(server);
}
void grpc_server_start(grpc_server* server) {
size_t i;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
@@ -1073,12 +1037,9 @@ void grpc_server_start(grpc_server* server) {
server_ref(server);
server->starting = true;
GRPC_CLOSURE_SCHED(
- &exec_ctx,
GRPC_CLOSURE_CREATE(start_listeners, server,
grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)),
GRPC_ERROR_NONE);
-
- grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
@@ -1087,8 +1048,7 @@ void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
*pollsets = server->pollsets;
}
-void grpc_server_setup_transport(grpc_exec_ctx* exec_ctx, grpc_server* s,
- grpc_transport* transport,
+void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
grpc_pollset* accepting_pollset,
const grpc_channel_args* args) {
size_t num_registered_methods;
@@ -1103,8 +1063,7 @@ void grpc_server_setup_transport(grpc_exec_ctx* exec_ctx, grpc_server* s,
uint32_t max_probes = 0;
grpc_transport_op* op = nullptr;
- channel = grpc_channel_create(exec_ctx, nullptr, args, GRPC_SERVER_CHANNEL,
- transport);
+ channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport);
chand = (channel_data*)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0)
->channel_data;
@@ -1181,21 +1140,19 @@ void grpc_server_setup_transport(grpc_exec_ctx* exec_ctx, grpc_server* s,
op->disconnect_with_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown");
}
- grpc_transport_perform_op(exec_ctx, transport, op);
+ grpc_transport_perform_op(transport, op);
}
-void done_published_shutdown(grpc_exec_ctx* exec_ctx, void* done_arg,
- grpc_cq_completion* storage) {
+void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) {
(void)done_arg;
gpr_free(storage);
}
-static void listener_destroy_done(grpc_exec_ctx* exec_ctx, void* s,
- grpc_error* error) {
+static void listener_destroy_done(void* s, grpc_error* error) {
grpc_server* server = (grpc_server*)s;
gpr_mu_lock(&server->mu_global);
server->listeners_destroyed++;
- maybe_finish_shutdown(exec_ctx, server);
+ maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu_global);
}
@@ -1204,7 +1161,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
listener* l;
shutdown_tag* sdt;
channel_broadcaster broadcaster;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
(server, cq, tag));
@@ -1219,11 +1176,10 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
/* stay locked, and gather up some stuff to do */
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (server->shutdown_published) {
- grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,
- nullptr,
+ grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, done_published_shutdown, nullptr,
(grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion)));
gpr_mu_unlock(&server->mu_global);
- goto done;
+ return;
}
server->shutdown_tags = (shutdown_tag*)gpr_realloc(
server->shutdown_tags,
@@ -1233,7 +1189,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
sdt->cq = cq;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_unlock(&server->mu_global);
- goto done;
+ return;
}
server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
@@ -1245,30 +1201,26 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
kill_pending_work_locked(
- &exec_ctx, server,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
+ server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
gpr_mu_unlock(&server->mu_call);
- maybe_finish_shutdown(&exec_ctx, server);
+ maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu_global);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server,
grpc_schedule_on_exec_ctx);
- l->destroy(&exec_ctx, server, l->arg, &l->destroy_done);
+ l->destroy(server, l->arg, &l->destroy_done);
}
- channel_broadcaster_shutdown(&exec_ctx, &broadcaster, true /* send_goaway */,
+ channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,
GRPC_ERROR_NONE);
-
-done:
- grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_server_cancel_all_calls(grpc_server* server) {
channel_broadcaster broadcaster;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
@@ -1277,14 +1229,13 @@ void grpc_server_cancel_all_calls(grpc_server* server) {
gpr_mu_unlock(&server->mu_global);
channel_broadcaster_shutdown(
- &exec_ctx, &broadcaster, false /* send_goaway */,
+ &broadcaster, false /* send_goaway */,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls"));
- grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_server_destroy(grpc_server* server) {
listener* l;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
@@ -1300,16 +1251,15 @@ void grpc_server_destroy(grpc_server* server) {
gpr_mu_unlock(&server->mu_global);
- server_unref(&exec_ctx, server);
- grpc_exec_ctx_finish(&exec_ctx);
+ server_unref(server);
}
-void grpc_server_add_listener(
- grpc_exec_ctx* exec_ctx, grpc_server* server, void* arg,
- void (*start)(grpc_exec_ctx* exec_ctx, grpc_server* server, void* arg,
- grpc_pollset** pollsets, size_t pollset_count),
- void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_server* server, void* arg,
- grpc_closure* on_done)) {
+void grpc_server_add_listener(grpc_server* server, void* arg,
+ void (*start)(grpc_server* server, void* arg,
+ grpc_pollset** pollsets,
+ size_t pollset_count),
+ void (*destroy)(grpc_server* server, void* arg,
+ grpc_closure* on_done)) {
listener* l = (listener*)gpr_malloc(sizeof(listener));
l->arg = arg;
l->start = start;
@@ -1318,13 +1268,12 @@ void grpc_server_add_listener(
server->listeners = l;
}
-static grpc_call_error queue_call_request(grpc_exec_ctx* exec_ctx,
- grpc_server* server, size_t cq_idx,
+static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
requested_call* rc) {
call_data* calld = nullptr;
request_matcher* rm = nullptr;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
- fail_call(exec_ctx, server, cq_idx, rc,
+ fail_call(server, cq_idx, rc,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
return GRPC_CALL_OK;
}
@@ -1351,10 +1300,9 @@ static grpc_call_error queue_call_request(grpc_exec_ctx* exec_ctx,
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure,
- GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
} else {
- publish_call(exec_ctx, server, calld, cq_idx, rc);
+ publish_call(server, calld, cq_idx, rc);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1369,9 +1317,9 @@ grpc_call_error grpc_server_request_call(
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification, void* tag) {
grpc_call_error error;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
requested_call* rc = (requested_call*)gpr_malloc(sizeof(*rc));
- GRPC_STATS_INC_SERVER_REQUESTED_CALLS(&exec_ctx);
+ GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
GRPC_API_TRACE(
"grpc_server_request_call("
"server=%p, call=%p, details=%p, initial_metadata=%p, "
@@ -1404,9 +1352,9 @@ grpc_call_error grpc_server_request_call(
rc->call = call;
rc->data.batch.details = details;
rc->initial_metadata = initial_metadata;
- error = queue_call_request(&exec_ctx, server, cq_idx, rc);
+ error = queue_call_request(server, cq_idx, rc);
done:
- grpc_exec_ctx_finish(&exec_ctx);
+
return error;
}
@@ -1416,10 +1364,10 @@ grpc_call_error grpc_server_request_registered_call(
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification, void* tag) {
grpc_call_error error;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_core::ExecCtx exec_ctx;
requested_call* rc = (requested_call*)gpr_malloc(sizeof(*rc));
registered_method* rm = (registered_method*)rmp;
- GRPC_STATS_INC_SERVER_REQUESTED_CALLS(&exec_ctx);
+ GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
GRPC_API_TRACE(
"grpc_server_request_registered_call("
"server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
@@ -1461,20 +1409,20 @@ grpc_call_error grpc_server_request_registered_call(
rc->data.registered.deadline = deadline;
rc->initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload;
- error = queue_call_request(&exec_ctx, server, cq_idx, rc);
+ error = queue_call_request(server, cq_idx, rc);
done:
- grpc_exec_ctx_finish(&exec_ctx);
+
return error;
}
-static void fail_call(grpc_exec_ctx* exec_ctx, grpc_server* server,
- size_t cq_idx, requested_call* rc, grpc_error* error) {
+static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
+ grpc_error* error) {
*rc->call = nullptr;
rc->initial_metadata->count = 0;
GPR_ASSERT(error != GRPC_ERROR_NONE);
- grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error,
- done_request_event, rc, &rc->completion);
+ grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc,
+ &rc->completion);
}
const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index d7ec025d95..63b6dff16b 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -31,17 +31,16 @@ extern grpc_core::TraceFlag grpc_server_channel_trace;
/* Add a listener to the server: when the server starts, it will call start,
and when it shuts down, it will call destroy */
-void grpc_server_add_listener(
- grpc_exec_ctx* exec_ctx, grpc_server* server, void* listener,
- void (*start)(grpc_exec_ctx* exec_ctx, grpc_server* server, void* arg,
- grpc_pollset** pollsets, size_t npollsets),
- void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_server* server, void* arg,
- grpc_closure* on_done));
+void grpc_server_add_listener(grpc_server* server, void* listener,
+ void (*start)(grpc_server* server, void* arg,
+ grpc_pollset** pollsets,
+ size_t npollsets),
+ void (*destroy)(grpc_server* server, void* arg,
+ grpc_closure* on_done));
/* Setup a transport - creates a channel stack, binds the transport to the
server */
-void grpc_server_setup_transport(grpc_exec_ctx* exec_ctx, grpc_server* server,
- grpc_transport* transport,
+void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport,
grpc_pollset* accepting_pollset,
const grpc_channel_args* args);