aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c204
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.c9
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c18
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c2
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c4
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c4
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c7
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c139
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_window_update.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c8
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h5
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c3
-rw-r--r--src/core/lib/iomgr/combiner.c240
-rw-r--r--src/core/lib/iomgr/combiner.h8
-rw-r--r--src/core/lib/iomgr/endpoint.c4
-rw-r--r--src/core/lib/iomgr/endpoint.h4
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c100
-rw-r--r--src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c198
-rw-r--r--src/core/lib/iomgr/ev_epoll_thread_pool_linux.c172
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c156
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.c294
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c31
-rw-r--r--src/core/lib/iomgr/ev_posix.c26
-rw-r--r--src/core/lib/iomgr/ev_posix.h16
-rw-r--r--src/core/lib/iomgr/exec_ctx.c1
-rw-r--r--src/core/lib/iomgr/executor.c245
-rw-r--r--src/core/lib/iomgr/executor.h9
-rw-r--r--src/core/lib/iomgr/iomgr.c10
-rw-r--r--src/core/lib/iomgr/iomgr.h4
-rw-r--r--src/core/lib/iomgr/resource_quota.c31
-rw-r--r--src/core/lib/iomgr/tcp_posix.c19
-rw-r--r--src/core/lib/iomgr/tcp_uv.c7
-rw-r--r--src/core/lib/iomgr/timer.h10
-rw-r--r--src/core/lib/iomgr/timer_generic.c58
-rw-r--r--src/core/lib/iomgr/timer_manager.c187
-rw-r--r--src/core/lib/iomgr/timer_uv.c6
-rw-r--r--src/core/lib/iomgr/workqueue.h87
-rw-r--r--src/core/lib/iomgr/workqueue_uv.c65
-rw-r--r--src/core/lib/iomgr/workqueue_uv.h37
-rw-r--r--src/core/lib/iomgr/workqueue_windows.c63
-rw-r--r--src/core/lib/iomgr/workqueue_windows.h37
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c36
-rw-r--r--src/core/lib/security/transport/secure_endpoint.c6
-rw-r--r--src/core/lib/support/log.c6
-rw-r--r--src/core/lib/surface/call.c5
-rw-r--r--src/core/lib/surface/init.c11
-rw-r--r--src/core/lib/transport/transport.h4
-rw-r--r--src/core/lib/transport/transport_op_string.c3
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py2
52 files changed, 733 insertions, 1877 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index ab71467d73..2ec9b15f76 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -291,7 +291,7 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
w->chand = chand;
grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w,
- grpc_combiner_scheduler(chand->combiner, false));
+ grpc_combiner_scheduler(chand->combiner));
w->state = current_state;
w->lb_policy = lb_policy;
grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
@@ -652,7 +652,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&op->handler_private.closure, start_transport_op_locked,
- op, grpc_combiner_scheduler(chand->combiner, false)),
+ op, grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}
@@ -683,7 +683,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
// Initialize data members.
- chand->combiner = grpc_combiner_create(NULL);
+ chand->combiner = grpc_combiner_create();
gpr_mu_init(&chand->info_mu);
gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
@@ -694,7 +694,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
chand->owning_stack = args->channel_stack;
grpc_closure_init(&chand->on_resolver_result_changed,
on_resolver_result_changed_locked, chand,
- grpc_combiner_scheduler(chand->combiner, false));
+ grpc_combiner_scheduler(chand->combiner));
chand->interested_parties = grpc_pollset_set_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
@@ -753,9 +753,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
channel_data *chand = elem->channel_data;
if (chand->resolver != NULL) {
grpc_closure_sched(
- exec_ctx,
- grpc_closure_create(shutdown_resolver_locked, chand->resolver,
- grpc_combiner_scheduler(chand->combiner, false)),
+ exec_ctx, grpc_closure_create(shutdown_resolver_locked, chand->resolver,
+ grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}
if (chand->client_channel_factory != NULL) {
@@ -786,11 +785,6 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
* PER-CALL FUNCTIONS
*/
-#define GET_CALL(call_data) \
- ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call)))
-
-#define CANCELLED_CALL ((grpc_subchannel_call *)1)
-
/** Call data. Holds a pointer to grpc_subchannel_call and the
associated machinery to create such a pointer.
Handles queueing of stream ops until a call object is ready, waiting
@@ -811,11 +805,9 @@ typedef struct client_channel_call_data {
grpc_server_retry_throttle_data *retry_throttle_data;
method_parameters *method_params;
- grpc_error *cancel_error;
-
- /** either 0 for no call, 1 for cancelled, or a pointer to a
- grpc_subchannel_call */
- gpr_atm subchannel_call;
+ /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest
+ bit is 0), or a pointer to an error (if the lowest bit is 1) */
+ gpr_atm subchannel_call_or_error;
gpr_arena *arena;
bool pick_pending;
@@ -837,10 +829,43 @@ typedef struct client_channel_call_data {
grpc_closure *original_on_complete;
} call_data;
+typedef struct {
+ grpc_subchannel_call *subchannel_call;
+ grpc_error *error;
+} call_or_error;
+
+static call_or_error get_call_or_error(call_data *p) {
+ gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error);
+ if (c == 0)
+ return (call_or_error){NULL, NULL};
+ else if (c & 1)
+ return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)};
+ else
+ return (call_or_error){(grpc_subchannel_call *)c, NULL};
+}
+
+static bool set_call_or_error(call_data *p, call_or_error coe) {
+ // this should always be under a lock
+ call_or_error existing = get_call_or_error(p);
+ if (existing.error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_UNREF(coe.error);
+ return false;
+ }
+ GPR_ASSERT(existing.subchannel_call == NULL);
+ if (coe.error != GRPC_ERROR_NONE) {
+ GPR_ASSERT(coe.subchannel_call == NULL);
+ gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error);
+ } else {
+ GPR_ASSERT(coe.subchannel_call != NULL);
+ gpr_atm_rel_store(&p->subchannel_call_or_error,
+ (gpr_atm)coe.subchannel_call);
+ }
+ return true;
+}
+
grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
grpc_call_element *call_elem) {
- grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data);
- return scc == CANCELLED_CALL ? NULL : scc;
+ return get_call_or_error(call_elem->call_data).subchannel_call;
}
static void add_waiting_locked(call_data *calld,
@@ -872,18 +897,18 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
return;
}
- grpc_subchannel_call *call = GET_CALL(calld);
+ call_or_error call = get_call_or_error(calld);
grpc_transport_stream_op_batch **ops = calld->waiting_ops;
size_t nops = calld->waiting_ops_count;
- if (call == CANCELLED_CALL) {
- fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
+ if (call.error != GRPC_ERROR_NONE) {
+ fail_locked(exec_ctx, calld, GRPC_ERROR_REF(call.error));
return;
}
calld->waiting_ops = NULL;
calld->waiting_ops_count = 0;
calld->waiting_ops_capacity = 0;
for (size_t i = 0; i < nops; i++) {
- grpc_subchannel_call_process_op(exec_ctx, call, ops[i]);
+ grpc_subchannel_call_process_op(exec_ctx, call.subchannel_call, ops[i]);
}
gpr_free(ops);
}
@@ -944,19 +969,23 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
calld->pick_pending = false;
grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
+ call_or_error coe = get_call_or_error(calld);
if (calld->connected_subchannel == NULL) {
- gpr_atm_no_barrier_store(&calld->subchannel_call, (gpr_atm)CANCELLED_CALL);
- fail_locked(exec_ctx, calld,
- error == GRPC_ERROR_NONE
- ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Call dropped by load balancing policy")
- : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Failed to create subchannel", &error, 1));
- } else if (GET_CALL(calld) == CANCELLED_CALL) {
+ grpc_error *failure =
+ error == GRPC_ERROR_NONE
+ ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Call dropped by load balancing policy")
+ : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Failed to create subchannel", &error, 1);
+ set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
+ fail_locked(exec_ctx, calld, failure);
+ } else if (coe.error != GRPC_ERROR_NONE) {
/* already cancelled before subchannel became ready */
+ grpc_error *child_errors[] = {error, coe.error};
grpc_error *cancellation_error =
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Cancelled before creating subchannel", &error, 1);
+ "Cancelled before creating subchannel", child_errors,
+ GPR_ARRAY_SIZE(child_errors));
/* if due to deadline, attach the deadline exceeded status to the error */
if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
cancellation_error =
@@ -976,8 +1005,8 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
.context = calld->subchannel_call_context};
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
- gpr_atm_rel_store(&calld->subchannel_call,
- (gpr_atm)(uintptr_t)subchannel_call);
+ GPR_ASSERT(set_call_or_error(
+ calld, (call_or_error){.subchannel_call = subchannel_call}));
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
fail_locked(exec_ctx, calld, new_error);
@@ -990,8 +1019,9 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
call_data *calld = elem->call_data;
- grpc_subchannel_call *subchannel_call = GET_CALL(calld);
- if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) {
+ grpc_subchannel_call *subchannel_call =
+ get_call_or_error(calld).subchannel_call;
+ if (subchannel_call == NULL) {
return NULL;
} else {
return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
@@ -1133,7 +1163,7 @@ static bool pick_subchannel_locked(
cpa->on_ready = on_ready;
cpa->elem = elem;
grpc_closure_init(&cpa->closure, continue_picking_locked, cpa,
- grpc_combiner_scheduler(chand->combiner, true));
+ grpc_combiner_scheduler(chand->combiner));
grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
GRPC_ERROR_NONE);
} else {
@@ -1150,58 +1180,45 @@ static void start_transport_stream_op_batch_locked_inner(
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- grpc_subchannel_call *call;
/* need to recheck that another thread hasn't set the call */
- call = GET_CALL(calld);
- if (call == CANCELLED_CALL) {
+ call_or_error coe = get_call_or_error(calld);
+ if (coe.error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
+ exec_ctx, op, GRPC_ERROR_REF(coe.error));
/* early out */
return;
}
- if (call != NULL) {
- grpc_subchannel_call_process_op(exec_ctx, call, op);
+ if (coe.subchannel_call != NULL) {
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
/* early out */
return;
}
/* if this is a cancellation, then we can raise our cancelled flag */
if (op->cancel_stream) {
- if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
- (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
- /* recurse to retry */
- start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
- /* early out */
- return;
+ grpc_error *error = op->payload->cancel_stream.cancel_error;
+ /* Stash a copy of cancel_error in our call data, so that we can use
+ it for subsequent operations. This ensures that if the call is
+ cancelled before any ops are passed down (e.g., if the deadline
+ is in the past when the call starts), we can return the right
+ error to the caller when the first op does get passed down. */
+ set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
+ if (calld->pick_pending) {
+ cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
} else {
- /* Stash a copy of cancel_error in our call data, so that we can use
- it for subsequent operations. This ensures that if the call is
- cancelled before any ops are passed down (e.g., if the deadline
- is in the past when the call starts), we can return the right
- error to the caller when the first op does get passed down. */
- calld->cancel_error =
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
- if (calld->pick_pending) {
- cancel_pick_locked(
- exec_ctx, elem,
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
- } else {
- fail_locked(exec_ctx, calld,
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
- }
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op,
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
- /* early out */
- return;
+ fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
}
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op,
+ GRPC_ERROR_REF(error));
+ /* early out */
+ return;
}
/* if we don't have a subchannel, try to get one */
if (!calld->pick_pending && calld->connected_subchannel == NULL &&
op->send_initial_metadata) {
calld->pick_pending = true;
grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem,
- grpc_combiner_scheduler(chand->combiner, true));
+ grpc_combiner_scheduler(chand->combiner));
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data's interested_parties, so
@@ -1215,12 +1232,13 @@ static void start_transport_stream_op_batch_locked_inner(
calld->pick_pending = false;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
if (calld->connected_subchannel == NULL) {
- gpr_atm_no_barrier_store(&calld->subchannel_call,
- (gpr_atm)CANCELLED_CALL);
grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy");
+ set_call_or_error(calld,
+ (call_or_error){.error = GRPC_ERROR_REF(error)});
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(error));
return; // Early out.
}
} else {
@@ -1240,8 +1258,8 @@ static void start_transport_stream_op_batch_locked_inner(
.context = calld->subchannel_call_context};
grpc_error *error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
- gpr_atm_rel_store(&calld->subchannel_call,
- (gpr_atm)(uintptr_t)subchannel_call);
+ GPR_ASSERT(set_call_or_error(
+ calld, (call_or_error){.subchannel_call = subchannel_call}));
if (error != GRPC_ERROR_NONE) {
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
@@ -1320,17 +1338,17 @@ static void cc_start_transport_stream_op_batch(
op);
}
/* try to (atomically) get the call */
- grpc_subchannel_call *call = GET_CALL(calld);
+ call_or_error coe = get_call_or_error(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
- if (call == CANCELLED_CALL) {
+ if (coe.error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
+ exec_ctx, op, GRPC_ERROR_REF(coe.error));
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
/* early out */
return;
}
- if (call != NULL) {
- grpc_subchannel_call_process_op(exec_ctx, call, op);
+ if (coe.subchannel_call != NULL) {
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
/* early out */
return;
@@ -1339,10 +1357,9 @@ static void cc_start_transport_stream_op_batch(
GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
op->handler_private.extra_arg = elem;
grpc_closure_sched(
- exec_ctx,
- grpc_closure_init(&op->handler_private.closure,
- start_transport_stream_op_batch_locked, op,
- grpc_combiner_scheduler(chand->combiner, false)),
+ exec_ctx, grpc_closure_init(&op->handler_private.closure,
+ start_transport_stream_op_batch_locked, op,
+ grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}
@@ -1379,12 +1396,14 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
if (calld->method_params != NULL) {
method_parameters_unref(calld->method_params);
}
- GRPC_ERROR_UNREF(calld->cancel_error);
- grpc_subchannel_call *call = GET_CALL(calld);
- if (call != NULL && call != CANCELLED_CALL) {
- grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure);
+ call_or_error coe = get_call_or_error(calld);
+ GRPC_ERROR_UNREF(coe.error);
+ if (coe.subchannel_call != NULL) {
+ grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call,
+ then_schedule_closure);
then_schedule_closure = NULL;
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call,
+ "client_channel_destroy_call");
}
GPR_ASSERT(!calld->pick_pending);
GPR_ASSERT(calld->waiting_ops_count == 0);
@@ -1454,9 +1473,8 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
grpc_closure_sched(
- exec_ctx,
- grpc_closure_create(try_to_connect_locked, chand,
- grpc_combiner_scheduler(chand->combiner, false)),
+ exec_ctx, grpc_closure_create(try_to_connect_locked, chand,
+ grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}
return out;
@@ -1593,6 +1611,6 @@ void grpc_client_channel_watch_connectivity_state(
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w,
- grpc_combiner_scheduler(chand->combiner, true)),
+ grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c
index 7ac2cb1775..0083a8044f 100644
--- a/src/core/ext/filters/client_channel/lb_policy.c
+++ b/src/core/ext/filters/client_channel/lb_policy.c
@@ -89,11 +89,10 @@ void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx,
gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1);
gpr_atm check = 1 << WEAK_REF_BITS;
if ((old_val & mask) == check) {
- grpc_closure_sched(
- exec_ctx,
- grpc_closure_create(shutdown_locked, policy,
- grpc_combiner_scheduler(policy->combiner, false)),
- GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, grpc_closure_create(
+ shutdown_locked, policy,
+ grpc_combiner_scheduler(policy->combiner)),
+ GRPC_ERROR_NONE);
} else {
grpc_lb_policy_weak_unref(exec_ctx,
policy REF_FUNC_PASS_ARGS("strong-unref"));
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index 6eb0a9ef21..a23a56c052 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -756,7 +756,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_zalloc(sizeof(rr_connectivity_data));
grpc_closure_init(&rr_connectivity->on_change,
glb_rr_connectivity_changed_locked, rr_connectivity,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
rr_connectivity->glb_policy = glb_policy;
rr_connectivity->state = rr_state;
@@ -1267,7 +1267,7 @@ static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx,
gpr_time_add(now, glb_policy->client_stats_report_interval);
grpc_closure_init(&glb_policy->client_load_report_closure,
send_client_load_report_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer,
next_client_load_report_time,
&glb_policy->client_load_report_closure, now);
@@ -1295,7 +1295,7 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
op.data.send_message.send_message = glb_policy->client_load_report_payload;
grpc_closure_init(&glb_policy->client_load_report_closure,
client_load_report_done_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_call_error call_error = grpc_call_start_batch_and_execute(
exec_ctx, glb_policy->lb_call, &op, 1,
&glb_policy->client_load_report_closure);
@@ -1401,13 +1401,13 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&glb_policy->lb_on_sent_initial_request,
lb_on_sent_initial_request_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_closure_init(&glb_policy->lb_on_server_status_received,
lb_on_server_status_received_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_closure_init(&glb_policy->lb_on_response_received,
lb_on_response_received_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
gpr_backoff_init(&glb_policy->lb_call_backoff_state,
GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
@@ -1708,9 +1708,9 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
}
}
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
- grpc_closure_init(
- &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked,
- glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_closure_init(&glb_policy->lb_on_call_retry,
+ lb_call_on_retry_timer_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
glb_policy->retry_timer_active = true;
grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
&glb_policy->lb_on_call_retry, now);
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index b6f86255df..691c07cb7f 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -670,7 +670,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p,
- grpc_combiner_scheduler(args->combiner, false));
+ grpc_combiner_scheduler(args->combiner));
return &p->base;
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index c9758eef88..2e9f029fc8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -764,7 +764,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
sd->subchannel = subchannel;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed_locked, sd,
- grpc_combiner_scheduler(args->combiner, false));
+ grpc_combiner_scheduler(args->combiner));
/* use some sentinel value outside of the range of
* grpc_connectivity_state to signal an undefined previous state. We
* won't be referring to this value again and it'll be overwritten after
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
index 578e8d697f..861b809b19 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
@@ -284,10 +284,10 @@ static grpc_resolver *dns_ares_create(grpc_exec_ctx *exec_ctx,
GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
grpc_closure_init(&r->dns_ares_on_retry_timer_locked,
dns_ares_on_retry_timer_locked, r,
- grpc_combiner_scheduler(r->base.combiner, false));
+ grpc_combiner_scheduler(r->base.combiner));
grpc_closure_init(&r->dns_ares_on_resolved_locked,
dns_ares_on_resolved_locked, r,
- grpc_combiner_scheduler(r->base.combiner, false));
+ grpc_combiner_scheduler(r->base.combiner));
return &r->base;
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
index ebe6a07215..f6d7176f5d 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
@@ -194,7 +194,7 @@ static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_log(GPR_DEBUG, "retrying immediately");
}
grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r,
- grpc_combiner_scheduler(r->base.combiner, false));
+ grpc_combiner_scheduler(r->base.combiner));
grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now);
}
if (r->resolved_result != NULL) {
@@ -216,7 +216,7 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
grpc_resolve_address(
exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties,
grpc_closure_create(dns_on_resolved_locked, r,
- grpc_combiner_scheduler(r->base.combiner, false)),
+ grpc_combiner_scheduler(r->base.combiner)),
&r->addresses);
}
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
index cb9b1f07c2..a93ba3fa96 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
@@ -173,10 +173,9 @@ void grpc_fake_resolver_response_generator_set_response(
GPR_ASSERT(generator->resolver != NULL);
generator->next_response = grpc_channel_args_copy(next_response);
grpc_closure_sched(
- exec_ctx,
- grpc_closure_create(
- set_response_cb, generator,
- grpc_combiner_scheduler(generator->resolver->base.combiner, false)),
+ exec_ctx, grpc_closure_create(set_response_cb, generator,
+ grpc_combiner_scheduler(
+ generator->resolver->base.combiner)),
GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index f3268bcfca..f112ce1b76 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -50,7 +50,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
@@ -267,7 +266,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->ep = ep;
/* one ref is for destroy */
gpr_ref_init(&t->refs, 1);
- t->combiner = grpc_combiner_create(grpc_endpoint_get_workqueue(ep));
+ t->combiner = grpc_combiner_create();
t->peer_string = grpc_endpoint_get_peer(ep);
t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2;
@@ -288,29 +287,29 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->write_action, write_action, t,
grpc_schedule_on_exec_ctx);
grpc_closure_init(&t->read_action_locked, read_action_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->destructive_reclaimer_locked,
destructive_reclaimer_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->retry_initiate_ping_locked, retry_initiate_ping_locked,
- t, grpc_combiner_scheduler(t->combiner, false));
+ t, grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
- t, grpc_combiner_scheduler(t->combiner, false));
+ t, grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->start_keepalive_ping_locked,
start_keepalive_ping_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->finish_keepalive_ping_locked,
finish_keepalive_ping_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->keepalive_watchdog_fired_locked,
keepalive_watchdog_fired_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string);
t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC);
@@ -353,7 +352,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
GRPC_CHTTP2_CLIENT_CONNECT_STRING));
- grpc_chttp2_initiate_write(exec_ctx, t, false, "initial_write");
+ grpc_chttp2_initiate_write(exec_ctx, t, "initial_write");
}
/* configure http2 the way we like it */
@@ -565,7 +564,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
}
- grpc_chttp2_initiate_write(exec_ctx, t, false, "init");
+ grpc_chttp2_initiate_write(exec_ctx, t, "init");
post_benign_reclaimer(exec_ctx, t);
}
@@ -583,9 +582,9 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_closure_sched(exec_ctx, grpc_closure_create(
- destroy_transport_locked, t,
- grpc_combiner_scheduler(t->combiner, false)),
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_create(destroy_transport_locked, t,
+ grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
}
@@ -678,7 +677,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_slice_buffer_init(&s->frame_storage);
s->pending_byte_stream = false;
grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
@@ -762,7 +761,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->destroy_stream_arg = then_schedule_closure;
grpc_closure_sched(
exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s,
- grpc_combiner_scheduler(t->combiner, false)),
+ grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("destroy_stream", 0);
}
@@ -800,8 +799,6 @@ static const char *write_state_name(grpc_chttp2_write_state st) {
return "WRITING";
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
return "WRITING+MORE";
- case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
- return "WRITING+MORE+COVERED";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
@@ -824,8 +821,7 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- bool covered_by_poller, const char *reason) {
+ grpc_chttp2_transport *t, const char *reason) {
GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0);
switch (t->write_state) {
@@ -834,28 +830,16 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_closure_sched(
exec_ctx,
- grpc_closure_init(
- &t->write_action_begin_locked, write_action_begin_locked, t,
- grpc_combiner_finally_scheduler(t->combiner, covered_by_poller)),
+ grpc_closure_init(&t->write_action_begin_locked,
+ write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner)),
GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
- set_write_state(
- exec_ctx, t,
- covered_by_poller
- ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER
- : GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
- reason);
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
+ reason);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
- if (covered_by_poller) {
- set_write_state(
- exec_ctx, t,
- GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER,
- reason);
- }
- break;
- case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
break;
}
GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
@@ -871,10 +855,10 @@ void grpc_chttp2_become_writable(
case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK:
break;
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED:
- grpc_chttp2_initiate_write(exec_ctx, t, true, reason);
+ grpc_chttp2_initiate_write(exec_ctx, t, reason);
break;
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED:
- grpc_chttp2_initiate_write(exec_ctx, t, false, reason);
+ grpc_chttp2_initiate_write(exec_ctx, t, reason);
break;
}
}
@@ -911,7 +895,7 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) {
grpc_endpoint_write(
exec_ctx, t->ep, &t->outbuf,
grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t,
- grpc_combiner_scheduler(t->combiner, false)));
+ grpc_combiner_scheduler(t->combiner)));
GPR_TIMER_END("write_action", 0);
}
@@ -947,21 +931,9 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_closure_run(
exec_ctx,
- grpc_closure_init(
- &t->write_action_begin_locked, write_action_begin_locked, t,
- grpc_combiner_finally_scheduler(t->combiner, false)),
- GRPC_ERROR_NONE);
- break;
- case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
- GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
- set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
- "continue writing [covered]");
- GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_closure_run(
- exec_ctx,
grpc_closure_init(&t->write_action_begin_locked,
write_action_begin_locked, t,
- grpc_combiner_finally_scheduler(t->combiner, true)),
+ grpc_combiner_finally_scheduler(t->combiner)),
GRPC_ERROR_NONE);
break;
}
@@ -984,7 +956,7 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
t->dirtied_local_settings = 1;
- grpc_chttp2_initiate_write(exec_ctx, t, false, "push_setting");
+ grpc_chttp2_initiate_write(exec_ctx, t, "push_setting");
}
}
@@ -1380,7 +1352,6 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s->next_message_end_offset = s->flow_controlled_bytes_written +
(int64_t)s->flow_controlled_buffer.length +
(int64_t)len;
- s->complete_fetch_covered_by_poller = op->covered_by_poller;
if (flags & GRPC_WRITE_BUFFER_HINT) {
s->next_message_end_offset -= t->write_buffer_size;
s->write_buffering = true;
@@ -1502,9 +1473,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
grpc_closure_sched(
exec_ctx,
- grpc_closure_init(
- &op->handler_private.closure, perform_stream_op_locked, op,
- grpc_combiner_scheduler(t->combiner, op->covered_by_poller)),
+ grpc_closure_init(&op->handler_private.closure, perform_stream_op_locked,
+ op, grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("perform_stream_op", 0);
}
@@ -1531,7 +1501,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_NONE);
if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
GRPC_ERROR_NONE)) {
- grpc_chttp2_initiate_write(exec_ctx, t, false, "send_ping");
+ grpc_chttp2_initiate_write(exec_ctx, t, "send_ping");
}
}
@@ -1539,7 +1509,7 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = tp;
t->ping_state.is_delayed_ping_timer_set = false;
- grpc_chttp2_initiate_write(exec_ctx, t, false, "retry_send_ping");
+ grpc_chttp2_initiate_write(exec_ctx, t, "retry_send_ping");
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1554,7 +1524,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
- grpc_chttp2_initiate_write(exec_ctx, t, false, "continue_pings");
+ grpc_chttp2_initiate_write(exec_ctx, t, "continue_pings");
}
}
@@ -1567,7 +1537,7 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&slice, &http_error);
grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error,
grpc_slice_ref_internal(slice), &t->qbuf);
- grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent");
+ grpc_chttp2_initiate_write(exec_ctx, t, "goaway_sent");
GRPC_ERROR_UNREF(error);
}
@@ -1593,12 +1563,6 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = op->handler_private.extra_arg;
grpc_error *close_transport = op->disconnect_with_error;
- if (op->on_connectivity_state_change != NULL) {
- grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
- op->on_connectivity_state_change);
- }
-
if (op->goaway_error) {
send_goaway(exec_ctx, t, op->goaway_error);
}
@@ -1622,6 +1586,12 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
op->send_ping);
}
+ if (op->on_connectivity_state_change != NULL) {
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
+ }
+
if (close_transport != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, close_transport);
}
@@ -1638,11 +1608,11 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_free(msg);
op->handler_private.extra_arg = gt;
GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
- grpc_closure_sched(
- exec_ctx, grpc_closure_init(&op->handler_private.closure,
- perform_transport_op_locked, op,
- grpc_combiner_scheduler(t->combiner, false)),
- GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_init(&op->handler_private.closure,
+ perform_transport_op_locked, op,
+ grpc_combiner_scheduler(t->combiner)),
+ GRPC_ERROR_NONE);
}
/*******************************************************************************
@@ -1797,7 +1767,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error,
&s->stats.outgoing));
- grpc_chttp2_initiate_write(exec_ctx, t, false, "rst_stream");
+ grpc_chttp2_initiate_write(exec_ctx, t, "rst_stream");
}
}
if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
@@ -2110,7 +2080,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&s->stats.outgoing));
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error);
- grpc_chttp2_initiate_write(exec_ctx, t, false, "close_from_api");
+ grpc_chttp2_initiate_write(exec_ctx, t, "close_from_api");
}
typedef struct {
@@ -2641,9 +2611,9 @@ static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
bs->next_action.on_complete = on_complete;
grpc_closure_sched(
exec_ctx,
- grpc_closure_init(
- &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner, false)),
+ grpc_closure_init(&bs->next_action.closure,
+ incoming_byte_stream_next_locked, bs,
+ grpc_combiner_scheduler(bs->transport->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_next", 0);
return false;
@@ -2698,10 +2668,9 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_closure_sched(
- exec_ctx,
- grpc_closure_init(
- &bs->destroy_action, incoming_byte_stream_destroy_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner, false)),
+ exec_ctx, grpc_closure_init(
+ &bs->destroy_action, incoming_byte_stream_destroy_locked,
+ bs, grpc_combiner_scheduler(bs->transport->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_destroy", 0);
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c
index f09ca60739..c6269e29f9 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.c
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.c
@@ -132,7 +132,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks));
}
t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes;
- grpc_chttp2_initiate_write(exec_ctx, t, false, "ping response");
+ grpc_chttp2_initiate_write(exec_ctx, t, "ping response");
}
}
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c
index 8ed72dddca..579eb6d0c0 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.c
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c
@@ -124,8 +124,7 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
received_update);
bool is_zero = t->outgoing_window <= 0;
if (was_zero && !is_zero) {
- grpc_chttp2_initiate_write(exec_ctx, t, false,
- "new_global_flow_control");
+ grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control");
}
}
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index bb98bc4a79..0576cd1e79 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -1664,7 +1664,7 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing));
- grpc_chttp2_initiate_write(exec_ctx, t, false, "force_rst_stream");
+ grpc_chttp2_initiate_write(exec_ctx, t, "force_rst_stream");
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, GRPC_ERROR_NONE);
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst");
@@ -1712,9 +1712,9 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx,
and can avoid the extra write */
GRPC_CHTTP2_STREAM_REF(s, "final_rst");
grpc_closure_sched(
- exec_ctx, grpc_closure_create(force_client_rst_stream, s,
- grpc_combiner_finally_scheduler(
- t->combiner, false)),
+ exec_ctx,
+ grpc_closure_create(force_client_rst_stream, s,
+ grpc_combiner_finally_scheduler(t->combiner)),
GRPC_ERROR_NONE);
}
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 8d66e396ee..211d6aacf1 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -73,7 +73,6 @@ typedef enum {
GRPC_CHTTP2_WRITE_STATE_IDLE,
GRPC_CHTTP2_WRITE_STATE_WRITING,
GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
- GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER,
} grpc_chttp2_write_state;
typedef enum {
@@ -458,7 +457,6 @@ struct grpc_chttp2_stream {
grpc_slice fetching_slice;
int64_t next_message_end_offset;
int64_t flow_controlled_bytes_written;
- bool complete_fetch_covered_by_poller;
grpc_closure complete_fetch_locked;
grpc_closure *fetching_send_message_finished;
@@ -549,8 +547,7 @@ struct grpc_chttp2_stream {
The actual call chain is documented in the implementation of this function.
*/
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- bool covered_by_poller, const char *reason);
+ grpc_chttp2_transport *t, const char *reason);
typedef enum {
GRPC_CHTTP2_NOTHING_TO_WRITE,
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 1a676e2259..0b2891d546 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -433,7 +433,7 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
incoming_frame_size);
if (t->incoming_window <= target_incoming_window / 2) {
- grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control");
+ grpc_chttp2_initiate_write(exec_ctx, t, "flow_control");
}
return GRPC_ERROR_NONE;
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 5be1092946..048fe2ed32 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -206,8 +206,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) &&
stream_ref_if_not_destroyed(&s->refcount->refs)) {
- grpc_chttp2_initiate_write(exec_ctx, t, false,
- "transport.read_flow_control");
+ grpc_chttp2_initiate_write(exec_ctx, t, "transport.read_flow_control");
}
}
}
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 863f22c614..075f4b2092 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -39,7 +39,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include "src/core/lib/iomgr/workqueue.h"
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/profiling/timers.h"
grpc_tracer_flag grpc_combiner_trace = GRPC_TRACER_INITIALIZER(false);
@@ -56,93 +56,47 @@ grpc_tracer_flag grpc_combiner_trace = GRPC_TRACER_INITIALIZER(false);
struct grpc_combiner {
grpc_combiner *next_combiner_on_this_exec_ctx;
- grpc_workqueue *optional_workqueue;
- grpc_closure_scheduler uncovered_scheduler;
- grpc_closure_scheduler covered_scheduler;
- grpc_closure_scheduler uncovered_finally_scheduler;
- grpc_closure_scheduler covered_finally_scheduler;
+ grpc_closure_scheduler scheduler;
+ grpc_closure_scheduler finally_scheduler;
gpr_mpscq queue;
+ // either:
+ // a pointer to the initiating exec ctx if that is the only exec_ctx that has
+ // ever queued to this combiner, or NULL. If this is non-null, it's not
+ // dereferencable (since the initiating exec_ctx may have gone out of scope)
+ gpr_atm initiating_exec_ctx_or_null;
// state is:
// lower bit - zero if orphaned (STATE_UNORPHANED)
// other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
gpr_atm state;
- // number of elements in the list that are covered by a poller: if >0, we can
- // offload safely
- gpr_atm elements_covered_by_poller;
bool time_to_execute_final_list;
- bool final_list_covered_by_poller;
grpc_closure_list final_list;
grpc_closure offload;
gpr_refcount refs;
};
-static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx,
- grpc_closure *closure, grpc_error *error);
-static void combiner_exec_covered(grpc_exec_ctx *exec_ctx,
+static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
+static void combiner_finally_exec(grpc_exec_ctx *exec_ctx,
grpc_closure *closure, grpc_error *error);
-static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx,
- grpc_closure *closure,
- grpc_error *error);
-static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx,
- grpc_closure *closure,
- grpc_error *error);
-
-static const grpc_closure_scheduler_vtable scheduler_uncovered = {
- combiner_exec_uncovered, combiner_exec_uncovered,
- "combiner:immediately:uncovered"};
-static const grpc_closure_scheduler_vtable scheduler_covered = {
- combiner_exec_covered, combiner_exec_covered,
- "combiner:immediately:covered"};
-static const grpc_closure_scheduler_vtable finally_scheduler_uncovered = {
- combiner_finally_exec_uncovered, combiner_finally_exec_uncovered,
- "combiner:finally:uncovered"};
-static const grpc_closure_scheduler_vtable finally_scheduler_covered = {
- combiner_finally_exec_covered, combiner_finally_exec_covered,
- "combiner:finally:covered"};
-static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
-
-typedef struct {
- grpc_error *error;
- bool covered_by_poller;
-} error_data;
-
-static uintptr_t pack_error_data(error_data d) {
- return ((uintptr_t)d.error) | (d.covered_by_poller ? 1 : 0);
-}
-
-static error_data unpack_error_data(uintptr_t p) {
- return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1};
-}
+static const grpc_closure_scheduler_vtable scheduler = {
+ combiner_exec, combiner_exec, "combiner:immediately"};
+static const grpc_closure_scheduler_vtable finally_scheduler = {
+ combiner_finally_exec, combiner_finally_exec, "combiner:finally"};
-static bool is_covered_by_poller(grpc_combiner *lock) {
- return lock->final_list_covered_by_poller ||
- gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0;
-}
-
-#define IS_COVERED_BY_POLLER_FMT "(final=%d elems=%" PRIdPTR ")->%d"
-#define IS_COVERED_BY_POLLER_ARGS(lock) \
- (lock)->final_list_covered_by_poller, \
- gpr_atm_acq_load(&(lock)->elements_covered_by_poller), \
- is_covered_by_poller((lock))
+static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
-grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
+grpc_combiner *grpc_combiner_create(void) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock));
gpr_ref_init(&lock->refs, 1);
lock->next_combiner_on_this_exec_ctx = NULL;
lock->time_to_execute_final_list = false;
- lock->optional_workqueue = optional_workqueue;
- lock->final_list_covered_by_poller = false;
- lock->uncovered_scheduler.vtable = &scheduler_uncovered;
- lock->covered_scheduler.vtable = &scheduler_covered;
- lock->uncovered_finally_scheduler.vtable = &finally_scheduler_uncovered;
- lock->covered_finally_scheduler.vtable = &finally_scheduler_covered;
+ lock->scheduler.vtable = &scheduler;
+ lock->finally_scheduler.vtable = &finally_scheduler;
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
- gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0);
gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list);
- grpc_closure_init(&lock->offload, offload, lock,
- grpc_workqueue_scheduler(lock->optional_workqueue));
+ grpc_closure_init(&lock->offload, offload, lock, grpc_executor_scheduler);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock;
}
@@ -151,7 +105,6 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p really_destroy", lock));
GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0);
gpr_mpscq_destroy(&lock->queue);
- GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner");
gpr_free(lock);
}
@@ -208,48 +161,40 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx,
}
}
-static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *cl, grpc_error *error,
- bool covered_by_poller) {
+#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \
+ ((grpc_combiner *)(((char *)((closure)->scheduler)) - \
+ offsetof(grpc_combiner, scheduler_name)))
+
+static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
+ grpc_error *error) {
GPR_TIMER_BEGIN("combiner.execute", 0);
+ grpc_combiner *lock = COMBINER_FROM_CLOSURE_SCHEDULER(cl, scheduler);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
- GRPC_COMBINER_TRACE(gpr_log(
- GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock,
- cl, covered_by_poller, last));
- GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
- assert(cl->cb);
- cl->error_data.scratch =
- pack_error_data((error_data){error, covered_by_poller});
- if (covered_by_poller) {
- gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, 1);
- }
- gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
+ GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
+ "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR,
+ lock, cl, last));
if (last == 1) {
+ gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null,
+ (gpr_atm)exec_ctx);
// first element on this list: add it to the list of combiner locks
// executing within this exec_ctx
push_last_on_exec_ctx(exec_ctx, lock);
+ } else {
+ // there may be a race with setting here: if that happens, we may delay
+ // offload for one or two actions, and that's fine
+ gpr_atm initiator =
+ gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null);
+ if (initiator != 0 && initiator != (gpr_atm)exec_ctx) {
+ gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0);
+ }
}
+ GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
+ assert(cl->cb);
+ cl->error_data.error = error;
+ gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
GPR_TIMER_END("combiner.execute", 0);
}
-#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \
- ((grpc_combiner *)(((char *)((closure)->scheduler)) - \
- offsetof(grpc_combiner, scheduler_name)))
-
-static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
- grpc_error *error) {
- combiner_exec(exec_ctx,
- COMBINER_FROM_CLOSURE_SCHEDULER(cl, uncovered_scheduler), cl,
- error, false);
-}
-
-static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
- grpc_error *error) {
- combiner_exec(exec_ctx,
- COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_scheduler), cl,
- error, true);
-}
-
static void move_next(grpc_exec_ctx *exec_ctx) {
exec_ctx->active_combiner =
exec_ctx->active_combiner->next_combiner_on_this_exec_ctx;
@@ -265,8 +210,7 @@ static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
move_next(exec_ctx);
- GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock,
- lock->optional_workqueue));
+ GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload", lock));
grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE);
}
@@ -278,18 +222,20 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
return false;
}
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG,
- "C:%p grpc_combiner_continue_exec_ctx workqueue=%p "
- "is_covered_by_poller=" IS_COVERED_BY_POLLER_FMT
- " exec_ctx_ready_to_finish=%d "
- "time_to_execute_final_list=%d",
- lock, lock->optional_workqueue, IS_COVERED_BY_POLLER_ARGS(lock),
- grpc_exec_ctx_ready_to_finish(exec_ctx),
- lock->time_to_execute_final_list));
-
- if (lock->optional_workqueue != NULL && is_covered_by_poller(lock) &&
- grpc_exec_ctx_ready_to_finish(exec_ctx)) {
+ bool contended =
+ gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null) == 0;
+
+ GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
+ "C:%p grpc_combiner_continue_exec_ctx "
+ "contended=%d "
+ "exec_ctx_ready_to_finish=%d "
+ "time_to_execute_final_list=%d",
+ lock, contended,
+ grpc_exec_ctx_ready_to_finish(exec_ctx),
+ lock->time_to_execute_final_list));
+
+ if (contended && grpc_exec_ctx_ready_to_finish(exec_ctx) &&
+ grpc_executor_is_threaded()) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
// this execution context wants to move on, and we have a workqueue (and
// so can help the execution context out): schedule remaining work to be
@@ -310,29 +256,23 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
// queue is in an inconsistent state: use this as a cue that we should
// go off and do something else for a while (and come back later)
GPR_TIMER_MARK("delay_busy", 0);
- if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) {
- queue_offload(exec_ctx, lock);
- }
+ queue_offload(exec_ctx, lock);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
}
GPR_TIMER_BEGIN("combiner.exec1", 0);
grpc_closure *cl = (grpc_closure *)n;
- error_data err = unpack_error_data(cl->error_data.scratch);
+ grpc_error *cl_err = cl->error_data.error;
#ifndef NDEBUG
cl->scheduled = false;
#endif
- cl->cb(exec_ctx, cl->cb_arg, err.error);
- if (err.covered_by_poller) {
- gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, -1);
- }
- GRPC_ERROR_UNREF(err.error);
+ cl->cb(exec_ctx, cl->cb_arg, cl_err);
+ GRPC_ERROR_UNREF(cl_err);
GPR_TIMER_END("combiner.exec1", 0);
} else {
grpc_closure *c = lock->final_list.head;
GPR_ASSERT(c != NULL);
grpc_closure_list_init(&lock->final_list);
- lock->final_list_covered_by_poller = false;
int loops = 0;
while (c != NULL) {
GPR_TIMER_BEGIN("combiner.exec_1final", 0);
@@ -398,20 +338,20 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
grpc_error *error);
-static void combiner_execute_finally(grpc_exec_ctx *exec_ctx,
- grpc_combiner *lock, grpc_closure *closure,
- grpc_error *error,
- bool covered_by_poller) {
- GRPC_COMBINER_TRACE(gpr_log(
- GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock,
- closure, exec_ctx->active_combiner, covered_by_poller));
+static void combiner_finally_exec(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error) {
+ grpc_combiner *lock =
+ COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler);
+ GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
+ "C:%p grpc_combiner_execute_finally c=%p; ac=%p",
+ lock, closure, exec_ctx->active_combiner));
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
- grpc_closure_sched(
- exec_ctx, grpc_closure_create(enqueue_finally, closure,
- grpc_combiner_scheduler(lock, false)),
- error);
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_create(enqueue_finally, closure,
+ grpc_combiner_scheduler(lock)),
+ error);
GPR_TIMER_END("combiner.execute_finally", 0);
return;
}
@@ -419,42 +359,20 @@ static void combiner_execute_finally(grpc_exec_ctx *exec_ctx,
if (grpc_closure_list_empty(lock->final_list)) {
gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
}
- if (covered_by_poller) {
- lock->final_list_covered_by_poller = true;
- }
grpc_closure_list_append(&lock->final_list, closure, error);
GPR_TIMER_END("combiner.execute_finally", 0);
}
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
grpc_error *error) {
- combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
- GRPC_ERROR_REF(error), false);
-}
-
-static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx,
- grpc_closure *cl,
- grpc_error *error) {
- combiner_execute_finally(exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(
- cl, uncovered_finally_scheduler),
- cl, error, false);
-}
-
-static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx,
- grpc_closure *cl, grpc_error *error) {
- combiner_execute_finally(
- exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_finally_scheduler),
- cl, error, true);
+ combiner_finally_exec(exec_ctx, closure, GRPC_ERROR_REF(error));
}
-grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner,
- bool covered_by_poller) {
- return covered_by_poller ? &combiner->covered_scheduler
- : &combiner->uncovered_scheduler;
+grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner) {
+ return &combiner->scheduler;
}
grpc_closure_scheduler *grpc_combiner_finally_scheduler(
- grpc_combiner *combiner, bool covered_by_poller) {
- return covered_by_poller ? &combiner->covered_finally_scheduler
- : &combiner->uncovered_finally_scheduler;
+ grpc_combiner *combiner) {
+ return &combiner->finally_scheduler;
}
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index 6ab7a2b26b..5aa8443667 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -48,7 +48,7 @@
// Initialize the lock, with an optional workqueue to shift load to when
// necessary
-grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
+grpc_combiner *grpc_combiner_create(void);
//#define GRPC_COMBINER_REFCOUNT_DEBUG
#ifdef GRPC_COMBINER_REFCOUNT_DEBUG
@@ -71,11 +71,9 @@ grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS);
void grpc_combiner_unref(grpc_exec_ctx *exec_ctx,
grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS);
// Fetch a scheduler to schedule closures against
-grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock,
- bool covered_by_poller);
+grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock);
// Scheduler to execute \a action within the lock just prior to unlocking.
-grpc_closure_scheduler *grpc_combiner_finally_scheduler(grpc_combiner *lock,
- bool covered_by_poller);
+grpc_closure_scheduler *grpc_combiner_finally_scheduler(grpc_combiner *lock);
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c
index bf6e98146a..60b8410a45 100644
--- a/src/core/lib/iomgr/endpoint.c
+++ b/src/core/lib/iomgr/endpoint.c
@@ -69,10 +69,6 @@ char* grpc_endpoint_get_peer(grpc_endpoint* ep) {
int grpc_endpoint_get_fd(grpc_endpoint* ep) { return ep->vtable->get_fd(ep); }
-grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) {
- return ep->vtable->get_workqueue(ep);
-}
-
grpc_resource_user* grpc_endpoint_get_resource_user(grpc_endpoint* ep) {
return ep->vtable->get_resource_user(ep);
}
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index 740357ecc5..f8cee0158b 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -52,7 +52,6 @@ struct grpc_endpoint_vtable {
grpc_slice_buffer *slices, grpc_closure *cb);
void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_slice_buffer *slices, grpc_closure *cb);
- grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep);
void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@@ -78,9 +77,6 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep);
*/
int grpc_endpoint_get_fd(grpc_endpoint *ep);
-/* Retrieve a reference to the workqueue associated with this endpoint */
-grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep);
-
/* Write slices out to the socket.
If the connection is ready for more data after the end of the call, it
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index ad69f808cd..39785f6d56 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -58,7 +58,6 @@
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
@@ -130,7 +129,9 @@ struct grpc_pollset {
* Pollset-set Declarations
*/
-struct grpc_pollset_set {};
+struct grpc_pollset_set {
+ char unused;
+};
/*******************************************************************************
* Common helpers
@@ -283,10 +284,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
- return (grpc_workqueue *)0xb0b51ed;
-}
-
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
@@ -313,8 +310,6 @@ GPR_TLS_DECL(g_current_thread_worker);
static gpr_atm g_active_poller;
static pollset_neighbourhood *g_neighbourhoods;
static size_t g_num_neighbourhoods;
-static gpr_mu g_wq_mu;
-static grpc_closure_list g_wq_items;
/* Return true if first in list */
static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
@@ -363,8 +358,6 @@ static grpc_error *pollset_global_init(void) {
gpr_atm_no_barrier_store(&g_active_poller, 0);
global_wakeup_fd.read_fd = -1;
grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
- gpr_mu_init(&g_wq_mu);
- g_wq_items = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
if (err != GRPC_ERROR_NONE) return err;
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
.data.ptr = &global_wakeup_fd};
@@ -383,7 +376,6 @@ static grpc_error *pollset_global_init(void) {
static void pollset_global_shutdown(void) {
gpr_tls_destroy(&g_current_thread_pollset);
gpr_tls_destroy(&g_current_thread_worker);
- gpr_mu_destroy(&g_wq_mu);
if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
for (size_t i = 0; i < g_num_neighbourhoods; i++) {
gpr_mu_destroy(&g_neighbourhoods[i].mu);
@@ -507,9 +499,6 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
for (int i = 0; i < r; i++) {
void *data_ptr = events[i].data.ptr;
if (data_ptr == &global_wakeup_fd) {
- gpr_mu_lock(&g_wq_mu);
- grpc_closure_list_move(&g_wq_items, &exec_ctx->closure_list);
- gpr_mu_unlock(&g_wq_mu);
append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else {
@@ -792,84 +781,6 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {}
/*******************************************************************************
- * Workqueue Definitions
- */
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {}
-#endif
-
-static void wq_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- // find a neighbourhood to wakeup
- bool scheduled = false;
- size_t initial_neighbourhood = choose_neighbourhood();
- for (size_t i = 0; !scheduled && i < g_num_neighbourhoods; i++) {
- pollset_neighbourhood *neighbourhood =
- &g_neighbourhoods[(initial_neighbourhood + i) % g_num_neighbourhoods];
- if (gpr_mu_trylock(&neighbourhood->mu)) {
- if (neighbourhood->active_root != NULL) {
- grpc_pollset *inspect = neighbourhood->active_root;
- do {
- if (gpr_mu_trylock(&inspect->mu)) {
- if (inspect->root_worker != NULL) {
- grpc_pollset_worker *inspect_worker = inspect->root_worker;
- do {
- if (inspect_worker->kick_state == UNKICKED) {
- inspect_worker->kick_state = KICKED;
- grpc_closure_list_append(
- &inspect_worker->schedule_on_end_work, closure, error);
- if (inspect_worker->initialized_cv) {
- gpr_cv_signal(&inspect_worker->cv);
- }
- scheduled = true;
- }
- inspect_worker = inspect_worker->next;
- } while (!scheduled && inspect_worker != inspect->root_worker);
- }
- gpr_mu_unlock(&inspect->mu);
- }
- inspect = inspect->next;
- } while (!scheduled && inspect != neighbourhood->active_root);
- }
- gpr_mu_unlock(&neighbourhood->mu);
- }
- }
- if (!scheduled) {
- gpr_mu_lock(&g_wq_mu);
- grpc_closure_list_append(&g_wq_items, closure, error);
- gpr_mu_unlock(&g_wq_mu);
- GRPC_LOG_IF_ERROR("workqueue_scheduler",
- grpc_wakeup_fd_wakeup(&global_wakeup_fd));
- }
-}
-
-static const grpc_closure_scheduler_vtable
- singleton_workqueue_scheduler_vtable = {wq_sched, wq_sched,
- "epoll1_workqueue"};
-
-static grpc_closure_scheduler singleton_workqueue_scheduler = {
- &singleton_workqueue_scheduler_vtable};
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
- return &singleton_workqueue_scheduler;
-}
-
-/*******************************************************************************
* Pollset-set Definitions
*/
@@ -920,7 +831,6 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -938,10 +848,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_set_add_fd = pollset_set_add_fd,
.pollset_set_del_fd = pollset_set_del_fd,
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_scheduler = workqueue_scheduler,
-
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
index d23bf6c06c..d7a2d35484 100644
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
@@ -61,7 +61,6 @@
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/env.h"
@@ -184,13 +183,15 @@ static void fd_global_shutdown(void);
* Polling island Declarations
*/
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+//#define PI_REFCOUNT_DEBUG
+
+#ifdef PI_REFCOUNT_DEBUG
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
-#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
+#else /* defined(PI_REFCOUNT_DEBUG) */
#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
@@ -204,8 +205,6 @@ typedef struct worker_node {
/* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island {
- grpc_closure_scheduler workqueue_scheduler;
-
gpr_mu mu;
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
the refcount.
@@ -226,15 +225,6 @@ typedef struct polling_island {
/* Number of threads currently polling on this island */
gpr_atm poller_count;
- /* Mutex guarding the read end of the workqueue (must be held to pop from
- * workqueue_items) */
- gpr_mu workqueue_read_mu;
- /* Queue of closures to be executed */
- gpr_mpscq workqueue_items;
- /* Count of items in workqueue_items */
- gpr_atm workqueue_item_count;
- /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
- grpc_wakeup_fd workqueue_wakeup_fd;
/* The list of workers waiting to do polling on this polling island */
gpr_mu worker_list_mu;
@@ -323,8 +313,6 @@ static __thread polling_island *g_current_thread_polling_island;
/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -337,13 +325,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
-static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
- workqueue_enqueue, workqueue_enqueue, "workqueue"};
-
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+#ifdef PI_REFCOUNT_DEBUG
static void pi_add_ref_dbg(polling_island *pi, const char *reason,
const char *file, int line) {
long old_cnt = gpr_atm_acq_load(&pi->ref_count);
@@ -359,36 +344,6 @@ static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
-
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- if (workqueue != NULL) {
- pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {
- if (workqueue != NULL) {
- pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
- }
-}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- pi_add_ref((polling_island *)workqueue);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- pi_unref(exec_ctx, (polling_island *)workqueue);
- }
-}
#endif
static void pi_add_ref(polling_island *pi) {
@@ -592,17 +547,12 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
*error = GRPC_ERROR_NONE;
pi = gpr_malloc(sizeof(*pi));
- pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
gpr_mu_init(&pi->mu);
pi->fd_cnt = 0;
pi->fd_capacity = 0;
pi->fds = NULL;
pi->epoll_fd = -1;
- gpr_mu_init(&pi->workqueue_read_mu);
- gpr_mpscq_init(&pi->workqueue_items);
- gpr_atm_rel_store(&pi->workqueue_item_count, 0);
-
gpr_atm_rel_store(&pi->ref_count, 0);
gpr_atm_rel_store(&pi->poller_count, 0);
gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
@@ -610,11 +560,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
gpr_mu_init(&pi->worker_list_mu);
worker_node_init(&pi->worker_list_head);
- if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
- err_desc)) {
- goto done;
- }
-
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) {
@@ -622,8 +567,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
goto done;
}
- polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
-
if (initial_fd != NULL) {
polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
}
@@ -642,11 +585,7 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
if (pi->epoll_fd >= 0) {
close(pi->epoll_fd);
}
- GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
- gpr_mu_destroy(&pi->workqueue_read_mu);
- gpr_mpscq_destroy(&pi->workqueue_items);
gpr_mu_destroy(&pi->mu);
- grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
gpr_mu_destroy(&pi->worker_list_mu);
GPR_ASSERT(is_worker_node_detached(&pi->worker_list_head));
@@ -794,45 +733,6 @@ static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
}
}
-static void workqueue_maybe_wakeup(polling_island *pi) {
- /* If this thread is the current poller, then it may be that it's about to
- decrement the current poller count, so we need to look past this thread */
- bool is_current_poller = (g_current_thread_polling_island == pi);
- gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
- gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
- /* Only issue a wakeup if it's likely that some poller could come in and take
- it right now. Note that since we do an anticipatory mpscq_pop every poll
- loop, it's ok if we miss the wakeup here, as we'll get the work item when
- the next poller enters anyway. */
- if (current_pollers > min_current_pollers_for_wakeup) {
- GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
- grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
- }
-}
-
-static void workqueue_move_items_to_parent(polling_island *q) {
- polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
- if (p == NULL) {
- return;
- }
- gpr_mu_lock(&q->workqueue_read_mu);
- int num_added = 0;
- while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
- if (n != NULL) {
- gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
- gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
- gpr_mpscq_push(&p->workqueue_items, n);
- num_added++;
- }
- }
- gpr_mu_unlock(&q->workqueue_read_mu);
- if (num_added > 0) {
- workqueue_maybe_wakeup(p);
- }
- workqueue_move_items_to_parent(p);
-}
-
static polling_island *polling_island_merge(polling_island *p,
polling_island *q,
grpc_error **error) {
@@ -857,8 +757,6 @@ static polling_island *polling_island_merge(polling_island *p,
/* Add the 'merged_to' link from p --> q */
gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
-
- workqueue_move_items_to_parent(p);
}
/* else if p == q, nothing needs to be done */
@@ -869,32 +767,6 @@ static polling_island *polling_island_merge(polling_island *p,
return q;
}
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.enqueue", 0);
- grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
- /* take a ref to the workqueue: otherwise it can happen that whatever events
- * this kicks off ends up destroying the workqueue before this function
- * completes */
- GRPC_WORKQUEUE_REF(workqueue, "enqueue");
- polling_island *pi = (polling_island *)workqueue;
- gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
- closure->error_data.error = error;
- gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
- if (last == 0) {
- workqueue_maybe_wakeup(pi);
- }
- workqueue_move_items_to_parent(pi);
- GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
- GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
- polling_island *pi = (polling_island *)workqueue;
- return workqueue == NULL ? grpc_schedule_on_exec_ctx
- : &pi->workqueue_scheduler;
-}
-
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
@@ -1153,14 +1025,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
- gpr_mu_lock(&fd->po.mu);
- grpc_workqueue *workqueue =
- GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
- gpr_mu_unlock(&fd->po.mu);
- return workqueue;
-}
-
/*******************************************************************************
* Pollset Definitions
*/
@@ -1432,33 +1296,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->po.mu);
}
-static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
- polling_island *pi) {
- if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
- gpr_mu_unlock(&pi->workqueue_read_mu);
- if (n != NULL) {
- if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
- workqueue_maybe_wakeup(pi);
- }
- grpc_closure *c = (grpc_closure *)n;
- grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
- c->scheduled = false;
-#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- return true;
- } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
- /* n == NULL might mean there's work but it's not available to be popped
- * yet - try to ensure another workqueue wakes up to check shortly if so
- */
- workqueue_maybe_wakeup(pi);
- }
- }
- return false;
-}
-
/* NOTE: This function may modify 'now' */
static bool acquire_polling_lease(grpc_pollset_worker *worker,
polling_island *pi, gpr_timespec deadline,
@@ -1594,12 +1431,7 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
for (int i = 0; i < ep_rv; ++i) {
void *data_ptr = ep_ev[i].data.ptr;
- if (data_ptr == &pi->workqueue_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
- err_desc);
- maybe_do_workqueue_work(exec_ctx, pi);
- } else if (data_ptr == &polling_island_wakeup_fd) {
+ if (data_ptr == &polling_island_wakeup_fd) {
GRPC_POLLING_TRACE(
"pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
"%d) got merged",
@@ -1675,15 +1507,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
PI_ADD_REF(pi, "ps_work");
gpr_mu_unlock(&pollset->po.mu);
- /* If we get some workqueue work to do, it might end up completing an item on
- the completion queue, so there's no need to poll... so we skip that and
- redo the complete loop to verify */
- if (!maybe_do_workqueue_work(exec_ctx, pi)) {
- g_current_thread_polling_island = pi;
- pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now,
- deadline, sig_mask, error);
- g_current_thread_polling_island = NULL;
- }
+ g_current_thread_polling_island = pi;
+ pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now, deadline,
+ sig_mask, error);
+ g_current_thread_polling_island = NULL;
GPR_ASSERT(pi != NULL);
@@ -2036,7 +1863,6 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -2054,10 +1880,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_set_add_fd = pollset_set_add_fd,
.pollset_set_del_fd = pollset_set_del_fd,
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_scheduler = workqueue_scheduler,
-
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
index bb44321922..c56b47be11 100644
--- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
@@ -61,7 +61,6 @@
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
@@ -109,23 +108,22 @@ static void fd_global_shutdown(void);
* epoll set Declarations
*/
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+//#define EPS_REFCOUNT_DEBUG
+
+#ifdef EPS_REFCOUNT_DEBUG
#define EPS_ADD_REF(p, r) eps_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define EPS_UNREF(exec_ctx, p, r) \
eps_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
-#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
+#else /* defined(EPS_REFCOUNT_DEBUG) */
#define EPS_ADD_REF(p, r) eps_add_ref((p))
#define EPS_UNREF(exec_ctx, p, r) eps_unref((exec_ctx), (p))
#endif /* !defined(GRPC_EPS_REF_COUNT_DEBUG) */
-/* This is also used as grpc_workqueue (by directly casting it) */
typedef struct epoll_set {
- grpc_closure_scheduler workqueue_scheduler;
-
/* Mutex poller should acquire to poll this. This enforces that only one
* poller can be polling on epoll_set at any time */
gpr_mu mu;
@@ -139,15 +137,6 @@ typedef struct epoll_set {
/* Number of threads currently polling on this epoll set*/
gpr_atm poller_count;
- /* Mutex guarding the read end of the workqueue (must be held to pop from
- * workqueue_items) */
- gpr_mu workqueue_read_mu;
- /* Queue of closures to be executed */
- gpr_mpscq workqueue_items;
- /* Count of items in workqueue_items */
- gpr_atm workqueue_item_count;
- /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
- grpc_wakeup_fd workqueue_wakeup_fd;
/* Is the epoll set shutdown */
gpr_atm is_shutdown;
@@ -181,7 +170,9 @@ struct grpc_pollset {
/*******************************************************************************
* Pollset-set Declarations
*/
-struct grpc_pollset_set {};
+struct grpc_pollset_set {
+ char unused;
+};
/*****************************************************************************
* Dedicated polling threads and pollsets - Declarations
@@ -235,8 +226,6 @@ static __thread epoll_set *g_current_thread_epoll_set;
/* Forward declaration */
static void epoll_set_delete(epoll_set *eps);
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -249,13 +238,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
-static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
- workqueue_enqueue, workqueue_enqueue, "workqueue"};
-
static void eps_add_ref(epoll_set *eps);
static void eps_unref(grpc_exec_ctx *exec_ctx, epoll_set *eps);
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+#ifdef EPS_REFCOUNT_DEBUG
static void eps_add_ref_dbg(epoll_set *eps, const char *reason,
const char *file, int line) {
long old_cnt = gpr_atm_acq_load(&eps->ref_count);
@@ -271,36 +257,6 @@ static void eps_unref_dbg(grpc_exec_ctx *exec_ctx, epoll_set *eps,
gpr_log(GPR_DEBUG, "Unref eps: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)eps, old_cnt, (old_cnt - 1), reason, file, line);
}
-
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- if (workqueue != NULL) {
- eps_add_ref_dbg((epoll_set *)workqueue, reason, file, line);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {
- if (workqueue != NULL) {
- eps_unref_dbg(exec_ctx, (epoll_set *)workqueue, reason, file, line);
- }
-}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- eps_add_ref((epoll_set *)workqueue);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- eps_unref(exec_ctx, (epoll_set *)workqueue);
- }
-}
#endif
static void eps_add_ref(epoll_set *eps) {
@@ -394,24 +350,15 @@ static epoll_set *epoll_set_create(grpc_error **error) {
*error = GRPC_ERROR_NONE;
eps = gpr_malloc(sizeof(*eps));
- eps->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
eps->epoll_fd = -1;
gpr_mu_init(&eps->mu);
- gpr_mu_init(&eps->workqueue_read_mu);
- gpr_mpscq_init(&eps->workqueue_items);
- gpr_atm_rel_store(&eps->workqueue_item_count, 0);
gpr_atm_rel_store(&eps->ref_count, 0);
gpr_atm_rel_store(&eps->poller_count, 0);
gpr_atm_rel_store(&eps->is_shutdown, false);
- if (!append_error(error, grpc_wakeup_fd_init(&eps->workqueue_wakeup_fd),
- err_desc)) {
- goto done;
- }
-
eps->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (eps->epoll_fd < 0) {
@@ -419,8 +366,6 @@ static epoll_set *epoll_set_create(grpc_error **error) {
goto done;
}
- epoll_set_add_wakeup_fd_locked(eps, &eps->workqueue_wakeup_fd, error);
-
done:
if (*error != GRPC_ERROR_NONE) {
epoll_set_delete(eps);
@@ -434,57 +379,11 @@ static void epoll_set_delete(epoll_set *eps) {
close(eps->epoll_fd);
}
- GPR_ASSERT(gpr_atm_no_barrier_load(&eps->workqueue_item_count) == 0);
gpr_mu_destroy(&eps->mu);
- gpr_mu_destroy(&eps->workqueue_read_mu);
- gpr_mpscq_destroy(&eps->workqueue_items);
- grpc_wakeup_fd_destroy(&eps->workqueue_wakeup_fd);
gpr_free(eps);
}
-static void workqueue_maybe_wakeup(epoll_set *eps) {
- /* If this thread is the current poller, then it may be that it's about to
- decrement the current poller count, so we need to look past this thread */
- bool is_current_poller = (g_current_thread_epoll_set == eps);
- gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
- gpr_atm current_pollers = gpr_atm_no_barrier_load(&eps->poller_count);
- /* Only issue a wakeup if it's likely that some poller could come in and take
- it right now. Note that since we do an anticipatory mpscq_pop every poll
- loop, it's ok if we miss the wakeup here, as we'll get the work item when
- the next poller enters anyway. */
- if (current_pollers > min_current_pollers_for_wakeup) {
- GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
- grpc_wakeup_fd_wakeup(&eps->workqueue_wakeup_fd));
- }
-}
-
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.enqueue", 0);
- grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
- /* take a ref to the workqueue: otherwise it can happen that whatever events
- * this kicks off ends up destroying the workqueue before this function
- * completes */
- GRPC_WORKQUEUE_REF(workqueue, "enqueue");
- epoll_set *eps = (epoll_set *)workqueue;
- gpr_atm last = gpr_atm_no_barrier_fetch_add(&eps->workqueue_item_count, 1);
- closure->error_data.error = error;
- gpr_mpscq_push(&eps->workqueue_items, &closure->next_data.atm_next);
- if (last == 0) {
- workqueue_maybe_wakeup(eps);
- }
-
- GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
- GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
- epoll_set *eps = (epoll_set *)workqueue;
- return workqueue == NULL ? grpc_schedule_on_exec_ctx
- : &eps->workqueue_scheduler;
-}
-
static grpc_error *epoll_set_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
@@ -680,8 +579,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
-
/*******************************************************************************
* Pollset Definitions
*/
@@ -865,32 +762,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
-static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, epoll_set *eps) {
- if (gpr_mu_trylock(&eps->workqueue_read_mu)) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&eps->workqueue_items);
- gpr_mu_unlock(&eps->workqueue_read_mu);
- if (n != NULL) {
- if (gpr_atm_full_fetch_add(&eps->workqueue_item_count, -1) > 1) {
- workqueue_maybe_wakeup(eps);
- }
- grpc_closure *c = (grpc_closure *)n;
- grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
- c->scheduled = false;
-#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- return true;
- } else if (gpr_atm_no_barrier_load(&eps->workqueue_item_count) > 0) {
- /* n == NULL might mean there's work but it's not available to be popped
- * yet - try to ensure another workqueue wakes up to check shortly if so
- */
- workqueue_maybe_wakeup(eps);
- }
- }
- return false;
-}
-
/* Blocking call */
static void acquire_epoll_lease(epoll_set *eps) {
if (g_num_threads_per_eps > 1) {
@@ -934,12 +805,7 @@ static void do_epoll_wait(grpc_exec_ctx *exec_ctx, int epoll_fd, epoll_set *eps,
for (int i = 0; i < ep_rv; ++i) {
void *data_ptr = ep_ev[i].data.ptr;
- if (data_ptr == &eps->workqueue_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&eps->workqueue_wakeup_fd),
- err_desc);
- maybe_do_workqueue_work(exec_ctx, eps);
- } else if (data_ptr == &epoll_set_wakeup_fd) {
+ if (data_ptr == &epoll_set_wakeup_fd) {
gpr_atm_rel_store(&eps->is_shutdown, 1);
gpr_log(GPR_INFO, "pollset poller: shutdown set");
} else {
@@ -966,18 +832,13 @@ static void epoll_set_work(grpc_exec_ctx *exec_ctx, epoll_set *eps,
epoll set. */
epoll_fd = eps->epoll_fd;
- /* If we get some workqueue work to do, it might end up completing an item on
- the completion queue, so there's no need to poll... so we skip that and
- redo the complete loop to verify */
- if (!maybe_do_workqueue_work(exec_ctx, eps)) {
- gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1);
- g_current_thread_epoll_set = eps;
+ gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1);
+ g_current_thread_epoll_set = eps;
- do_epoll_wait(exec_ctx, epoll_fd, eps, error);
+ do_epoll_wait(exec_ctx, epoll_fd, eps, error);
- g_current_thread_epoll_set = NULL;
- gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1);
- }
+ g_current_thread_epoll_set = NULL;
+ gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1);
GPR_TIMER_END("epoll_set_work", 0);
}
@@ -1120,7 +981,6 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -1138,10 +998,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_set_add_fd = pollset_set_add_fd,
.pollset_set_del_fd = pollset_set_del_fd,
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_scheduler = workqueue_scheduler,
-
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 7cb6085e25..b1dad14ba4 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -59,7 +59,6 @@
#include "src/core/lib/iomgr/sys_epoll_wrapper.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/spinlock.h"
@@ -139,17 +138,6 @@ struct grpc_fd {
Ref/Unref by two to avoid altering the orphaned bit */
gpr_atm refst;
- /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
- grpc_wakeup_fd workqueue_wakeup_fd;
- grpc_closure_scheduler workqueue_scheduler;
- /* Spinlock guarding the read end of the workqueue (must be held to pop from
- * workqueue_items) */
- gpr_spinlock workqueue_read_mu;
- /* Queue of closures to be executed */
- gpr_mpscq workqueue_items;
- /* Count of items in workqueue_items */
- gpr_atm workqueue_item_count;
-
/* The fd is either closed or we relinquished control of it. In either
cases, this indicates that the 'fd' on this structure is no longer
valid */
@@ -172,12 +160,6 @@ struct grpc_fd {
static void fd_global_init(void);
static void fd_global_shutdown(void);
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error);
-
-static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
- workqueue_enqueue, workqueue_enqueue, "workqueue"};
-
/*******************************************************************************
* Pollset Declarations
*/
@@ -347,13 +329,6 @@ static grpc_fd *fd_create(int fd, const char *name) {
grpc_lfev_init(&new_fd->write_closure);
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
- GRPC_LOG_IF_ERROR("fd_create",
- grpc_wakeup_fd_init(&new_fd->workqueue_wakeup_fd));
- new_fd->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
- new_fd->workqueue_read_mu = GPR_SPINLOCK_INITIALIZER;
- gpr_mpscq_init(&new_fd->workqueue_items);
- gpr_atm_no_barrier_store(&new_fd->workqueue_item_count, 0);
-
new_fd->freelist_next = NULL;
new_fd->on_done_closure = NULL;
@@ -446,91 +421,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
- REF_BY(fd, 2, "return_workqueue");
- return (grpc_workqueue *)fd;
-}
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- if (workqueue != NULL) {
- ref_by((grpc_fd *)workqueue, 2, file, line, reason);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {
- if (workqueue != NULL) {
- unref_by(exec_ctx, (grpc_fd *)workqueue, 2, file, line, reason);
- }
-}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- ref_by((grpc_fd *)workqueue, 2);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- unref_by(exec_ctx, (grpc_fd *)workqueue, 2);
- }
-}
-#endif
-
-static void workqueue_wakeup(grpc_fd *fd) {
- GRPC_LOG_IF_ERROR("workqueue_enqueue",
- grpc_wakeup_fd_wakeup(&fd->workqueue_wakeup_fd));
-}
-
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.enqueue", 0);
- grpc_fd *fd = (grpc_fd *)(((char *)closure->scheduler) -
- offsetof(grpc_fd, workqueue_scheduler));
- REF_BY(fd, 2, "workqueue_enqueue");
- gpr_atm last = gpr_atm_no_barrier_fetch_add(&fd->workqueue_item_count, 1);
- closure->error_data.error = error;
- gpr_mpscq_push(&fd->workqueue_items, &closure->next_data.atm_next);
- if (last == 0) {
- workqueue_wakeup(fd);
- }
- UNREF_BY(exec_ctx, fd, 2, "workqueue_enqueue");
-}
-
-static void fd_invoke_workqueue(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- /* handle spurious wakeups */
- if (!gpr_spinlock_trylock(&fd->workqueue_read_mu)) return;
- gpr_mpscq_node *n = gpr_mpscq_pop(&fd->workqueue_items);
- gpr_spinlock_unlock(&fd->workqueue_read_mu);
- if (n != NULL) {
- if (gpr_atm_full_fetch_add(&fd->workqueue_item_count, -1) > 1) {
- workqueue_wakeup(fd);
- }
- grpc_closure *c = (grpc_closure *)n;
- grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
- c->scheduled = false;
-#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- } else if (gpr_atm_no_barrier_load(&fd->workqueue_item_count) > 0) {
- /* n == NULL might mean there's work but it's not available to be popped
- * yet - try to ensure another workqueue wakes up to check shortly if so
- */
- workqueue_wakeup(fd);
- }
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
- return &((grpc_fd *)workqueue)->workqueue_scheduler;
-}
-
/*******************************************************************************
* Pollable Definitions
*/
@@ -596,22 +486,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
.data.ptr = fd};
if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
switch (errno) {
- case EEXIST: /* if this fd is already in the epoll set, the workqueue fd
- must also be - just return */
- gpr_mu_unlock(&fd->orphaned_mu);
- return GRPC_ERROR_NONE;
- default:
- append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
- }
- }
- struct epoll_event ev_wq = {
- .events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLEXCLUSIVE),
- .data.ptr = (void *)(1 + (intptr_t)fd)};
- if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->workqueue_wakeup_fd.read_fd, &ev_wq) !=
- 0) {
- switch (errno) {
- case EEXIST: /* if the workqueue fd is already in the epoll set we're ok
- - no need to do anything special */
+ case EEXIST:
break;
default:
append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
@@ -874,29 +749,21 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
append_error(&error, grpc_wakeup_fd_consume_wakeup(&p->wakeup), err_desc);
} else {
- grpc_fd *fd = (grpc_fd *)(((intptr_t)data_ptr) & ~(intptr_t)1);
- bool is_workqueue = (((intptr_t)data_ptr) & 1) != 0;
+ grpc_fd *fd = (grpc_fd *)data_ptr;
bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (events[i].events & EPOLLOUT) != 0;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
- "PS:%p poll %p got fd %p: is_wq=%d cancel=%d read=%d "
+ "PS:%p poll %p got fd %p: cancel=%d read=%d "
"write=%d",
- pollset, p, fd, is_workqueue, cancel, read_ev, write_ev);
+ pollset, p, fd, cancel, read_ev, write_ev);
}
- if (is_workqueue) {
- append_error(&error,
- grpc_wakeup_fd_consume_wakeup(&fd->workqueue_wakeup_fd),
- err_desc);
- fd_invoke_workqueue(exec_ctx, fd);
- } else {
- if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd, pollset);
- }
- if (write_ev || cancel) {
- fd_become_writable(exec_ctx, fd);
- }
+ if (read_ev || cancel) {
+ fd_become_readable(exec_ctx, fd, pollset);
+ }
+ if (write_ev || cancel) {
+ fd_become_writable(exec_ctx, fd);
}
}
}
@@ -1449,7 +1316,6 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -1467,10 +1333,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_set_add_fd = pollset_set_add_fd,
.pollset_set_del_fd = pollset_set_del_fd,
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_scheduler = workqueue_scheduler,
-
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index 92c555b7ea..aba83a6da9 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -59,7 +59,6 @@
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
@@ -177,7 +176,9 @@ static void fd_global_shutdown(void);
* Polling island Declarations
*/
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+//#define PI_REFCOUNT_DEBUG
+
+#ifdef PI_REFCOUNT_DEBUG
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
@@ -192,8 +193,6 @@ static void fd_global_shutdown(void);
/* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island {
- grpc_closure_scheduler workqueue_scheduler;
-
gpr_mu mu;
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
the refcount.
@@ -214,15 +213,6 @@ typedef struct polling_island {
/* Number of threads currently polling on this island */
gpr_atm poller_count;
- /* Mutex guarding the read end of the workqueue (must be held to pop from
- * workqueue_items) */
- gpr_mu workqueue_read_mu;
- /* Queue of closures to be executed */
- gpr_mpscq workqueue_items;
- /* Count of items in workqueue_items */
- gpr_atm workqueue_item_count;
- /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
- grpc_wakeup_fd workqueue_wakeup_fd;
/* The fd of the underlying epoll set */
int epoll_fd;
@@ -297,8 +287,6 @@ static __thread polling_island *g_current_thread_polling_island;
/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -311,13 +299,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
-static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
- workqueue_enqueue, workqueue_enqueue, "workqueue"};
-
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+#ifdef PI_REFCOUNT_DEBUG
static void pi_add_ref_dbg(polling_island *pi, const char *reason,
const char *file, int line) {
long old_cnt = gpr_atm_acq_load(&pi->ref_count);
@@ -333,36 +318,6 @@ static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
-
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- if (workqueue != NULL) {
- pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {
- if (workqueue != NULL) {
- pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
- }
-}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- pi_add_ref((polling_island *)workqueue);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- pi_unref(exec_ctx, (polling_island *)workqueue);
- }
-}
#endif
static void pi_add_ref(polling_island *pi) {
@@ -526,26 +481,16 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
*error = GRPC_ERROR_NONE;
pi = gpr_malloc(sizeof(*pi));
- pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
gpr_mu_init(&pi->mu);
pi->fd_cnt = 0;
pi->fd_capacity = 0;
pi->fds = NULL;
pi->epoll_fd = -1;
- gpr_mu_init(&pi->workqueue_read_mu);
- gpr_mpscq_init(&pi->workqueue_items);
- gpr_atm_rel_store(&pi->workqueue_item_count, 0);
-
gpr_atm_rel_store(&pi->ref_count, 0);
gpr_atm_rel_store(&pi->poller_count, 0);
gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
- if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
- err_desc)) {
- goto done;
- }
-
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) {
@@ -553,8 +498,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
goto done;
}
- polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
-
if (initial_fd != NULL) {
polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
}
@@ -573,11 +516,7 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
if (pi->epoll_fd >= 0) {
close(pi->epoll_fd);
}
- GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
- gpr_mu_destroy(&pi->workqueue_read_mu);
- gpr_mpscq_destroy(&pi->workqueue_items);
gpr_mu_destroy(&pi->mu);
- grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
gpr_free(pi->fds);
gpr_free(pi);
}
@@ -722,45 +661,6 @@ static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
}
}
-static void workqueue_maybe_wakeup(polling_island *pi) {
- /* If this thread is the current poller, then it may be that it's about to
- decrement the current poller count, so we need to look past this thread */
- bool is_current_poller = (g_current_thread_polling_island == pi);
- gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
- gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
- /* Only issue a wakeup if it's likely that some poller could come in and take
- it right now. Note that since we do an anticipatory mpscq_pop every poll
- loop, it's ok if we miss the wakeup here, as we'll get the work item when
- the next poller enters anyway. */
- if (current_pollers >= min_current_pollers_for_wakeup) {
- GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
- grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
- }
-}
-
-static void workqueue_move_items_to_parent(polling_island *q) {
- polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
- if (p == NULL) {
- return;
- }
- gpr_mu_lock(&q->workqueue_read_mu);
- int num_added = 0;
- while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
- if (n != NULL) {
- gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
- gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
- gpr_mpscq_push(&p->workqueue_items, n);
- num_added++;
- }
- }
- gpr_mu_unlock(&q->workqueue_read_mu);
- if (num_added > 0) {
- workqueue_maybe_wakeup(p);
- }
- workqueue_move_items_to_parent(p);
-}
-
static polling_island *polling_island_merge(polling_island *p,
polling_island *q,
grpc_error **error) {
@@ -785,8 +685,6 @@ static polling_island *polling_island_merge(polling_island *p,
/* Add the 'merged_to' link from p --> q */
gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
-
- workqueue_move_items_to_parent(p);
}
/* else if p == q, nothing needs to be done */
@@ -797,32 +695,6 @@ static polling_island *polling_island_merge(polling_island *p,
return q;
}
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.enqueue", 0);
- grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
- /* take a ref to the workqueue: otherwise it can happen that whatever events
- * this kicks off ends up destroying the workqueue before this function
- * completes */
- GRPC_WORKQUEUE_REF(workqueue, "enqueue");
- polling_island *pi = (polling_island *)workqueue;
- gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
- closure->error_data.error = error;
- gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
- if (last == 0) {
- workqueue_maybe_wakeup(pi);
- }
- workqueue_move_items_to_parent(pi);
- GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
- GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
- polling_island *pi = (polling_island *)workqueue;
- return workqueue == NULL ? grpc_schedule_on_exec_ctx
- : &pi->workqueue_scheduler;
-}
-
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
@@ -1081,14 +953,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
- gpr_mu_lock(&fd->po.mu);
- grpc_workqueue *workqueue =
- GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
- gpr_mu_unlock(&fd->po.mu);
- return workqueue;
-}
-
/*******************************************************************************
* Pollset Definitions
*/
@@ -1326,44 +1190,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->po.mu);
}
-static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
- polling_island *pi) {
- if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
- gpr_mu_unlock(&pi->workqueue_read_mu);
- if (n != NULL) {
- gpr_atm remaining =
- gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) - 1;
- GRPC_POLLING_TRACE(
- "maybe_do_workqueue_work: pi: %p: got closure %p, remaining = "
- "%" PRIdPTR,
- pi, n, remaining);
- if (remaining > 0) {
- workqueue_maybe_wakeup(pi);
- }
- grpc_closure *c = (grpc_closure *)n;
- grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
- c->scheduled = false;
-#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- return true;
- } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
- /* n == NULL might mean there's work but it's not available to be popped
- * yet - try to ensure another workqueue wakes up to check shortly if so
- */
- GRPC_POLLING_TRACE(
- "maybe_do_workqueue_work: pi: %p: more to do, but not yet", pi);
- workqueue_maybe_wakeup(pi);
- }
- } else {
- GRPC_POLLING_TRACE("maybe_do_workqueue_work: pi: %p: read already locked",
- pi);
- }
- return false;
-}
-
#define GRPC_EPOLL_MAX_EVENTS 100
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
@@ -1419,76 +1245,61 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
PI_ADD_REF(pi, "ps_work");
gpr_mu_unlock(&pollset->po.mu);
- /* If we get some workqueue work to do, it might end up completing an item on
- the completion queue, so there's no need to poll... so we skip that and
- redo the complete loop to verify */
- GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker %p, pi %p", pollset,
- worker, pi);
- if (!maybe_do_workqueue_work(exec_ctx, pi)) {
- GRPC_POLLING_TRACE("pollset_work: begins");
- gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
- g_current_thread_polling_island = pi;
-
- GRPC_SCHEDULING_START_BLOCKING_REGION;
- ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
- sig_mask);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
- if (ep_rv < 0) {
- if (errno != EINTR) {
- gpr_asprintf(&err_msg,
- "epoll_wait() epoll fd: %d failed with error: %d (%s)",
- epoll_fd, errno, strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- } else {
- /* We were interrupted. Save an interation by doing a zero timeout
- epoll_wait to see if there are any other events of interest */
- GRPC_POLLING_TRACE(
- "pollset_work: pollset: %p, worker: %p received kick",
- (void *)pollset, (void *)worker);
- ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
- }
+ gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
+ g_current_thread_polling_island = pi;
+
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ ep_rv =
+ epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+ if (ep_rv < 0) {
+ if (errno != EINTR) {
+ gpr_asprintf(&err_msg,
+ "epoll_wait() epoll fd: %d failed with error: %d (%s)",
+ epoll_fd, errno, strerror(errno));
+ append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
+ } else {
+ /* We were interrupted. Save an interation by doing a zero timeout
+ epoll_wait to see if there are any other events of interest */
+ GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p received kick",
+ (void *)pollset, (void *)worker);
+ ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
}
+ }
#ifdef GRPC_TSAN
- /* See the definition of g_poll_sync for more details */
- gpr_atm_acq_load(&g_epoll_sync);
+ /* See the definition of g_poll_sync for more details */
+ gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */
- for (int i = 0; i < ep_rv; ++i) {
- void *data_ptr = ep_ev[i].data.ptr;
- if (data_ptr == &pi->workqueue_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
- err_desc);
- maybe_do_workqueue_work(exec_ctx, pi);
- } else if (data_ptr == &polling_island_wakeup_fd) {
- GRPC_POLLING_TRACE(
- "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
- "%d) got merged",
- (void *)pollset, (void *)worker, epoll_fd);
- /* This means that our polling island is merged with a different
- island. We do not have to do anything here since the subsequent call
- to the function pollset_work_and_unlock() will pick up the correct
- epoll_fd */
- } else {
- grpc_fd *fd = data_ptr;
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write_ev = ep_ev[i].events & EPOLLOUT;
- if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd, pollset);
- }
- if (write_ev || cancel) {
- fd_become_writable(exec_ctx, fd);
- }
+ for (int i = 0; i < ep_rv; ++i) {
+ void *data_ptr = ep_ev[i].data.ptr;
+ if (data_ptr == &polling_island_wakeup_fd) {
+ GRPC_POLLING_TRACE(
+ "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
+ "%d) got merged",
+ (void *)pollset, (void *)worker, epoll_fd);
+ /* This means that our polling island is merged with a different
+ island. We do not have to do anything here since the subsequent call
+ to the function pollset_work_and_unlock() will pick up the correct
+ epoll_fd */
+ } else {
+ grpc_fd *fd = data_ptr;
+ int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+ int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+ int write_ev = ep_ev[i].events & EPOLLOUT;
+ if (read_ev || cancel) {
+ fd_become_readable(exec_ctx, fd, pollset);
+ }
+ if (write_ev || cancel) {
+ fd_become_writable(exec_ctx, fd);
}
}
-
- g_current_thread_polling_island = NULL;
- gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
- GRPC_POLLING_TRACE("pollset_work: ends");
}
+ g_current_thread_polling_island = NULL;
+ gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
+
GPR_ASSERT(pi != NULL);
/* Before leaving, release the extra ref we added to the polling island. It
@@ -1879,7 +1690,6 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -1897,10 +1707,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_set_add_fd = pollset_set_add_fd,
.pollset_set_del_fd = pollset_set_del_fd,
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_scheduler = workqueue_scheduler,
-
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 3a7648ac32..acf425751d 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -648,8 +648,6 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
GRPC_FD_UNREF(fd, "poll");
}
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
-
/*******************************************************************************
* pollset_posix.c
*/
@@ -1289,30 +1287,6 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
}
/*******************************************************************************
- * workqueue stubs
- */
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- return workqueue;
-}
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- return workqueue;
-}
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {}
-#endif
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
- return grpc_schedule_on_exec_ctx;
-}
-
-/*******************************************************************************
* Condition Variable polling extensions
*/
@@ -1529,7 +1503,6 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -1547,10 +1520,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_set_add_fd = pollset_set_add_fd,
.pollset_set_del_fd = pollset_set_del_fd,
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_scheduler = workqueue_scheduler,
-
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index c4d2f23e29..c633aec5ac 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -171,10 +171,6 @@ grpc_fd *grpc_fd_create(int fd, const char *name) {
return g_event_engine->fd_create(fd, name);
}
-grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd) {
- return g_event_engine->fd_get_workqueue(fd);
-}
-
int grpc_fd_wrapped_fd(grpc_fd *fd) {
return g_event_engine->fd_wrapped_fd(fd);
}
@@ -276,26 +272,4 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
g_event_engine->pollset_set_del_fd(exec_ctx, pollset_set, fd);
}
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
- int line, const char *reason) {
- return g_event_engine->workqueue_ref(workqueue, file, line, reason);
-}
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {
- g_event_engine->workqueue_unref(exec_ctx, workqueue, file, line, reason);
-}
-#else
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
- return g_event_engine->workqueue_ref(workqueue);
-}
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
- g_event_engine->workqueue_unref(exec_ctx, workqueue);
-}
-#endif
-
-grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
- return g_event_engine->workqueue_scheduler(workqueue);
-}
-
#endif // GRPC_POSIX_SOCKET
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 80619aab5f..cbe5862372 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -41,7 +41,6 @@
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
extern grpc_tracer_flag grpc_polling_trace; /* Disabled by default */
@@ -60,7 +59,6 @@ typedef struct grpc_event_engine_vtable {
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
bool (*fd_is_shutdown)(grpc_fd *fd);
- grpc_workqueue *(*fd_get_workqueue)(grpc_fd *fd);
grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx,
grpc_fd *fd);
@@ -97,17 +95,6 @@ typedef struct grpc_event_engine_vtable {
grpc_pollset_set *pollset_set, grpc_fd *fd);
void (*shutdown_engine)(void);
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
- grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue, const char *file,
- int line, const char *reason);
- void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason);
-#else
- grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue);
- void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
-#endif
- grpc_closure_scheduler *(*workqueue_scheduler)(grpc_workqueue *workqueue);
} grpc_event_engine_vtable;
void grpc_event_engine_init(void);
@@ -121,9 +108,6 @@ const char *grpc_get_poll_strategy_name();
This takes ownership of closing fd. */
grpc_fd *grpc_fd_create(int fd, const char *name);
-/* Get a workqueue that's associated with this fd */
-grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd);
-
/* Return the wrapped fd, or -1 if it has been released or closed. */
int grpc_fd_wrapped_fd(grpc_fd *fd);
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 318bb2b713..44e2676c3d 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -38,7 +38,6 @@
#include <grpc/support/thd.h>
#include "src/core/lib/iomgr/combiner.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) {
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 75fd5b1c50..8edc9224c7 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -36,132 +36,173 @@
#include <string.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
+#include <grpc/support/tls.h>
+#include <grpc/support/useful.h>
+
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/support/spinlock.h"
+
+#define MAX_DEPTH 2
-typedef struct grpc_executor_data {
- int busy; /**< is the thread currently running? */
- int shutting_down; /**< has \a grpc_shutdown() been invoked? */
- int pending_join; /**< has the thread finished but not been joined? */
- grpc_closure_list closures; /**< collection of pending work */
- gpr_thd_id tid; /**< thread id of the thread, only valid if \a busy or \a
- pending_join are true */
- gpr_thd_options options;
+typedef struct {
gpr_mu mu;
-} grpc_executor;
+ gpr_cv cv;
+ grpc_closure_list elems;
+ size_t depth;
+ bool shutdown;
+ gpr_thd_id id;
+} thread_state;
-static grpc_executor g_executor;
+static thread_state *g_thread_state;
+static size_t g_max_threads;
+static gpr_atm g_cur_threads;
+static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
-void grpc_executor_init() {
- memset(&g_executor, 0, sizeof(grpc_executor));
- gpr_mu_init(&g_executor.mu);
- g_executor.options = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&g_executor.options);
-}
+GPR_TLS_DECL(g_this_thread_state);
-/* thread body */
-static void closure_exec_thread_func(void *ignored) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- while (1) {
- gpr_mu_lock(&g_executor.mu);
- if (g_executor.shutting_down != 0) {
- gpr_mu_unlock(&g_executor.mu);
- break;
- }
- if (grpc_closure_list_empty(g_executor.closures)) {
- /* no more work, time to die */
- GPR_ASSERT(g_executor.busy == 1);
- g_executor.busy = 0;
- gpr_mu_unlock(&g_executor.mu);
- break;
- } else {
- grpc_closure *c = g_executor.closures.head;
- grpc_closure_list_init(&g_executor.closures);
- gpr_mu_unlock(&g_executor.mu);
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error_data.error;
+static void executor_thread(void *arg);
+
+static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
+ size_t n = 0;
+
+ grpc_closure *c = list.head;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
#ifndef NDEBUG
- c->scheduled = false;
+ c->scheduled = false;
#endif
- c->cb(&exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
- }
- grpc_exec_ctx_flush(&exec_ctx);
- }
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
}
- grpc_exec_ctx_finish(&exec_ctx);
+
+ return n;
}
-/* Spawn the thread if new work has arrived a no thread is up */
-static void maybe_spawn_locked() {
- if (grpc_closure_list_empty(g_executor.closures) == 1) {
- return;
- }
- if (g_executor.shutting_down == 1) {
- return;
- }
+bool grpc_executor_is_threaded() {
+ return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
+}
- if (g_executor.busy != 0) {
- /* Thread still working. New work will be picked up by already running
- * thread. Not spawning anything. */
- return;
- } else if (g_executor.pending_join != 0) {
- /* Pickup the remains of the previous incarnations of the thread. */
- gpr_thd_join(g_executor.tid);
- g_executor.pending_join = 0;
+void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
+ gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
+ if (threading) {
+ if (cur_threads > 0) return;
+ g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
+ gpr_atm_no_barrier_store(&g_cur_threads, 1);
+ gpr_tls_init(&g_this_thread_state);
+ g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads);
+ for (size_t i = 0; i < g_max_threads; i++) {
+ gpr_mu_init(&g_thread_state[i].mu);
+ gpr_cv_init(&g_thread_state[i].cv);
+ g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
+ }
+
+ gpr_thd_options opt = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&opt);
+ gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0],
+ &opt);
+ } else {
+ if (cur_threads == 0) return;
+ for (size_t i = 0; i < g_max_threads; i++) {
+ gpr_mu_lock(&g_thread_state[i].mu);
+ g_thread_state[i].shutdown = true;
+ gpr_cv_signal(&g_thread_state[i].cv);
+ gpr_mu_unlock(&g_thread_state[i].mu);
+ }
+ /* ensure no thread is adding a new thread... once this is past, then
+ no thread will try to add a new one either (since shutdown is true) */
+ gpr_spinlock_lock(&g_adding_thread_lock);
+ gpr_spinlock_unlock(&g_adding_thread_lock);
+ for (gpr_atm i = 0; i < g_cur_threads; i++) {
+ gpr_thd_join(g_thread_state[i].id);
+ }
+ gpr_atm_no_barrier_store(&g_cur_threads, 0);
+ for (size_t i = 0; i < g_max_threads; i++) {
+ gpr_mu_destroy(&g_thread_state[i].mu);
+ gpr_cv_destroy(&g_thread_state[i].cv);
+ run_closures(exec_ctx, g_thread_state[i].elems);
+ }
+ gpr_free(g_thread_state);
+ gpr_tls_destroy(&g_this_thread_state);
}
+}
- /* All previous instances of the thread should have been joined at this point.
- * Spawn time! */
- g_executor.busy = 1;
- GPR_ASSERT(gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL,
- &g_executor.options));
- g_executor.pending_join = 1;
+void grpc_executor_init(grpc_exec_ctx *exec_ctx) {
+ gpr_atm_no_barrier_store(&g_cur_threads, 0);
+ grpc_executor_set_threading(exec_ctx, true);
}
-static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- gpr_mu_lock(&g_executor.mu);
- if (g_executor.shutting_down == 0) {
- grpc_closure_list_append(&g_executor.closures, closure, error);
- maybe_spawn_locked();
+void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
+ grpc_executor_set_threading(exec_ctx, false);
+}
+
+static void executor_thread(void *arg) {
+ thread_state *ts = arg;
+ gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
+
+ grpc_exec_ctx exec_ctx =
+ GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
+
+ size_t subtract_depth = 0;
+ for (;;) {
+ gpr_mu_lock(&ts->mu);
+ ts->depth -= subtract_depth;
+ while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
+ gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ if (ts->shutdown) {
+ gpr_mu_unlock(&ts->mu);
+ break;
+ }
+ grpc_closure_list exec = ts->elems;
+ ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
+ gpr_mu_unlock(&ts->mu);
+
+ subtract_depth = run_closures(&exec_ctx, exec);
+ grpc_exec_ctx_flush(&exec_ctx);
}
- gpr_mu_unlock(&g_executor.mu);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
- int pending_join;
-
- gpr_mu_lock(&g_executor.mu);
- pending_join = g_executor.pending_join;
- g_executor.shutting_down = 1;
- gpr_mu_unlock(&g_executor.mu);
- /* we can release the lock at this point despite the access to the closure
- * list below because we aren't accepting new work */
-
- /* Execute pending callbacks, some may be performing cleanups */
- grpc_closure *c = g_executor.closures.head;
- grpc_closure_list_init(&g_executor.closures);
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
- c->scheduled = false;
-#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
+static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
+ if (cur_thread_count == 0) {
+ grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
+ return;
+ }
+ thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
+ if (ts == NULL) {
+ ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
}
- grpc_exec_ctx_flush(exec_ctx);
- GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
- if (pending_join) {
- gpr_thd_join(g_executor.tid);
+ gpr_mu_lock(&ts->mu);
+ if (grpc_closure_list_empty(ts->elems)) {
+ gpr_cv_signal(&ts->cv);
+ }
+ grpc_closure_list_append(&ts->elems, closure, error);
+ ts->depth++;
+ bool try_new_thread = ts->depth > MAX_DEPTH &&
+ cur_thread_count < g_max_threads && !ts->shutdown;
+ if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
+ gpr_mu_unlock(&ts->mu);
+ cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
+ if (cur_thread_count < g_max_threads) {
+ gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
+
+ gpr_thd_options opt = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&opt);
+ gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
+ &g_thread_state[cur_thread_count], &opt);
+ }
+ gpr_spinlock_unlock(&g_adding_thread_lock);
+ } else {
+ gpr_mu_unlock(&ts->mu);
}
- gpr_mu_destroy(&g_executor.mu);
}
static const grpc_closure_scheduler_vtable executor_vtable = {
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index 1213016383..792d5056cb 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -41,11 +41,18 @@
* This mechanism is meant to outsource work (grpc_closure instances) to a
* thread, for those cases where blocking isn't an option but there isn't a
* non-blocking solution available. */
-void grpc_executor_init();
+void grpc_executor_init(grpc_exec_ctx *exec_ctx);
extern grpc_closure_scheduler *grpc_executor_scheduler;
/** Shutdown the executor, running all pending work as part of the call */
void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx);
+/** Is the executor multi-threaded? */
+bool grpc_executor_is_threaded();
+
+/* enable/disable threading - must be called after grpc_executor_init and before
+ grpc_executor_shutdown */
+void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool enable);
+
#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 1fd41c2f88..391449136e 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -44,6 +44,7 @@
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/timer.h"
@@ -56,11 +57,12 @@ static gpr_cv g_rcv;
static int g_shutdown;
static grpc_iomgr_object g_root_object;
-void grpc_iomgr_init(void) {
+void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) {
g_shutdown = 0;
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
grpc_exec_ctx_global_init();
+ grpc_executor_init(exec_ctx);
grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
@@ -68,7 +70,7 @@ void grpc_iomgr_init(void) {
grpc_iomgr_platform_init();
}
-void grpc_iomgr_start(void) { grpc_timer_manager_init(); }
+void grpc_iomgr_start(grpc_exec_ctx *exec_ctx) { grpc_timer_manager_init(); }
static size_t count_objects(void) {
grpc_iomgr_object *obj;
@@ -93,6 +95,7 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) {
grpc_timer_manager_shutdown();
grpc_iomgr_platform_flush();
+ grpc_executor_shutdown(exec_ctx);
gpr_mu_lock(&g_mu);
g_shutdown = 1;
@@ -107,7 +110,8 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) {
}
last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
}
- if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
+ if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) ==
+ GRPC_TIMERS_FIRED) {
gpr_mu_unlock(&g_mu);
grpc_exec_ctx_flush(exec_ctx);
grpc_iomgr_platform_flush();
diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h
index 6e2e023615..bd6ca4a0b8 100644
--- a/src/core/lib/iomgr/iomgr.h
+++ b/src/core/lib/iomgr/iomgr.h
@@ -38,10 +38,10 @@
#include "src/core/lib/iomgr/port.h"
/** Initializes the iomgr. */
-void grpc_iomgr_init(void);
+void grpc_iomgr_init(grpc_exec_ctx *exec_ctx);
/** Starts any background threads for iomgr. */
-void grpc_iomgr_start(void);
+void grpc_iomgr_start(grpc_exec_ctx *exec_ctx);
/** Signals the intention to shutdown the iomgr. Expects to be able to flush
* exec_ctx. */
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 6b2b85cce0..3c6a6ea1a5 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -581,7 +581,7 @@ static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *rq,
grpc_resource_quota *grpc_resource_quota_create(const char *name) {
grpc_resource_quota *resource_quota = gpr_malloc(sizeof(*resource_quota));
gpr_ref_init(&resource_quota->refs, 1);
- resource_quota->combiner = grpc_combiner_create(NULL);
+ resource_quota->combiner = grpc_combiner_create();
resource_quota->free_pool = INT64_MAX;
resource_quota->size = INT64_MAX;
gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
@@ -594,12 +594,11 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
(intptr_t)resource_quota);
}
- grpc_closure_init(
- &resource_quota->rq_step_closure, rq_step, resource_quota,
- grpc_combiner_finally_scheduler(resource_quota->combiner, true));
+ grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota,
+ grpc_combiner_finally_scheduler(resource_quota->combiner));
grpc_closure_init(&resource_quota->rq_reclamation_done_closure,
rq_reclamation_done, resource_quota,
- grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_combiner_scheduler(resource_quota->combiner));
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_quota->roots[i] = NULL;
}
@@ -704,18 +703,18 @@ grpc_resource_user *grpc_resource_user_create(
grpc_resource_quota_ref_internal(resource_quota);
grpc_closure_init(&resource_user->allocate_closure, &ru_allocate,
resource_user,
- grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_combiner_scheduler(resource_quota->combiner));
grpc_closure_init(&resource_user->add_to_free_pool_closure,
&ru_add_to_free_pool, resource_user,
- grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_combiner_scheduler(resource_quota->combiner));
grpc_closure_init(&resource_user->post_reclaimer_closure[0],
&ru_post_benign_reclaimer, resource_user,
- grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_combiner_scheduler(resource_quota->combiner));
grpc_closure_init(&resource_user->post_reclaimer_closure[1],
&ru_post_destructive_reclaimer, resource_user,
- grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_combiner_scheduler(resource_quota->combiner));
grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user,
- grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_combiner_scheduler(resource_quota->combiner));
gpr_mu_init(&resource_user->mu);
gpr_atm_rel_store(&resource_user->refs, 1);
gpr_atm_rel_store(&resource_user->shutdown, 0);
@@ -772,12 +771,12 @@ void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
- grpc_closure_sched(exec_ctx,
- grpc_closure_create(
- ru_shutdown, resource_user,
- grpc_combiner_scheduler(
- resource_user->resource_quota->combiner, false)),
- GRPC_ERROR_NONE);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_create(
+ ru_shutdown, resource_user,
+ grpc_combiner_scheduler(resource_user->resource_quota->combiner)),
+ GRPC_ERROR_NONE);
}
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 5d360b0b80..f066881237 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -558,26 +558,15 @@ static int tcp_get_fd(grpc_endpoint *ep) {
return tcp->fd;
}
-static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
- grpc_tcp *tcp = (grpc_tcp *)ep;
- return grpc_fd_get_workqueue(tcp->em_fd);
-}
-
static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return tcp->resource_user;
}
-static const grpc_endpoint_vtable vtable = {tcp_read,
- tcp_write,
- tcp_get_workqueue,
- tcp_add_to_pollset,
- tcp_add_to_pollset_set,
- tcp_shutdown,
- tcp_destroy,
- tcp_get_resource_user,
- tcp_get_peer,
- tcp_get_fd};
+static const grpc_endpoint_vtable vtable = {
+ tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
+ tcp_shutdown, tcp_destroy, tcp_get_resource_user, tcp_get_peer,
+ tcp_get_fd};
#define MAX_CHUNK_SIZE 32 * 1024 * 1024
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index dc23e4f521..31e779af75 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -338,10 +338,9 @@ static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; }
static int uv_get_fd(grpc_endpoint *ep) { return -1; }
static grpc_endpoint_vtable vtable = {
- uv_endpoint_read, uv_endpoint_write, uv_get_workqueue,
- uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown,
- uv_destroy, uv_get_resource_user, uv_get_peer,
- uv_get_fd};
+ uv_endpoint_read, uv_endpoint_write, uv_add_to_pollset,
+ uv_add_to_pollset_set, uv_endpoint_shutdown, uv_destroy,
+ uv_get_resource_user, uv_get_peer, uv_get_fd};
grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
grpc_resource_quota *resource_quota,
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index e0338f93c7..d6758f7c3b 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -89,6 +89,12 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer);
/* iomgr internal api for dealing with timers */
+typedef enum {
+ GRPC_TIMERS_NOT_CHECKED,
+ GRPC_TIMERS_CHECKED_AND_EMPTY,
+ GRPC_TIMERS_FIRED,
+} grpc_timer_check_result;
+
/* Check for timers to be run, and run them.
Return true if timer callbacks were executed.
If next is non-null, TRY to update *next with the next running timer
@@ -96,8 +102,8 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer);
*next is never guaranteed to be updated on any given execution; however,
with high probability at least one thread in the system will see an update
at any time slice. */
-bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
- gpr_timespec *next);
+grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
+ gpr_timespec now, gpr_timespec *next);
void grpc_timer_list_init(gpr_timespec now);
void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index b28340b71c..019cd8f309 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -101,8 +101,10 @@ static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
return a + b;
}
-static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
- gpr_atm *next, grpc_error *error);
+static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx,
+ gpr_atm now,
+ gpr_atm *next,
+ grpc_error *error);
static gpr_timespec dbl_to_ts(double d) {
gpr_timespec ts;
@@ -421,19 +423,22 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
return n;
}
-static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
- gpr_atm *next, grpc_error *error) {
- size_t n = 0;
+static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx,
+ gpr_atm now,
+ gpr_atm *next,
+ grpc_error *error) {
+ grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED;
gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
gpr_tls_set(&g_last_seen_min_timer, min_timer);
if (now < min_timer) {
if (next != NULL) *next = GPR_MIN(*next, min_timer);
- return 0;
+ return GRPC_TIMERS_CHECKED_AND_EMPTY;
}
if (gpr_spinlock_trylock(&g_shared_mutables.checker_mu)) {
gpr_mu_lock(&g_shared_mutables.mu);
+ result = GRPC_TIMERS_CHECKED_AND_EMPTY;
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, " .. shard[%d]->min_deadline = %" PRIdPTR,
@@ -448,14 +453,17 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
/* For efficiency, we pop as many available timers as we can from the
shard. This may violate perfect timer deadline ordering, but that
shouldn't be a big deal because we don't make ordering guarantees. */
- n +=
- pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline, error);
+ if (pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline,
+ error) > 0) {
+ result = GRPC_TIMERS_FIRED;
+ }
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, " .. popped --> %" PRIdPTR
- ", shard[%d]->min_deadline %" PRIdPTR
- " --> %" PRIdPTR ", now=%" PRIdPTR,
- n, (int)(g_shard_queue[0] - g_shards),
+ gpr_log(GPR_DEBUG,
+ " .. result --> %d"
+ ", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR
+ ", now=%" PRIdPTR,
+ result, (int)(g_shard_queue[0] - g_shards),
g_shard_queue[0]->min_deadline, new_min_deadline, now);
}
@@ -476,26 +484,15 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm now,
g_shard_queue[0]->min_deadline);
gpr_mu_unlock(&g_shared_mutables.mu);
gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
- } else if (next != NULL) {
- /* TODO(ctiller): this forces calling code to do an short poll, and
- then retry the timer check (because this time through the timer list was
- contended).
-
- We could reduce the cost here dramatically by keeping a count of how
- many currently active pollers got through the uncontended case above
- successfully, and waking up other pollers IFF that count drops to zero.
-
- Once that count is in place, this entire else branch could disappear. */
- *next = GPR_MIN(*next, now + 1);
}
GRPC_ERROR_UNREF(error);
- return (int)n;
+ return result;
}
-bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
- gpr_timespec *next) {
+grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
+ gpr_timespec now, gpr_timespec *next) {
// prelude
GPR_ASSERT(now.clock_type == g_clock_type);
gpr_atm now_atm = timespec_to_atm_round_down(now);
@@ -513,7 +510,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
"TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR,
now_atm, min_timer);
}
- return 0;
+ return GRPC_TIMERS_CHECKED_AND_EMPTY;
}
grpc_error *shutdown_error =
@@ -538,7 +535,7 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_free(next_str);
}
// actual code
- bool r;
+ grpc_timer_check_result r;
gpr_atm next_atm;
if (next == NULL) {
r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error);
@@ -556,11 +553,10 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec,
next->tv_nsec, next_atm);
}
- gpr_log(GPR_DEBUG, "TIMER CHECK END: %d timers triggered; next=%s", r,
- next_str);
+ gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str);
gpr_free(next_str);
}
- return r > 0;
+ return r;
}
#endif /* GRPC_TIMER_USE_GENERIC */
diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c
index 24085093e7..082dfe8299 100644
--- a/src/core/lib/iomgr/timer_manager.c
+++ b/src/core/lib/iomgr/timer_manager.c
@@ -107,86 +107,124 @@ void grpc_timer_manager_tick() {
grpc_exec_ctx_finish(&exec_ctx);
}
-static void timer_thread(void *unused) {
- // this threads exec_ctx: we try to run things through to completion here
- // since it's easy to spin up new threads
- grpc_exec_ctx exec_ctx =
- GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
+static void run_some_timers(grpc_exec_ctx *exec_ctx) {
+ // if there's something to execute...
+ gpr_mu_lock(&g_mu);
+ // remove a waiter from the pool, and start another thread if necessary
+ --g_waiter_count;
+ if (g_waiter_count == 0 && g_threaded) {
+ start_timer_thread_and_unlock();
+ } else {
+ // if there's no thread waiting with a timeout, kick an existing
+ // waiter
+ // so that the next deadline is not missed
+ if (!g_has_timed_waiter) {
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "kick untimed waiter");
+ }
+ gpr_cv_signal(&g_cv_wait);
+ }
+ gpr_mu_unlock(&g_mu);
+ }
+ // without our lock, flush the exec_ctx
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&g_mu);
+ // garbage collect any threads hanging out that are dead
+ gc_completed_threads();
+ // get ready to wait again
+ ++g_waiter_count;
+ gpr_mu_unlock(&g_mu);
+}
+
+// wait until 'next' (or forever if there is already a timed waiter in the pool)
+// returns true if the thread should continue executing (false if it should
+// shutdown)
+static bool wait_until(gpr_timespec next) {
+ const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ gpr_mu_lock(&g_mu);
+ // if we're not threaded anymore, leave
+ if (!g_threaded) {
+ gpr_mu_unlock(&g_mu);
+ return false;
+ }
+ // if there's no timed waiter, we should become one: that waiter waits
+ // only until the next timer should expire
+ // all other timers wait forever
+ uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
+ if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) {
+ g_has_timed_waiter = true;
+ // we use a generation counter to track the timed waiter so we can
+ // cancel an existing one quickly (and when it actually times out it'll
+ // figure stuff out instead of incurring a wakeup)
+ my_timed_waiter_generation = ++g_timed_waiter_generation;
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_timespec wait_time = gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC));
+ gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds",
+ wait_time.tv_sec, wait_time.tv_nsec);
+ }
+ } else {
+ next = inf_future;
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "sleep until kicked");
+ }
+ }
+ gpr_cv_wait(&g_cv_wait, &g_mu, next);
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
+ my_timed_waiter_generation == g_timed_waiter_generation, g_kicked);
+ }
+ // if this was the timed waiter, then we need to check timers, and flag
+ // that there's now no timed waiter... we'll look for a replacement if
+ // there's work to do after checking timers (code above)
+ if (my_timed_waiter_generation == g_timed_waiter_generation) {
+ g_has_timed_waiter = false;
+ }
+ // if this was a kick from the timer system, consume it (and don't stop
+ // this thread yet)
+ if (g_kicked) {
+ grpc_timer_consume_kick();
+ g_kicked = false;
+ }
+ gpr_mu_unlock(&g_mu);
+ return true;
+}
+
+static void timer_main_loop(grpc_exec_ctx *exec_ctx) {
const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
for (;;) {
gpr_timespec next = inf_future;
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
// check timer state, updates next to the next time to run a check
- if (grpc_timer_check(&exec_ctx, now, &next)) {
- // if there's something to execute...
- gpr_mu_lock(&g_mu);
- // remove a waiter from the pool, and start another thread if necessary
- --g_waiter_count;
- if (g_waiter_count == 0 && g_threaded) {
- start_timer_thread_and_unlock();
- } else {
- // if there's no thread waiting with a timeout, kick an existing waiter
- // so that the next deadline is not missed
- if (!g_has_timed_waiter) {
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "kick untimed waiter");
- }
- gpr_cv_signal(&g_cv_wait);
- }
- gpr_mu_unlock(&g_mu);
- }
- // without our lock, flush the exec_ctx
- grpc_exec_ctx_flush(&exec_ctx);
- gpr_mu_lock(&g_mu);
- // garbage collect any threads hanging out that are dead
- gc_completed_threads();
- // get ready to wait again
- ++g_waiter_count;
- gpr_mu_unlock(&g_mu);
- } else {
- gpr_mu_lock(&g_mu);
- // if we're not threaded anymore, leave
- if (!g_threaded) break;
- // if there's no timed waiter, we should become one: that waiter waits
- // only until the next timer should expire
- // all other timers wait forever
- uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
- if (!g_has_timed_waiter) {
- g_has_timed_waiter = true;
- // we use a generation counter to track the timed waiter so we can
- // cancel an existing one quickly (and when it actually times out it'll
- // figure stuff out instead of incurring a wakeup)
- my_timed_waiter_generation = ++g_timed_waiter_generation;
+ switch (grpc_timer_check(exec_ctx, now, &next)) {
+ case GRPC_TIMERS_FIRED:
+ run_some_timers(exec_ctx);
+ break;
+ case GRPC_TIMERS_NOT_CHECKED:
+ /* This case only happens under contention, meaning more than one timer
+ manager thread checked timers concurrently.
+
+ If that happens, we're guaranteed that some other thread has just
+ checked timers, and this will avalanche into some other thread seeing
+ empty timers and doing a timed sleep.
+
+ Consequently, we can just sleep forever here and be happy at some
+ saved wakeup cycles. */
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "sleep for a while");
+ gpr_log(GPR_DEBUG, "timers not checked: expect another thread to");
}
- } else {
next = inf_future;
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "sleep until kicked");
+ /* fall through */
+ case GRPC_TIMERS_CHECKED_AND_EMPTY:
+ if (!wait_until(next)) {
+ return;
}
- }
- gpr_cv_wait(&g_cv_wait, &g_mu, next);
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
- my_timed_waiter_generation == g_timed_waiter_generation,
- g_kicked);
- }
- // if this was the timed waiter, then we need to check timers, and flag
- // that there's now no timed waiter... we'll look for a replacement if
- // there's work to do after checking timers (code above)
- if (my_timed_waiter_generation == g_timed_waiter_generation) {
- g_has_timed_waiter = false;
- }
- // if this was a kick from the timer system, consume it (and don't stop
- // this thread yet)
- if (g_kicked) {
- grpc_timer_consume_kick();
- g_kicked = false;
- }
- gpr_mu_unlock(&g_mu);
+ break;
}
}
+}
+
+static void timer_thread_cleanup(void) {
+ gpr_mu_lock(&g_mu);
// terminate the thread: drop the waiter count, thread count, and let whomever
// stopped the threading stuff know that we're done
--g_waiter_count;
@@ -199,12 +237,21 @@ static void timer_thread(void *unused) {
ct->next = g_completed_threads;
g_completed_threads = ct;
gpr_mu_unlock(&g_mu);
- grpc_exec_ctx_finish(&exec_ctx);
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "End timer thread");
}
}
+static void timer_thread(void *unused) {
+ // this threads exec_ctx: we try to run things through to completion here
+ // since it's easy to spin up new threads
+ grpc_exec_ctx exec_ctx =
+ GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
+ timer_main_loop(&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
+ timer_thread_cleanup();
+}
+
static void start_threads(void) {
gpr_mu_lock(&g_mu);
if (!g_threaded) {
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index 2952e44b58..967e84eb14 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -96,9 +96,9 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
}
}
-bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
- gpr_timespec *next) {
- return false;
+grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx,
+ gpr_timespec now, gpr_timespec *next) {
+ return GRPC_TIMERS_NOT_CHECKED;
}
void grpc_timer_list_init(gpr_timespec now) {}
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
deleted file mode 100644
index 371b0f55dc..0000000000
--- a/src/core/lib/iomgr/workqueue.h
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_H
-#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_H
-
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
-#include "src/core/lib/iomgr/iomgr.h"
-#include "src/core/lib/iomgr/pollset.h"
-#include "src/core/lib/iomgr/pollset_set.h"
-#include "src/core/lib/iomgr/port.h"
-
-#ifdef GPR_WINDOWS
-#include "src/core/lib/iomgr/workqueue_windows.h"
-#endif
-
-/* grpc_workqueue is forward declared in exec_ctx.h */
-
-/* Reference counting functions. Use the macro's always
- (GRPC_WORKQUEUE_{REF,UNREF}).
-
- Pass in a descriptive reason string for reffing/unreffing as the last
- argument to each macro. When GRPC_WORKQUEUE_REFCOUNT_DEBUG is defined, that
- string will be printed alongside the refcount. When it is not defined, the
- string will be discarded at compilation time. */
-
-/*#define GRPC_WORKQUEUE_REFCOUNT_DEBUG*/
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-#define GRPC_WORKQUEUE_REF(p, r) \
- grpc_workqueue_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \
- grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
- int line, const char *reason);
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason);
-#else
-#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p))
-#define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p))
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue);
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
-#endif
-
-/** Fetch the workqueue closure scheduler. Items added to a work queue will be
- started in approximately the order they were enqueued, on some thread that
- may or may not be the current thread. Successive closures enqueued onto a
- workqueue MAY be executed concurrently.
-
- It is generally more expensive to add a closure to a workqueue than to the
- execution context, both in terms of CPU work and in execution latency.
-
- Use work queues when it's important that other threads be given a chance to
- tackle some workload. */
-grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue);
-
-#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */
diff --git a/src/core/lib/iomgr/workqueue_uv.c b/src/core/lib/iomgr/workqueue_uv.c
deleted file mode 100644
index 4d61b40912..0000000000
--- a/src/core/lib/iomgr/workqueue_uv.c
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/lib/iomgr/port.h"
-
-#ifdef GRPC_UV
-
-#include "src/core/lib/iomgr/workqueue.h"
-
-// Minimal implementation of grpc_workqueue for libuv
-// Works by directly enqueuing workqueue items onto the current execution
-// context, which is at least correct, if not performant or in the spirit of
-// workqueues.
-
-void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
- int line, const char *reason) {
- return workqueue;
-}
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {}
-#else
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
- return workqueue;
-}
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
-#endif
-
-grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
- return grpc_schedule_on_exec_ctx;
-}
-
-#endif /* GPR_UV */
diff --git a/src/core/lib/iomgr/workqueue_uv.h b/src/core/lib/iomgr/workqueue_uv.h
deleted file mode 100644
index be3f8e4d93..0000000000
--- a/src/core/lib/iomgr/workqueue_uv.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H
-#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H
-
-#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H */
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
deleted file mode 100644
index 234b47cdf5..0000000000
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_WINDOWS
-
-#include "src/core/lib/iomgr/workqueue.h"
-
-// Minimal implementation of grpc_workqueue for Windows
-// Works by directly enqueuing workqueue items onto the current execution
-// context, which is at least correct, if not performant or in the spirit of
-// workqueues.
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
- int line, const char *reason) {
- return workqueue;
-}
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {}
-#else
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
- return workqueue;
-}
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
-#endif
-
-grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
- return grpc_schedule_on_exec_ctx;
-}
-
-#endif /* GPR_WINDOWS */
diff --git a/src/core/lib/iomgr/workqueue_windows.h b/src/core/lib/iomgr/workqueue_windows.h
deleted file mode 100644
index e5d59130bb..0000000000
--- a/src/core/lib/iomgr/workqueue_windows.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_WINDOWS_H
-#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_WINDOWS_H
-
-#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_WINDOWS_H */
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index dff05633ec..ac1cce722f 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -65,7 +65,8 @@ typedef struct {
*/
grpc_polling_entity *pollent;
grpc_transport_stream_op_batch op;
- uint8_t security_context_set;
+ gpr_atm security_context_set;
+ gpr_mu security_context_mu;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
grpc_auth_metadata_context auth_md_context;
} call_data;
@@ -253,19 +254,26 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_linked_mdelem *l;
grpc_client_security_context *sec_ctx = NULL;
- if (!op->cancel_stream && calld->security_context_set == 0) {
- calld->security_context_set = 1;
- GPR_ASSERT(op->payload->context != NULL);
- if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
- op->payload->context[GRPC_CONTEXT_SECURITY].value =
- grpc_client_security_context_create();
- op->payload->context[GRPC_CONTEXT_SECURITY].destroy =
- grpc_client_security_context_destroy;
+ if (!op->cancel_stream) {
+ /* double checked lock over security context to ensure it's set once */
+ if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
+ gpr_mu_lock(&calld->security_context_mu);
+ if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
+ GPR_ASSERT(op->payload->context != NULL);
+ if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
+ op->payload->context[GRPC_CONTEXT_SECURITY].value =
+ grpc_client_security_context_create();
+ op->payload->context[GRPC_CONTEXT_SECURITY].destroy =
+ grpc_client_security_context_destroy;
+ }
+ sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value;
+ GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
+ sec_ctx->auth_context =
+ GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
+ gpr_atm_rel_store(&calld->security_context_set, 1);
+ }
+ gpr_mu_unlock(&calld->security_context_mu);
}
- sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value;
- GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
- sec_ctx->auth_context =
- GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
}
if (op->send_initial_metadata) {
@@ -312,6 +320,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(*calld));
+ gpr_mu_init(&calld->security_context_mu);
return GRPC_ERROR_NONE;
}
@@ -335,6 +344,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_slice_unref_internal(exec_ctx, calld->method);
}
reset_auth_metadata_context(&calld->auth_md_context);
+ gpr_mu_destroy(&calld->security_context_mu);
}
/* Constructor for channel_data */
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index 48d368a2a7..39b39379a6 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -380,11 +380,6 @@ static int endpoint_get_fd(grpc_endpoint *secure_ep) {
return grpc_endpoint_get_fd(ep->wrapped_ep);
}
-static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) {
- secure_endpoint *ep = (secure_endpoint *)secure_ep;
- return grpc_endpoint_get_workqueue(ep->wrapped_ep);
-}
-
static grpc_resource_user *endpoint_get_resource_user(
grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
@@ -393,7 +388,6 @@ static grpc_resource_user *endpoint_get_resource_user(
static const grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_write,
- endpoint_get_workqueue,
endpoint_add_to_pollset,
endpoint_add_to_pollset_set,
endpoint_shutdown,
diff --git a/src/core/lib/support/log.c b/src/core/lib/support/log.c
index af1651dae5..4d15553768 100644
--- a/src/core/lib/support/log.c
+++ b/src/core/lib/support/log.c
@@ -43,7 +43,7 @@
#include <string.h>
extern void gpr_default_log(gpr_log_func_args *args);
-static gpr_log_func g_log_func = gpr_default_log;
+static gpr_atm g_log_func = (gpr_atm)gpr_default_log;
static gpr_atm g_min_severity_to_print = GPR_LOG_VERBOSITY_UNSET;
const char *gpr_log_severity_string(gpr_log_severity severity) {
@@ -70,7 +70,7 @@ void gpr_log_message(const char *file, int line, gpr_log_severity severity,
lfargs.line = line;
lfargs.severity = severity;
lfargs.message = message;
- g_log_func(&lfargs);
+ ((gpr_log_func)gpr_atm_no_barrier_load(&g_log_func))(&lfargs);
}
void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print) {
@@ -99,5 +99,5 @@ void gpr_log_verbosity_init() {
}
void gpr_set_log_function(gpr_log_func f) {
- g_log_func = f ? f : gpr_default_log;
+ gpr_atm_no_barrier_store(&g_log_func, (gpr_atm)(f ? f : gpr_default_log));
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 201969cd45..7e25a5142f 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1468,7 +1468,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op_batch *stream_op = &bctl->op;
grpc_transport_stream_op_batch_payload *stream_op_payload =
&call->stream_op_payload;
- stream_op->covered_by_poller = true;
/* rewrite batch ops into a transport op */
for (i = 0; i < nops; i++) {
@@ -1657,10 +1656,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come
- from server.c. In that case, it's coming from accept_stream, and in
- that case we're not necessarily covered by a poller. */
- stream_op->covered_by_poller = call->is_client;
call->received_initial_metadata = true;
call->buffered_metadata[0] =
op->data.recv_initial_metadata.recv_initial_metadata;
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 6163776152..86ce4bde61 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -128,6 +128,7 @@ void grpc_init(void) {
int i;
gpr_once_init(&g_basic_init, do_basic_init);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
gpr_time_init();
@@ -154,8 +155,7 @@ void grpc_init(void) {
grpc_register_tracer("pending_tags", &grpc_trace_pending_tags);
#endif
grpc_security_pre_init();
- grpc_iomgr_init();
- grpc_executor_init();
+ grpc_iomgr_init(&exec_ctx);
gpr_timers_global_init();
grpc_handshaker_factory_registry_init();
grpc_security_init();
@@ -171,19 +171,20 @@ void grpc_init(void) {
grpc_tracer_init("GRPC_TRACE");
/* no more changes to channel init pipelines */
grpc_channel_init_finalize();
- grpc_iomgr_start();
+ grpc_iomgr_start(&exec_ctx);
}
gpr_mu_unlock(&g_init_mu);
+ grpc_exec_ctx_finish(&exec_ctx);
GRPC_API_TRACE("grpc_init(void)", 0, ());
}
void grpc_shutdown(void) {
int i;
GRPC_API_TRACE("grpc_shutdown(void)", 0, ());
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_exec_ctx exec_ctx =
+ GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) {
- grpc_executor_shutdown(&exec_ctx);
grpc_iomgr_shutdown(&exec_ctx);
gpr_timers_global_destroy();
grpc_tracer_shutdown();
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 93369cc689..dd3a987eff 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -127,10 +127,6 @@ typedef struct grpc_transport_stream_op_batch {
/** Values for the stream op (fields set are determined by flags above) */
grpc_transport_stream_op_batch_payload *payload;
- /** Is the completion of this op covered by a poller (if false: the op should
- complete independently of some pollset being polled) */
- bool covered_by_poller : 1;
-
/** Send initial metadata to the peer, from the provided metadata batch. */
bool send_initial_metadata : 1;
diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c
index 3a2a793e01..5e7ac5b626 100644
--- a/src/core/lib/transport/transport_op_string.c
+++ b/src/core/lib/transport/transport_op_string.c
@@ -79,9 +79,6 @@ char *grpc_transport_stream_op_batch_string(
gpr_strvec b;
gpr_strvec_init(&b);
- gpr_strvec_add(
- &b, gpr_strdup(op->covered_by_poller ? "[COVERED]" : "[UNCOVERED]"));
-
if (op->send_initial_metadata) {
gpr_strvec_add(&b, gpr_strdup(" "));
gpr_strvec_add(&b, gpr_strdup("SEND_INITIAL_METADATA{"));
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 8a1a58bb86..30529c70b1 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -158,8 +158,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/wakeup_fd_nospecial.c',
'src/core/lib/iomgr/wakeup_fd_pipe.c',
'src/core/lib/iomgr/wakeup_fd_posix.c',
- 'src/core/lib/iomgr/workqueue_uv.c',
- 'src/core/lib/iomgr/workqueue_windows.c',
'src/core/lib/json/json.c',
'src/core/lib/json/json_reader.c',
'src/core/lib/json/json_string.c',