aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-06-08 16:03:36 -0700
committerGravatar GitHub <noreply@github.com>2017-06-08 16:03:36 -0700
commitdc1685564991c65f1c177c2363c93441bc35134c (patch)
tree72cb361a167913363b015c93a2192332ddf51397 /src
parent8f3d02106b72783f61430c84d3501ed3d88b3415 (diff)
parentae6083674ad9fef86223968c5ffe12bc5a133d83 (diff)
Merge pull request #11336 from ctiller/ALL-the-things
Roll-up of threading changes
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c201
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.c9
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c20
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c2
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c4
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c4
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c7
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c139
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_window_update.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c8
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h5
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c3
-rw-r--r--src/core/lib/iomgr/combiner.c245
-rw-r--r--src/core/lib/iomgr/combiner.h8
-rw-r--r--src/core/lib/iomgr/endpoint.c4
-rw-r--r--src/core/lib/iomgr/endpoint.h4
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c100
-rw-r--r--src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c198
-rw-r--r--src/core/lib/iomgr/ev_epoll_thread_pool_linux.c172
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c156
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.c294
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c31
-rw-r--r--src/core/lib/iomgr/ev_posix.c26
-rw-r--r--src/core/lib/iomgr/ev_posix.h16
-rw-r--r--src/core/lib/iomgr/exec_ctx.c1
-rw-r--r--src/core/lib/iomgr/executor.c244
-rw-r--r--src/core/lib/iomgr/executor.h9
-rw-r--r--src/core/lib/iomgr/iomgr.c7
-rw-r--r--src/core/lib/iomgr/iomgr.h4
-rw-r--r--src/core/lib/iomgr/resource_quota.c31
-rw-r--r--src/core/lib/iomgr/tcp_posix.c19
-rw-r--r--src/core/lib/iomgr/tcp_uv.c9
-rw-r--r--src/core/lib/iomgr/tcp_windows.c16
-rw-r--r--src/core/lib/iomgr/workqueue.h72
-rw-r--r--src/core/lib/iomgr/workqueue_uv.c50
-rw-r--r--src/core/lib/iomgr/workqueue_uv.h22
-rw-r--r--src/core/lib/iomgr/workqueue_windows.c48
-rw-r--r--src/core/lib/iomgr/workqueue_windows.h22
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c36
-rw-r--r--src/core/lib/security/transport/secure_endpoint.c6
-rw-r--r--src/core/lib/surface/call.c5
-rw-r--r--src/core/lib/surface/init.c11
-rw-r--r--src/core/lib/transport/transport.h4
-rw-r--r--src/core/lib/transport/transport_op_string.c3
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py2
48 files changed, 577 insertions, 1709 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index 013096d46b..3ec539884b 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -276,7 +276,7 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
w->chand = chand;
grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w,
- grpc_combiner_scheduler(chand->combiner, false));
+ grpc_combiner_scheduler(chand->combiner));
w->state = current_state;
w->lb_policy = lb_policy;
grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
@@ -637,7 +637,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&op->handler_private.closure, start_transport_op_locked,
- op, grpc_combiner_scheduler(chand->combiner, false)),
+ op, grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}
@@ -668,7 +668,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
// Initialize data members.
- chand->combiner = grpc_combiner_create(NULL);
+ chand->combiner = grpc_combiner_create();
gpr_mu_init(&chand->info_mu);
gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
@@ -679,7 +679,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
chand->owning_stack = args->channel_stack;
grpc_closure_init(&chand->on_resolver_result_changed,
on_resolver_result_changed_locked, chand,
- grpc_combiner_scheduler(chand->combiner, false));
+ grpc_combiner_scheduler(chand->combiner));
chand->interested_parties = grpc_pollset_set_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
@@ -738,9 +738,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
channel_data *chand = elem->channel_data;
if (chand->resolver != NULL) {
grpc_closure_sched(
- exec_ctx,
- grpc_closure_create(shutdown_resolver_locked, chand->resolver,
- grpc_combiner_scheduler(chand->combiner, false)),
+ exec_ctx, grpc_closure_create(shutdown_resolver_locked, chand->resolver,
+ grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}
if (chand->client_channel_factory != NULL) {
@@ -771,11 +770,6 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
* PER-CALL FUNCTIONS
*/
-#define GET_CALL(call_data) \
- ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call)))
-
-#define CANCELLED_CALL ((grpc_subchannel_call *)1)
-
/** Call data. Holds a pointer to grpc_subchannel_call and the
associated machinery to create such a pointer.
Handles queueing of stream ops until a call object is ready, waiting
@@ -796,11 +790,9 @@ typedef struct client_channel_call_data {
grpc_server_retry_throttle_data *retry_throttle_data;
method_parameters *method_params;
- grpc_error *cancel_error;
-
- /** either 0 for no call, 1 for cancelled, or a pointer to a
- grpc_subchannel_call */
- gpr_atm subchannel_call;
+ /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest
+ bit is 0), or a pointer to an error (if the lowest bit is 1) */
+ gpr_atm subchannel_call_or_error;
gpr_arena *arena;
bool pick_pending;
@@ -822,10 +814,43 @@ typedef struct client_channel_call_data {
grpc_closure *original_on_complete;
} call_data;
+typedef struct {
+ grpc_subchannel_call *subchannel_call;
+ grpc_error *error;
+} call_or_error;
+
+static call_or_error get_call_or_error(call_data *p) {
+ gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error);
+ if (c == 0)
+ return (call_or_error){NULL, NULL};
+ else if (c & 1)
+ return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)};
+ else
+ return (call_or_error){(grpc_subchannel_call *)c, NULL};
+}
+
+static bool set_call_or_error(call_data *p, call_or_error coe) {
+ // this should always be under a lock
+ call_or_error existing = get_call_or_error(p);
+ if (existing.error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_UNREF(coe.error);
+ return false;
+ }
+ GPR_ASSERT(existing.subchannel_call == NULL);
+ if (coe.error != GRPC_ERROR_NONE) {
+ GPR_ASSERT(coe.subchannel_call == NULL);
+ gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error);
+ } else {
+ GPR_ASSERT(coe.subchannel_call != NULL);
+ gpr_atm_rel_store(&p->subchannel_call_or_error,
+ (gpr_atm)coe.subchannel_call);
+ }
+ return true;
+}
+
grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
grpc_call_element *call_elem) {
- grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data);
- return scc == CANCELLED_CALL ? NULL : scc;
+ return get_call_or_error(call_elem->call_data).subchannel_call;
}
static void add_waiting_locked(call_data *calld,
@@ -857,18 +882,18 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
return;
}
- grpc_subchannel_call *call = GET_CALL(calld);
+ call_or_error call = get_call_or_error(calld);
grpc_transport_stream_op_batch **ops = calld->waiting_ops;
size_t nops = calld->waiting_ops_count;
- if (call == CANCELLED_CALL) {
- fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
+ if (call.error != GRPC_ERROR_NONE) {
+ fail_locked(exec_ctx, calld, GRPC_ERROR_REF(call.error));
return;
}
calld->waiting_ops = NULL;
calld->waiting_ops_count = 0;
calld->waiting_ops_capacity = 0;
for (size_t i = 0; i < nops; i++) {
- grpc_subchannel_call_process_op(exec_ctx, call, ops[i]);
+ grpc_subchannel_call_process_op(exec_ctx, call.subchannel_call, ops[i]);
}
gpr_free(ops);
}
@@ -929,19 +954,23 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
calld->pick_pending = false;
grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
+ call_or_error coe = get_call_or_error(calld);
if (calld->connected_subchannel == NULL) {
- gpr_atm_no_barrier_store(&calld->subchannel_call, (gpr_atm)CANCELLED_CALL);
- fail_locked(exec_ctx, calld,
- error == GRPC_ERROR_NONE
- ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Call dropped by load balancing policy")
- : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Failed to create subchannel", &error, 1));
- } else if (GET_CALL(calld) == CANCELLED_CALL) {
+ grpc_error *failure =
+ error == GRPC_ERROR_NONE
+ ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Call dropped by load balancing policy")
+ : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Failed to create subchannel", &error, 1);
+ set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
+ fail_locked(exec_ctx, calld, failure);
+ } else if (coe.error != GRPC_ERROR_NONE) {
/* already cancelled before subchannel became ready */
+ grpc_error *child_errors[] = {error, coe.error};
grpc_error *cancellation_error =
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Cancelled before creating subchannel", &error, 1);
+ "Cancelled before creating subchannel", child_errors,
+ GPR_ARRAY_SIZE(child_errors));
/* if due to deadline, attach the deadline exceeded status to the error */
if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
cancellation_error =
@@ -961,8 +990,8 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
.context = calld->subchannel_call_context};
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
- gpr_atm_rel_store(&calld->subchannel_call,
- (gpr_atm)(uintptr_t)subchannel_call);
+ GPR_ASSERT(set_call_or_error(
+ calld, (call_or_error){.subchannel_call = subchannel_call}));
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
fail_locked(exec_ctx, calld, new_error);
@@ -975,8 +1004,9 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
call_data *calld = elem->call_data;
- grpc_subchannel_call *subchannel_call = GET_CALL(calld);
- if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) {
+ grpc_subchannel_call *subchannel_call =
+ get_call_or_error(calld).subchannel_call;
+ if (subchannel_call == NULL) {
return NULL;
} else {
return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
@@ -1118,7 +1148,7 @@ static bool pick_subchannel_locked(
cpa->on_ready = on_ready;
cpa->elem = elem;
grpc_closure_init(&cpa->closure, continue_picking_locked, cpa,
- grpc_combiner_scheduler(chand->combiner, true));
+ grpc_combiner_scheduler(chand->combiner));
grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
GRPC_ERROR_NONE);
} else {
@@ -1135,58 +1165,45 @@ static void start_transport_stream_op_batch_locked_inner(
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- grpc_subchannel_call *call;
/* need to recheck that another thread hasn't set the call */
- call = GET_CALL(calld);
- if (call == CANCELLED_CALL) {
+ call_or_error coe = get_call_or_error(calld);
+ if (coe.error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
+ exec_ctx, op, GRPC_ERROR_REF(coe.error));
/* early out */
return;
}
- if (call != NULL) {
- grpc_subchannel_call_process_op(exec_ctx, call, op);
+ if (coe.subchannel_call != NULL) {
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
/* early out */
return;
}
/* if this is a cancellation, then we can raise our cancelled flag */
if (op->cancel_stream) {
- if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
- (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
- /* recurse to retry */
- start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
- /* early out */
- return;
+ grpc_error *error = op->payload->cancel_stream.cancel_error;
+ /* Stash a copy of cancel_error in our call data, so that we can use
+ it for subsequent operations. This ensures that if the call is
+ cancelled before any ops are passed down (e.g., if the deadline
+ is in the past when the call starts), we can return the right
+ error to the caller when the first op does get passed down. */
+ set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
+ if (calld->pick_pending) {
+ cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
} else {
- /* Stash a copy of cancel_error in our call data, so that we can use
- it for subsequent operations. This ensures that if the call is
- cancelled before any ops are passed down (e.g., if the deadline
- is in the past when the call starts), we can return the right
- error to the caller when the first op does get passed down. */
- calld->cancel_error =
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
- if (calld->pick_pending) {
- cancel_pick_locked(
- exec_ctx, elem,
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
- } else {
- fail_locked(exec_ctx, calld,
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
- }
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op,
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
- /* early out */
- return;
+ fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
}
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op,
+ GRPC_ERROR_REF(error));
+ /* early out */
+ return;
}
/* if we don't have a subchannel, try to get one */
if (!calld->pick_pending && calld->connected_subchannel == NULL &&
op->send_initial_metadata) {
calld->pick_pending = true;
grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem,
- grpc_combiner_scheduler(chand->combiner, true));
+ grpc_combiner_scheduler(chand->combiner));
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data's interested_parties, so
@@ -1200,10 +1217,10 @@ static void start_transport_stream_op_batch_locked_inner(
calld->pick_pending = false;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
if (calld->connected_subchannel == NULL) {
- gpr_atm_no_barrier_store(&calld->subchannel_call,
- (gpr_atm)CANCELLED_CALL);
grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy");
+ set_call_or_error(calld,
+ (call_or_error){.error = GRPC_ERROR_REF(error)});
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
return; // Early out.
@@ -1225,8 +1242,8 @@ static void start_transport_stream_op_batch_locked_inner(
.context = calld->subchannel_call_context};
grpc_error *error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
- gpr_atm_rel_store(&calld->subchannel_call,
- (gpr_atm)(uintptr_t)subchannel_call);
+ GPR_ASSERT(set_call_or_error(
+ calld, (call_or_error){.subchannel_call = subchannel_call}));
if (error != GRPC_ERROR_NONE) {
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
@@ -1305,17 +1322,17 @@ static void cc_start_transport_stream_op_batch(
op);
}
/* try to (atomically) get the call */
- grpc_subchannel_call *call = GET_CALL(calld);
+ call_or_error coe = get_call_or_error(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
- if (call == CANCELLED_CALL) {
+ if (coe.error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
+ exec_ctx, op, GRPC_ERROR_REF(coe.error));
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
/* early out */
return;
}
- if (call != NULL) {
- grpc_subchannel_call_process_op(exec_ctx, call, op);
+ if (coe.subchannel_call != NULL) {
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
/* early out */
return;
@@ -1324,10 +1341,9 @@ static void cc_start_transport_stream_op_batch(
GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
op->handler_private.extra_arg = elem;
grpc_closure_sched(
- exec_ctx,
- grpc_closure_init(&op->handler_private.closure,
- start_transport_stream_op_batch_locked, op,
- grpc_combiner_scheduler(chand->combiner, false)),
+ exec_ctx, grpc_closure_init(&op->handler_private.closure,
+ start_transport_stream_op_batch_locked, op,
+ grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}
@@ -1364,12 +1380,14 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
if (calld->method_params != NULL) {
method_parameters_unref(calld->method_params);
}
- GRPC_ERROR_UNREF(calld->cancel_error);
- grpc_subchannel_call *call = GET_CALL(calld);
- if (call != NULL && call != CANCELLED_CALL) {
- grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure);
+ call_or_error coe = get_call_or_error(calld);
+ GRPC_ERROR_UNREF(coe.error);
+ if (coe.subchannel_call != NULL) {
+ grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call,
+ then_schedule_closure);
then_schedule_closure = NULL;
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call,
+ "client_channel_destroy_call");
}
GPR_ASSERT(!calld->pick_pending);
GPR_ASSERT(calld->waiting_ops_count == 0);
@@ -1439,9 +1457,8 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
grpc_closure_sched(
- exec_ctx,
- grpc_closure_create(try_to_connect_locked, chand,
- grpc_combiner_scheduler(chand->combiner, false)),
+ exec_ctx, grpc_closure_create(try_to_connect_locked, chand,
+ grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}
return out;
@@ -1578,6 +1595,6 @@ void grpc_client_channel_watch_connectivity_state(
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w,
- grpc_combiner_scheduler(chand->combiner, true)),
+ grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c
index e3efb735df..0c8e179925 100644
--- a/src/core/ext/filters/client_channel/lb_policy.c
+++ b/src/core/ext/filters/client_channel/lb_policy.c
@@ -74,11 +74,10 @@ void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx,
gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1);
gpr_atm check = 1 << WEAK_REF_BITS;
if ((old_val & mask) == check) {
- grpc_closure_sched(
- exec_ctx,
- grpc_closure_create(shutdown_locked, policy,
- grpc_combiner_scheduler(policy->combiner, false)),
- GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, grpc_closure_create(
+ shutdown_locked, policy,
+ grpc_combiner_scheduler(policy->combiner)),
+ GRPC_ERROR_NONE);
} else {
grpc_lb_policy_weak_unref(exec_ctx,
policy REF_FUNC_PASS_ARGS("strong-unref"));
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index 5ecbd3ba93..d3182f35fe 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -741,7 +741,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_zalloc(sizeof(rr_connectivity_data));
grpc_closure_init(&rr_connectivity->on_change,
glb_rr_connectivity_changed_locked, rr_connectivity,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
rr_connectivity->glb_policy = glb_policy;
rr_connectivity->state = rr_state;
@@ -1006,7 +1006,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&glb_policy->lb_channel_on_connectivity_changed,
glb_lb_channel_on_connectivity_changed_cb, glb_policy,
- grpc_combiner_scheduler(args->combiner, false));
+ grpc_combiner_scheduler(args->combiner));
grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb");
@@ -1252,7 +1252,7 @@ static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx,
gpr_time_add(now, glb_policy->client_stats_report_interval);
grpc_closure_init(&glb_policy->client_load_report_closure,
send_client_load_report_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer,
next_client_load_report_time,
&glb_policy->client_load_report_closure, now);
@@ -1280,7 +1280,7 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
op.data.send_message.send_message = glb_policy->client_load_report_payload;
grpc_closure_init(&glb_policy->client_load_report_closure,
client_load_report_done_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_call_error call_error = grpc_call_start_batch_and_execute(
exec_ctx, glb_policy->lb_call, &op, 1,
&glb_policy->client_load_report_closure);
@@ -1386,13 +1386,13 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&glb_policy->lb_on_sent_initial_request,
lb_on_sent_initial_request_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_closure_init(&glb_policy->lb_on_server_status_received,
lb_on_server_status_received_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
grpc_closure_init(&glb_policy->lb_on_response_received,
lb_on_response_received_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_combiner_scheduler(glb_policy->base.combiner));
gpr_backoff_init(&glb_policy->lb_call_backoff_state,
GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
@@ -1693,9 +1693,9 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
}
}
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
- grpc_closure_init(
- &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked,
- glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false));
+ grpc_closure_init(&glb_policy->lb_on_call_retry,
+ lb_call_on_retry_timer_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
glb_policy->retry_timer_active = true;
grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
&glb_policy->lb_on_call_retry, now);
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index 51cc632649..0e2b4131f1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -655,7 +655,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p,
- grpc_combiner_scheduler(args->combiner, false));
+ grpc_combiner_scheduler(args->combiner));
return &p->base;
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 33d9522380..9ecb0a39da 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -749,7 +749,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
sd->subchannel = subchannel;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed_locked, sd,
- grpc_combiner_scheduler(args->combiner, false));
+ grpc_combiner_scheduler(args->combiner));
/* use some sentinel value outside of the range of
* grpc_connectivity_state to signal an undefined previous state. We
* won't be referring to this value again and it'll be overwritten after
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
index 2bf44a4f1b..92f060b422 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
@@ -268,10 +268,10 @@ static grpc_resolver *dns_ares_create(grpc_exec_ctx *exec_ctx,
GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
grpc_closure_init(&r->dns_ares_on_retry_timer_locked,
dns_ares_on_retry_timer_locked, r,
- grpc_combiner_scheduler(r->base.combiner, false));
+ grpc_combiner_scheduler(r->base.combiner));
grpc_closure_init(&r->dns_ares_on_resolved_locked,
dns_ares_on_resolved_locked, r,
- grpc_combiner_scheduler(r->base.combiner, false));
+ grpc_combiner_scheduler(r->base.combiner));
return &r->base;
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
index afa978ff01..6c263c32f9 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
@@ -179,7 +179,7 @@ static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_log(GPR_DEBUG, "retrying immediately");
}
grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r,
- grpc_combiner_scheduler(r->base.combiner, false));
+ grpc_combiner_scheduler(r->base.combiner));
grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now);
}
if (r->resolved_result != NULL) {
@@ -201,7 +201,7 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
grpc_resolve_address(
exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties,
grpc_closure_create(dns_on_resolved_locked, r,
- grpc_combiner_scheduler(r->base.combiner, false)),
+ grpc_combiner_scheduler(r->base.combiner)),
&r->addresses);
}
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
index 59560e068d..cbb0e628ed 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
@@ -158,10 +158,9 @@ void grpc_fake_resolver_response_generator_set_response(
GPR_ASSERT(generator->resolver != NULL);
generator->next_response = grpc_channel_args_copy(next_response);
grpc_closure_sched(
- exec_ctx,
- grpc_closure_create(
- set_response_cb, generator,
- grpc_combiner_scheduler(generator->resolver->base.combiner, false)),
+ exec_ctx, grpc_closure_create(set_response_cb, generator,
+ grpc_combiner_scheduler(
+ generator->resolver->base.combiner)),
GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 0428cf4e5b..bdebc8eeec 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -35,7 +35,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
@@ -252,7 +251,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->ep = ep;
/* one ref is for destroy */
gpr_ref_init(&t->refs, 1);
- t->combiner = grpc_combiner_create(grpc_endpoint_get_workqueue(ep));
+ t->combiner = grpc_combiner_create();
t->peer_string = grpc_endpoint_get_peer(ep);
t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2;
@@ -273,29 +272,29 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->write_action, write_action, t,
grpc_schedule_on_exec_ctx);
grpc_closure_init(&t->read_action_locked, read_action_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->destructive_reclaimer_locked,
destructive_reclaimer_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->retry_initiate_ping_locked, retry_initiate_ping_locked,
- t, grpc_combiner_scheduler(t->combiner, false));
+ t, grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
- t, grpc_combiner_scheduler(t->combiner, false));
+ t, grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->start_keepalive_ping_locked,
start_keepalive_ping_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->finish_keepalive_ping_locked,
finish_keepalive_ping_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_closure_init(&t->keepalive_watchdog_fired_locked,
keepalive_watchdog_fired_locked, t,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string);
t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC);
@@ -338,7 +337,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
GRPC_CHTTP2_CLIENT_CONNECT_STRING));
- grpc_chttp2_initiate_write(exec_ctx, t, false, "initial_write");
+ grpc_chttp2_initiate_write(exec_ctx, t, "initial_write");
}
/* configure http2 the way we like it */
@@ -550,7 +549,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
}
- grpc_chttp2_initiate_write(exec_ctx, t, false, "init");
+ grpc_chttp2_initiate_write(exec_ctx, t, "init");
post_benign_reclaimer(exec_ctx, t);
}
@@ -568,9 +567,9 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_closure_sched(exec_ctx, grpc_closure_create(
- destroy_transport_locked, t,
- grpc_combiner_scheduler(t->combiner, false)),
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_create(destroy_transport_locked, t,
+ grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
}
@@ -663,7 +662,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_slice_buffer_init(&s->frame_storage);
s->pending_byte_stream = false;
grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s,
- grpc_combiner_scheduler(t->combiner, false));
+ grpc_combiner_scheduler(t->combiner));
GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
@@ -747,7 +746,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->destroy_stream_arg = then_schedule_closure;
grpc_closure_sched(
exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s,
- grpc_combiner_scheduler(t->combiner, false)),
+ grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("destroy_stream", 0);
}
@@ -785,8 +784,6 @@ static const char *write_state_name(grpc_chttp2_write_state st) {
return "WRITING";
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
return "WRITING+MORE";
- case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
- return "WRITING+MORE+COVERED";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
@@ -809,8 +806,7 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- bool covered_by_poller, const char *reason) {
+ grpc_chttp2_transport *t, const char *reason) {
GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0);
switch (t->write_state) {
@@ -819,28 +815,16 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_closure_sched(
exec_ctx,
- grpc_closure_init(
- &t->write_action_begin_locked, write_action_begin_locked, t,
- grpc_combiner_finally_scheduler(t->combiner, covered_by_poller)),
+ grpc_closure_init(&t->write_action_begin_locked,
+ write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner)),
GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
- set_write_state(
- exec_ctx, t,
- covered_by_poller
- ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER
- : GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
- reason);
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
+ reason);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
- if (covered_by_poller) {
- set_write_state(
- exec_ctx, t,
- GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER,
- reason);
- }
- break;
- case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
break;
}
GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
@@ -856,10 +840,10 @@ void grpc_chttp2_become_writable(
case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK:
break;
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED:
- grpc_chttp2_initiate_write(exec_ctx, t, true, reason);
+ grpc_chttp2_initiate_write(exec_ctx, t, reason);
break;
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED:
- grpc_chttp2_initiate_write(exec_ctx, t, false, reason);
+ grpc_chttp2_initiate_write(exec_ctx, t, reason);
break;
}
}
@@ -896,7 +880,7 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) {
grpc_endpoint_write(
exec_ctx, t->ep, &t->outbuf,
grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t,
- grpc_combiner_scheduler(t->combiner, false)));
+ grpc_combiner_scheduler(t->combiner)));
GPR_TIMER_END("write_action", 0);
}
@@ -932,21 +916,9 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_closure_run(
exec_ctx,
- grpc_closure_init(
- &t->write_action_begin_locked, write_action_begin_locked, t,
- grpc_combiner_finally_scheduler(t->combiner, false)),
- GRPC_ERROR_NONE);
- break;
- case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
- GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
- set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
- "continue writing [covered]");
- GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_closure_run(
- exec_ctx,
grpc_closure_init(&t->write_action_begin_locked,
write_action_begin_locked, t,
- grpc_combiner_finally_scheduler(t->combiner, true)),
+ grpc_combiner_finally_scheduler(t->combiner)),
GRPC_ERROR_NONE);
break;
}
@@ -969,7 +941,7 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
t->dirtied_local_settings = 1;
- grpc_chttp2_initiate_write(exec_ctx, t, false, "push_setting");
+ grpc_chttp2_initiate_write(exec_ctx, t, "push_setting");
}
}
@@ -1365,7 +1337,6 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s->next_message_end_offset = s->flow_controlled_bytes_written +
(int64_t)s->flow_controlled_buffer.length +
(int64_t)len;
- s->complete_fetch_covered_by_poller = op->covered_by_poller;
if (flags & GRPC_WRITE_BUFFER_HINT) {
s->next_message_end_offset -= t->write_buffer_size;
s->write_buffering = true;
@@ -1487,9 +1458,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
grpc_closure_sched(
exec_ctx,
- grpc_closure_init(
- &op->handler_private.closure, perform_stream_op_locked, op,
- grpc_combiner_scheduler(t->combiner, op->covered_by_poller)),
+ grpc_closure_init(&op->handler_private.closure, perform_stream_op_locked,
+ op, grpc_combiner_scheduler(t->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("perform_stream_op", 0);
}
@@ -1516,7 +1486,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_NONE);
if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
GRPC_ERROR_NONE)) {
- grpc_chttp2_initiate_write(exec_ctx, t, false, "send_ping");
+ grpc_chttp2_initiate_write(exec_ctx, t, "send_ping");
}
}
@@ -1524,7 +1494,7 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = tp;
t->ping_state.is_delayed_ping_timer_set = false;
- grpc_chttp2_initiate_write(exec_ctx, t, false, "retry_send_ping");
+ grpc_chttp2_initiate_write(exec_ctx, t, "retry_send_ping");
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1539,7 +1509,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
- grpc_chttp2_initiate_write(exec_ctx, t, false, "continue_pings");
+ grpc_chttp2_initiate_write(exec_ctx, t, "continue_pings");
}
}
@@ -1552,7 +1522,7 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&slice, &http_error);
grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error,
grpc_slice_ref_internal(slice), &t->qbuf);
- grpc_chttp2_initiate_write(exec_ctx, t, false, "goaway_sent");
+ grpc_chttp2_initiate_write(exec_ctx, t, "goaway_sent");
GRPC_ERROR_UNREF(error);
}
@@ -1578,12 +1548,6 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = op->handler_private.extra_arg;
grpc_error *close_transport = op->disconnect_with_error;
- if (op->on_connectivity_state_change != NULL) {
- grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
- op->on_connectivity_state_change);
- }
-
if (op->goaway_error) {
send_goaway(exec_ctx, t, op->goaway_error);
}
@@ -1607,6 +1571,12 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
op->send_ping);
}
+ if (op->on_connectivity_state_change != NULL) {
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
+ }
+
if (close_transport != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, close_transport);
}
@@ -1623,11 +1593,11 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_free(msg);
op->handler_private.extra_arg = gt;
GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
- grpc_closure_sched(
- exec_ctx, grpc_closure_init(&op->handler_private.closure,
- perform_transport_op_locked, op,
- grpc_combiner_scheduler(t->combiner, false)),
- GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_init(&op->handler_private.closure,
+ perform_transport_op_locked, op,
+ grpc_combiner_scheduler(t->combiner)),
+ GRPC_ERROR_NONE);
}
/*******************************************************************************
@@ -1782,7 +1752,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error,
&s->stats.outgoing));
- grpc_chttp2_initiate_write(exec_ctx, t, false, "rst_stream");
+ grpc_chttp2_initiate_write(exec_ctx, t, "rst_stream");
}
}
if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
@@ -2095,7 +2065,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&s->stats.outgoing));
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error);
- grpc_chttp2_initiate_write(exec_ctx, t, false, "close_from_api");
+ grpc_chttp2_initiate_write(exec_ctx, t, "close_from_api");
}
typedef struct {
@@ -2626,9 +2596,9 @@ static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
bs->next_action.on_complete = on_complete;
grpc_closure_sched(
exec_ctx,
- grpc_closure_init(
- &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner, false)),
+ grpc_closure_init(&bs->next_action.closure,
+ incoming_byte_stream_next_locked, bs,
+ grpc_combiner_scheduler(bs->transport->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_next", 0);
return false;
@@ -2683,10 +2653,9 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_closure_sched(
- exec_ctx,
- grpc_closure_init(
- &bs->destroy_action, incoming_byte_stream_destroy_locked, bs,
- grpc_combiner_scheduler(bs->transport->combiner, false)),
+ exec_ctx, grpc_closure_init(
+ &bs->destroy_action, incoming_byte_stream_destroy_locked,
+ bs, grpc_combiner_scheduler(bs->transport->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_destroy", 0);
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c
index 04354e0dc2..3d7c6fbfad 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.c
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.c
@@ -117,7 +117,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks));
}
t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes;
- grpc_chttp2_initiate_write(exec_ctx, t, false, "ping response");
+ grpc_chttp2_initiate_write(exec_ctx, t, "ping response");
}
}
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c
index 815a87cbe3..682be2c89b 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.c
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c
@@ -109,8 +109,7 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
received_update);
bool is_zero = t->outgoing_window <= 0;
if (was_zero && !is_zero) {
- grpc_chttp2_initiate_write(exec_ctx, t, false,
- "new_global_flow_control");
+ grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control");
}
}
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index 7476ff6188..ab5f3288ac 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -1649,7 +1649,7 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing));
- grpc_chttp2_initiate_write(exec_ctx, t, false, "force_rst_stream");
+ grpc_chttp2_initiate_write(exec_ctx, t, "force_rst_stream");
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, GRPC_ERROR_NONE);
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst");
@@ -1697,9 +1697,9 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx,
and can avoid the extra write */
GRPC_CHTTP2_STREAM_REF(s, "final_rst");
grpc_closure_sched(
- exec_ctx, grpc_closure_create(force_client_rst_stream, s,
- grpc_combiner_finally_scheduler(
- t->combiner, false)),
+ exec_ctx,
+ grpc_closure_create(force_client_rst_stream, s,
+ grpc_combiner_finally_scheduler(t->combiner)),
GRPC_ERROR_NONE);
}
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 933f254bde..4041b29fec 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -58,7 +58,6 @@ typedef enum {
GRPC_CHTTP2_WRITE_STATE_IDLE,
GRPC_CHTTP2_WRITE_STATE_WRITING,
GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
- GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER,
} grpc_chttp2_write_state;
typedef enum {
@@ -443,7 +442,6 @@ struct grpc_chttp2_stream {
grpc_slice fetching_slice;
int64_t next_message_end_offset;
int64_t flow_controlled_bytes_written;
- bool complete_fetch_covered_by_poller;
grpc_closure complete_fetch_locked;
grpc_closure *fetching_send_message_finished;
@@ -534,8 +532,7 @@ struct grpc_chttp2_stream {
The actual call chain is documented in the implementation of this function.
*/
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- bool covered_by_poller, const char *reason);
+ grpc_chttp2_transport *t, const char *reason);
typedef enum {
GRPC_CHTTP2_NOTHING_TO_WRITE,
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 8bbd9c24b8..941260be9a 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -418,7 +418,7 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
incoming_frame_size);
if (t->incoming_window <= target_incoming_window / 2) {
- grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control");
+ grpc_chttp2_initiate_write(exec_ctx, t, "flow_control");
}
return GRPC_ERROR_NONE;
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 5c3b9f5ada..02cf42283a 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -191,8 +191,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) &&
stream_ref_if_not_destroyed(&s->refcount->refs)) {
- grpc_chttp2_initiate_write(exec_ctx, t, false,
- "transport.read_flow_control");
+ grpc_chttp2_initiate_write(exec_ctx, t, "transport.read_flow_control");
}
}
}
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 4b8102ea54..f6976c5650 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -24,7 +24,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include "src/core/lib/iomgr/workqueue.h"
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/profiling/timers.h"
grpc_tracer_flag grpc_combiner_trace = GRPC_TRACER_INITIALIZER(false);
@@ -41,93 +41,47 @@ grpc_tracer_flag grpc_combiner_trace = GRPC_TRACER_INITIALIZER(false);
struct grpc_combiner {
grpc_combiner *next_combiner_on_this_exec_ctx;
- grpc_workqueue *optional_workqueue;
- grpc_closure_scheduler uncovered_scheduler;
- grpc_closure_scheduler covered_scheduler;
- grpc_closure_scheduler uncovered_finally_scheduler;
- grpc_closure_scheduler covered_finally_scheduler;
+ grpc_closure_scheduler scheduler;
+ grpc_closure_scheduler finally_scheduler;
gpr_mpscq queue;
+ // either:
+ // a pointer to the initiating exec ctx if that is the only exec_ctx that has
+ // ever queued to this combiner, or NULL. If this is non-null, it's not
+ // dereferencable (since the initiating exec_ctx may have gone out of scope)
+ gpr_atm initiating_exec_ctx_or_null;
// state is:
// lower bit - zero if orphaned (STATE_UNORPHANED)
// other bits - number of items queued on the lock (STATE_ELEM_COUNT_LOW_BIT)
gpr_atm state;
- // number of elements in the list that are covered by a poller: if >0, we can
- // offload safely
- gpr_atm elements_covered_by_poller;
bool time_to_execute_final_list;
- bool final_list_covered_by_poller;
grpc_closure_list final_list;
grpc_closure offload;
gpr_refcount refs;
};
-static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx,
- grpc_closure *closure, grpc_error *error);
-static void combiner_exec_covered(grpc_exec_ctx *exec_ctx,
+static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
+static void combiner_finally_exec(grpc_exec_ctx *exec_ctx,
grpc_closure *closure, grpc_error *error);
-static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx,
- grpc_closure *closure,
- grpc_error *error);
-static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx,
- grpc_closure *closure,
- grpc_error *error);
-
-static const grpc_closure_scheduler_vtable scheduler_uncovered = {
- combiner_exec_uncovered, combiner_exec_uncovered,
- "combiner:immediately:uncovered"};
-static const grpc_closure_scheduler_vtable scheduler_covered = {
- combiner_exec_covered, combiner_exec_covered,
- "combiner:immediately:covered"};
-static const grpc_closure_scheduler_vtable finally_scheduler_uncovered = {
- combiner_finally_exec_uncovered, combiner_finally_exec_uncovered,
- "combiner:finally:uncovered"};
-static const grpc_closure_scheduler_vtable finally_scheduler_covered = {
- combiner_finally_exec_covered, combiner_finally_exec_covered,
- "combiner:finally:covered"};
-static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
-
-typedef struct {
- grpc_error *error;
- bool covered_by_poller;
-} error_data;
-
-static uintptr_t pack_error_data(error_data d) {
- return ((uintptr_t)d.error) | (d.covered_by_poller ? 1 : 0);
-}
-
-static error_data unpack_error_data(uintptr_t p) {
- return (error_data){(grpc_error *)(p & ~(uintptr_t)1), p & 1};
-}
+static const grpc_closure_scheduler_vtable scheduler = {
+ combiner_exec, combiner_exec, "combiner:immediately"};
+static const grpc_closure_scheduler_vtable finally_scheduler = {
+ combiner_finally_exec, combiner_finally_exec, "combiner:finally"};
-static bool is_covered_by_poller(grpc_combiner *lock) {
- return lock->final_list_covered_by_poller ||
- gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0;
-}
-
-#define IS_COVERED_BY_POLLER_FMT "(final=%d elems=%" PRIdPTR ")->%d"
-#define IS_COVERED_BY_POLLER_ARGS(lock) \
- (lock)->final_list_covered_by_poller, \
- gpr_atm_acq_load(&(lock)->elements_covered_by_poller), \
- is_covered_by_poller((lock))
+static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
-grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
+grpc_combiner *grpc_combiner_create(void) {
grpc_combiner *lock = gpr_malloc(sizeof(*lock));
gpr_ref_init(&lock->refs, 1);
lock->next_combiner_on_this_exec_ctx = NULL;
lock->time_to_execute_final_list = false;
- lock->optional_workqueue = optional_workqueue;
- lock->final_list_covered_by_poller = false;
- lock->uncovered_scheduler.vtable = &scheduler_uncovered;
- lock->covered_scheduler.vtable = &scheduler_covered;
- lock->uncovered_finally_scheduler.vtable = &finally_scheduler_uncovered;
- lock->covered_finally_scheduler.vtable = &finally_scheduler_covered;
+ lock->scheduler.vtable = &scheduler;
+ lock->finally_scheduler.vtable = &finally_scheduler;
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
- gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0);
gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list);
- grpc_closure_init(&lock->offload, offload, lock,
- grpc_workqueue_scheduler(lock->optional_workqueue));
+ grpc_closure_init(&lock->offload, offload, lock, grpc_executor_scheduler);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock;
}
@@ -136,7 +90,6 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p really_destroy", lock));
GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0);
gpr_mpscq_destroy(&lock->queue);
- GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner");
gpr_free(lock);
}
@@ -193,48 +146,40 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx,
}
}
-static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *cl, grpc_error *error,
- bool covered_by_poller) {
+#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \
+ ((grpc_combiner *)(((char *)((closure)->scheduler)) - \
+ offsetof(grpc_combiner, scheduler_name)))
+
+static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
+ grpc_error *error) {
GPR_TIMER_BEGIN("combiner.execute", 0);
+ grpc_combiner *lock = COMBINER_FROM_CLOSURE_SCHEDULER(cl, scheduler);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
- GRPC_COMBINER_TRACE(gpr_log(
- GPR_DEBUG, "C:%p grpc_combiner_execute c=%p cov=%d last=%" PRIdPTR, lock,
- cl, covered_by_poller, last));
- GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
- assert(cl->cb);
- cl->error_data.scratch =
- pack_error_data((error_data){error, covered_by_poller});
- if (covered_by_poller) {
- gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, 1);
- }
- gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
+ GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
+ "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR,
+ lock, cl, last));
if (last == 1) {
+ gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null,
+ (gpr_atm)exec_ctx);
// first element on this list: add it to the list of combiner locks
// executing within this exec_ctx
push_last_on_exec_ctx(exec_ctx, lock);
+ } else {
+ // there may be a race with setting here: if that happens, we may delay
+ // offload for one or two actions, and that's fine
+ gpr_atm initiator =
+ gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null);
+ if (initiator != 0 && initiator != (gpr_atm)exec_ctx) {
+ gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0);
+ }
}
+ GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
+ assert(cl->cb);
+ cl->error_data.error = error;
+ gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next);
GPR_TIMER_END("combiner.execute", 0);
}
-#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \
- ((grpc_combiner *)(((char *)((closure)->scheduler)) - \
- offsetof(grpc_combiner, scheduler_name)))
-
-static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
- grpc_error *error) {
- combiner_exec(exec_ctx,
- COMBINER_FROM_CLOSURE_SCHEDULER(cl, uncovered_scheduler), cl,
- error, false);
-}
-
-static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
- grpc_error *error) {
- combiner_exec(exec_ctx,
- COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_scheduler), cl,
- error, true);
-}
-
static void move_next(grpc_exec_ctx *exec_ctx) {
exec_ctx->active_combiner =
exec_ctx->active_combiner->next_combiner_on_this_exec_ctx;
@@ -250,8 +195,7 @@ static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
move_next(exec_ctx);
- GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock,
- lock->optional_workqueue));
+ GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload", lock));
grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE);
}
@@ -263,22 +207,23 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
return false;
}
- GRPC_COMBINER_TRACE(
- gpr_log(GPR_DEBUG,
- "C:%p grpc_combiner_continue_exec_ctx workqueue=%p "
- "is_covered_by_poller=" IS_COVERED_BY_POLLER_FMT
- " exec_ctx_ready_to_finish=%d "
- "time_to_execute_final_list=%d",
- lock, lock->optional_workqueue, IS_COVERED_BY_POLLER_ARGS(lock),
- grpc_exec_ctx_ready_to_finish(exec_ctx),
- lock->time_to_execute_final_list));
-
- if (lock->optional_workqueue != NULL && is_covered_by_poller(lock) &&
- grpc_exec_ctx_ready_to_finish(exec_ctx)) {
+ bool contended =
+ gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null) == 0;
+
+ GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
+ "C:%p grpc_combiner_continue_exec_ctx "
+ "contended=%d "
+ "exec_ctx_ready_to_finish=%d "
+ "time_to_execute_final_list=%d",
+ lock, contended,
+ grpc_exec_ctx_ready_to_finish(exec_ctx),
+ lock->time_to_execute_final_list));
+
+ if (contended && grpc_exec_ctx_ready_to_finish(exec_ctx) &&
+ grpc_executor_is_threaded()) {
GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
- // this execution context wants to move on, and we have a workqueue (and
- // so can help the execution context out): schedule remaining work to be
- // picked up on the workqueue
+ // this execution context wants to move on: schedule remaining work to be
+ // picked up on the executor
queue_offload(exec_ctx, lock);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
@@ -295,29 +240,23 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
// queue is in an inconsistent state: use this as a cue that we should
// go off and do something else for a while (and come back later)
GPR_TIMER_MARK("delay_busy", 0);
- if (lock->optional_workqueue != NULL && is_covered_by_poller(lock)) {
- queue_offload(exec_ctx, lock);
- }
+ queue_offload(exec_ctx, lock);
GPR_TIMER_END("combiner.continue_exec_ctx", 0);
return true;
}
GPR_TIMER_BEGIN("combiner.exec1", 0);
grpc_closure *cl = (grpc_closure *)n;
- error_data err = unpack_error_data(cl->error_data.scratch);
+ grpc_error *cl_err = cl->error_data.error;
#ifndef NDEBUG
cl->scheduled = false;
#endif
- cl->cb(exec_ctx, cl->cb_arg, err.error);
- if (err.covered_by_poller) {
- gpr_atm_no_barrier_fetch_add(&lock->elements_covered_by_poller, -1);
- }
- GRPC_ERROR_UNREF(err.error);
+ cl->cb(exec_ctx, cl->cb_arg, cl_err);
+ GRPC_ERROR_UNREF(cl_err);
GPR_TIMER_END("combiner.exec1", 0);
} else {
grpc_closure *c = lock->final_list.head;
GPR_ASSERT(c != NULL);
grpc_closure_list_init(&lock->final_list);
- lock->final_list_covered_by_poller = false;
int loops = 0;
while (c != NULL) {
GPR_TIMER_BEGIN("combiner.exec_1final", 0);
@@ -383,20 +322,20 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
grpc_error *error);
-static void combiner_execute_finally(grpc_exec_ctx *exec_ctx,
- grpc_combiner *lock, grpc_closure *closure,
- grpc_error *error,
- bool covered_by_poller) {
- GRPC_COMBINER_TRACE(gpr_log(
- GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock,
- closure, exec_ctx->active_combiner, covered_by_poller));
+static void combiner_finally_exec(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error) {
+ grpc_combiner *lock =
+ COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler);
+ GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG,
+ "C:%p grpc_combiner_execute_finally c=%p; ac=%p",
+ lock, closure, exec_ctx->active_combiner));
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
- grpc_closure_sched(
- exec_ctx, grpc_closure_create(enqueue_finally, closure,
- grpc_combiner_scheduler(lock, false)),
- error);
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_create(enqueue_finally, closure,
+ grpc_combiner_scheduler(lock)),
+ error);
GPR_TIMER_END("combiner.execute_finally", 0);
return;
}
@@ -404,42 +343,20 @@ static void combiner_execute_finally(grpc_exec_ctx *exec_ctx,
if (grpc_closure_list_empty(lock->final_list)) {
gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
}
- if (covered_by_poller) {
- lock->final_list_covered_by_poller = true;
- }
grpc_closure_list_append(&lock->final_list, closure, error);
GPR_TIMER_END("combiner.execute_finally", 0);
}
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
grpc_error *error) {
- combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
- GRPC_ERROR_REF(error), false);
-}
-
-static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx,
- grpc_closure *cl,
- grpc_error *error) {
- combiner_execute_finally(exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(
- cl, uncovered_finally_scheduler),
- cl, error, false);
-}
-
-static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx,
- grpc_closure *cl, grpc_error *error) {
- combiner_execute_finally(
- exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_finally_scheduler),
- cl, error, true);
+ combiner_finally_exec(exec_ctx, closure, GRPC_ERROR_REF(error));
}
-grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner,
- bool covered_by_poller) {
- return covered_by_poller ? &combiner->covered_scheduler
- : &combiner->uncovered_scheduler;
+grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner) {
+ return &combiner->scheduler;
}
grpc_closure_scheduler *grpc_combiner_finally_scheduler(
- grpc_combiner *combiner, bool covered_by_poller) {
- return covered_by_poller ? &combiner->covered_finally_scheduler
- : &combiner->uncovered_finally_scheduler;
+ grpc_combiner *combiner) {
+ return &combiner->finally_scheduler;
}
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index d6571ad4c6..a616113ca0 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -33,7 +33,7 @@
// Initialize the lock, with an optional workqueue to shift load to when
// necessary
-grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
+grpc_combiner *grpc_combiner_create(void);
//#define GRPC_COMBINER_REFCOUNT_DEBUG
#ifdef GRPC_COMBINER_REFCOUNT_DEBUG
@@ -56,11 +56,9 @@ grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS);
void grpc_combiner_unref(grpc_exec_ctx *exec_ctx,
grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS);
// Fetch a scheduler to schedule closures against
-grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock,
- bool covered_by_poller);
+grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock);
// Scheduler to execute \a action within the lock just prior to unlocking.
-grpc_closure_scheduler *grpc_combiner_finally_scheduler(grpc_combiner *lock,
- bool covered_by_poller);
+grpc_closure_scheduler *grpc_combiner_finally_scheduler(grpc_combiner *lock);
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c
index 116f18424c..37cce335ca 100644
--- a/src/core/lib/iomgr/endpoint.c
+++ b/src/core/lib/iomgr/endpoint.c
@@ -54,10 +54,6 @@ char* grpc_endpoint_get_peer(grpc_endpoint* ep) {
int grpc_endpoint_get_fd(grpc_endpoint* ep) { return ep->vtable->get_fd(ep); }
-grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) {
- return ep->vtable->get_workqueue(ep);
-}
-
grpc_resource_user* grpc_endpoint_get_resource_user(grpc_endpoint* ep) {
return ep->vtable->get_resource_user(ep);
}
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index ec355d413a..8f0523a981 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -37,7 +37,6 @@ struct grpc_endpoint_vtable {
grpc_slice_buffer *slices, grpc_closure *cb);
void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_slice_buffer *slices, grpc_closure *cb);
- grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep);
void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@@ -63,9 +62,6 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep);
*/
int grpc_endpoint_get_fd(grpc_endpoint *ep);
-/* Retrieve a reference to the workqueue associated with this endpoint */
-grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep);
-
/* Write slices out to the socket.
If the connection is ready for more data after the end of the call, it
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 7d7aa44912..63f6962ede 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -43,7 +43,6 @@
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
@@ -115,7 +114,9 @@ struct grpc_pollset {
* Pollset-set Declarations
*/
-struct grpc_pollset_set {};
+struct grpc_pollset_set {
+ char unused;
+};
/*******************************************************************************
* Common helpers
@@ -268,10 +269,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
- return (grpc_workqueue *)0xb0b51ed;
-}
-
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
@@ -298,8 +295,6 @@ GPR_TLS_DECL(g_current_thread_worker);
static gpr_atm g_active_poller;
static pollset_neighbourhood *g_neighbourhoods;
static size_t g_num_neighbourhoods;
-static gpr_mu g_wq_mu;
-static grpc_closure_list g_wq_items;
/* Return true if first in list */
static bool worker_insert(grpc_pollset *pollset, grpc_pollset_worker *worker) {
@@ -348,8 +343,6 @@ static grpc_error *pollset_global_init(void) {
gpr_atm_no_barrier_store(&g_active_poller, 0);
global_wakeup_fd.read_fd = -1;
grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
- gpr_mu_init(&g_wq_mu);
- g_wq_items = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
if (err != GRPC_ERROR_NONE) return err;
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
.data.ptr = &global_wakeup_fd};
@@ -368,7 +361,6 @@ static grpc_error *pollset_global_init(void) {
static void pollset_global_shutdown(void) {
gpr_tls_destroy(&g_current_thread_pollset);
gpr_tls_destroy(&g_current_thread_worker);
- gpr_mu_destroy(&g_wq_mu);
if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
for (size_t i = 0; i < g_num_neighbourhoods; i++) {
gpr_mu_destroy(&g_neighbourhoods[i].mu);
@@ -492,9 +484,6 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
for (int i = 0; i < r; i++) {
void *data_ptr = events[i].data.ptr;
if (data_ptr == &global_wakeup_fd) {
- gpr_mu_lock(&g_wq_mu);
- grpc_closure_list_move(&g_wq_items, &exec_ctx->closure_list);
- gpr_mu_unlock(&g_wq_mu);
append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else {
@@ -777,84 +766,6 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {}
/*******************************************************************************
- * Workqueue Definitions
- */
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {}
-#endif
-
-static void wq_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- // find a neighbourhood to wakeup
- bool scheduled = false;
- size_t initial_neighbourhood = choose_neighbourhood();
- for (size_t i = 0; !scheduled && i < g_num_neighbourhoods; i++) {
- pollset_neighbourhood *neighbourhood =
- &g_neighbourhoods[(initial_neighbourhood + i) % g_num_neighbourhoods];
- if (gpr_mu_trylock(&neighbourhood->mu)) {
- if (neighbourhood->active_root != NULL) {
- grpc_pollset *inspect = neighbourhood->active_root;
- do {
- if (gpr_mu_trylock(&inspect->mu)) {
- if (inspect->root_worker != NULL) {
- grpc_pollset_worker *inspect_worker = inspect->root_worker;
- do {
- if (inspect_worker->kick_state == UNKICKED) {
- inspect_worker->kick_state = KICKED;
- grpc_closure_list_append(
- &inspect_worker->schedule_on_end_work, closure, error);
- if (inspect_worker->initialized_cv) {
- gpr_cv_signal(&inspect_worker->cv);
- }
- scheduled = true;
- }
- inspect_worker = inspect_worker->next;
- } while (!scheduled && inspect_worker != inspect->root_worker);
- }
- gpr_mu_unlock(&inspect->mu);
- }
- inspect = inspect->next;
- } while (!scheduled && inspect != neighbourhood->active_root);
- }
- gpr_mu_unlock(&neighbourhood->mu);
- }
- }
- if (!scheduled) {
- gpr_mu_lock(&g_wq_mu);
- grpc_closure_list_append(&g_wq_items, closure, error);
- gpr_mu_unlock(&g_wq_mu);
- GRPC_LOG_IF_ERROR("workqueue_scheduler",
- grpc_wakeup_fd_wakeup(&global_wakeup_fd));
- }
-}
-
-static const grpc_closure_scheduler_vtable
- singleton_workqueue_scheduler_vtable = {wq_sched, wq_sched,
- "epoll1_workqueue"};
-
-static grpc_closure_scheduler singleton_workqueue_scheduler = {
- &singleton_workqueue_scheduler_vtable};
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
- return &singleton_workqueue_scheduler;
-}
-
-/*******************************************************************************
* Pollset-set Definitions
*/
@@ -905,7 +816,6 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -923,10 +833,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_set_add_fd = pollset_set_add_fd,
.pollset_set_del_fd = pollset_set_del_fd,
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_scheduler = workqueue_scheduler,
-
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
index 0304d97fd8..d7ae0d371e 100644
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
@@ -46,7 +46,6 @@
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/env.h"
@@ -169,13 +168,15 @@ static void fd_global_shutdown(void);
* Polling island Declarations
*/
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+//#define PI_REFCOUNT_DEBUG
+
+#ifdef PI_REFCOUNT_DEBUG
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
-#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
+#else /* defined(PI_REFCOUNT_DEBUG) */
#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
@@ -189,8 +190,6 @@ typedef struct worker_node {
/* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island {
- grpc_closure_scheduler workqueue_scheduler;
-
gpr_mu mu;
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
the refcount.
@@ -211,15 +210,6 @@ typedef struct polling_island {
/* Number of threads currently polling on this island */
gpr_atm poller_count;
- /* Mutex guarding the read end of the workqueue (must be held to pop from
- * workqueue_items) */
- gpr_mu workqueue_read_mu;
- /* Queue of closures to be executed */
- gpr_mpscq workqueue_items;
- /* Count of items in workqueue_items */
- gpr_atm workqueue_item_count;
- /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
- grpc_wakeup_fd workqueue_wakeup_fd;
/* The list of workers waiting to do polling on this polling island */
gpr_mu worker_list_mu;
@@ -308,8 +298,6 @@ static __thread polling_island *g_current_thread_polling_island;
/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -322,13 +310,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
-static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
- workqueue_enqueue, workqueue_enqueue, "workqueue"};
-
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+#ifdef PI_REFCOUNT_DEBUG
static void pi_add_ref_dbg(polling_island *pi, const char *reason,
const char *file, int line) {
long old_cnt = gpr_atm_acq_load(&pi->ref_count);
@@ -344,36 +329,6 @@ static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
-
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- if (workqueue != NULL) {
- pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {
- if (workqueue != NULL) {
- pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
- }
-}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- pi_add_ref((polling_island *)workqueue);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- pi_unref(exec_ctx, (polling_island *)workqueue);
- }
-}
#endif
static void pi_add_ref(polling_island *pi) {
@@ -577,17 +532,12 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
*error = GRPC_ERROR_NONE;
pi = gpr_malloc(sizeof(*pi));
- pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
gpr_mu_init(&pi->mu);
pi->fd_cnt = 0;
pi->fd_capacity = 0;
pi->fds = NULL;
pi->epoll_fd = -1;
- gpr_mu_init(&pi->workqueue_read_mu);
- gpr_mpscq_init(&pi->workqueue_items);
- gpr_atm_rel_store(&pi->workqueue_item_count, 0);
-
gpr_atm_rel_store(&pi->ref_count, 0);
gpr_atm_rel_store(&pi->poller_count, 0);
gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
@@ -595,11 +545,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
gpr_mu_init(&pi->worker_list_mu);
worker_node_init(&pi->worker_list_head);
- if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
- err_desc)) {
- goto done;
- }
-
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) {
@@ -607,8 +552,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
goto done;
}
- polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
-
if (initial_fd != NULL) {
polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
}
@@ -627,11 +570,7 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
if (pi->epoll_fd >= 0) {
close(pi->epoll_fd);
}
- GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
- gpr_mu_destroy(&pi->workqueue_read_mu);
- gpr_mpscq_destroy(&pi->workqueue_items);
gpr_mu_destroy(&pi->mu);
- grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
gpr_mu_destroy(&pi->worker_list_mu);
GPR_ASSERT(is_worker_node_detached(&pi->worker_list_head));
@@ -779,45 +718,6 @@ static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
}
}
-static void workqueue_maybe_wakeup(polling_island *pi) {
- /* If this thread is the current poller, then it may be that it's about to
- decrement the current poller count, so we need to look past this thread */
- bool is_current_poller = (g_current_thread_polling_island == pi);
- gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
- gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
- /* Only issue a wakeup if it's likely that some poller could come in and take
- it right now. Note that since we do an anticipatory mpscq_pop every poll
- loop, it's ok if we miss the wakeup here, as we'll get the work item when
- the next poller enters anyway. */
- if (current_pollers > min_current_pollers_for_wakeup) {
- GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
- grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
- }
-}
-
-static void workqueue_move_items_to_parent(polling_island *q) {
- polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
- if (p == NULL) {
- return;
- }
- gpr_mu_lock(&q->workqueue_read_mu);
- int num_added = 0;
- while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
- if (n != NULL) {
- gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
- gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
- gpr_mpscq_push(&p->workqueue_items, n);
- num_added++;
- }
- }
- gpr_mu_unlock(&q->workqueue_read_mu);
- if (num_added > 0) {
- workqueue_maybe_wakeup(p);
- }
- workqueue_move_items_to_parent(p);
-}
-
static polling_island *polling_island_merge(polling_island *p,
polling_island *q,
grpc_error **error) {
@@ -842,8 +742,6 @@ static polling_island *polling_island_merge(polling_island *p,
/* Add the 'merged_to' link from p --> q */
gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
-
- workqueue_move_items_to_parent(p);
}
/* else if p == q, nothing needs to be done */
@@ -854,32 +752,6 @@ static polling_island *polling_island_merge(polling_island *p,
return q;
}
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.enqueue", 0);
- grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
- /* take a ref to the workqueue: otherwise it can happen that whatever events
- * this kicks off ends up destroying the workqueue before this function
- * completes */
- GRPC_WORKQUEUE_REF(workqueue, "enqueue");
- polling_island *pi = (polling_island *)workqueue;
- gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
- closure->error_data.error = error;
- gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
- if (last == 0) {
- workqueue_maybe_wakeup(pi);
- }
- workqueue_move_items_to_parent(pi);
- GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
- GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
- polling_island *pi = (polling_island *)workqueue;
- return workqueue == NULL ? grpc_schedule_on_exec_ctx
- : &pi->workqueue_scheduler;
-}
-
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
@@ -1138,14 +1010,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
- gpr_mu_lock(&fd->po.mu);
- grpc_workqueue *workqueue =
- GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
- gpr_mu_unlock(&fd->po.mu);
- return workqueue;
-}
-
/*******************************************************************************
* Pollset Definitions
*/
@@ -1417,33 +1281,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->po.mu);
}
-static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
- polling_island *pi) {
- if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
- gpr_mu_unlock(&pi->workqueue_read_mu);
- if (n != NULL) {
- if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
- workqueue_maybe_wakeup(pi);
- }
- grpc_closure *c = (grpc_closure *)n;
- grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
- c->scheduled = false;
-#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- return true;
- } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
- /* n == NULL might mean there's work but it's not available to be popped
- * yet - try to ensure another workqueue wakes up to check shortly if so
- */
- workqueue_maybe_wakeup(pi);
- }
- }
- return false;
-}
-
/* NOTE: This function may modify 'now' */
static bool acquire_polling_lease(grpc_pollset_worker *worker,
polling_island *pi, gpr_timespec deadline,
@@ -1579,12 +1416,7 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
for (int i = 0; i < ep_rv; ++i) {
void *data_ptr = ep_ev[i].data.ptr;
- if (data_ptr == &pi->workqueue_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
- err_desc);
- maybe_do_workqueue_work(exec_ctx, pi);
- } else if (data_ptr == &polling_island_wakeup_fd) {
+ if (data_ptr == &polling_island_wakeup_fd) {
GRPC_POLLING_TRACE(
"pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
"%d) got merged",
@@ -1660,15 +1492,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
PI_ADD_REF(pi, "ps_work");
gpr_mu_unlock(&pollset->po.mu);
- /* If we get some workqueue work to do, it might end up completing an item on
- the completion queue, so there's no need to poll... so we skip that and
- redo the complete loop to verify */
- if (!maybe_do_workqueue_work(exec_ctx, pi)) {
- g_current_thread_polling_island = pi;
- pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now,
- deadline, sig_mask, error);
- g_current_thread_polling_island = NULL;
- }
+ g_current_thread_polling_island = pi;
+ pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now, deadline,
+ sig_mask, error);
+ g_current_thread_polling_island = NULL;
GPR_ASSERT(pi != NULL);
@@ -2021,7 +1848,6 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -2039,10 +1865,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_set_add_fd = pollset_set_add_fd,
.pollset_set_del_fd = pollset_set_del_fd,
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_scheduler = workqueue_scheduler,
-
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
index 23f0aa68d9..8692b5b3c3 100644
--- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
@@ -46,7 +46,6 @@
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
@@ -94,23 +93,22 @@ static void fd_global_shutdown(void);
* epoll set Declarations
*/
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+//#define EPS_REFCOUNT_DEBUG
+
+#ifdef EPS_REFCOUNT_DEBUG
#define EPS_ADD_REF(p, r) eps_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define EPS_UNREF(exec_ctx, p, r) \
eps_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
-#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
+#else /* defined(EPS_REFCOUNT_DEBUG) */
#define EPS_ADD_REF(p, r) eps_add_ref((p))
#define EPS_UNREF(exec_ctx, p, r) eps_unref((exec_ctx), (p))
#endif /* !defined(GRPC_EPS_REF_COUNT_DEBUG) */
-/* This is also used as grpc_workqueue (by directly casting it) */
typedef struct epoll_set {
- grpc_closure_scheduler workqueue_scheduler;
-
/* Mutex poller should acquire to poll this. This enforces that only one
* poller can be polling on epoll_set at any time */
gpr_mu mu;
@@ -124,15 +122,6 @@ typedef struct epoll_set {
/* Number of threads currently polling on this epoll set*/
gpr_atm poller_count;
- /* Mutex guarding the read end of the workqueue (must be held to pop from
- * workqueue_items) */
- gpr_mu workqueue_read_mu;
- /* Queue of closures to be executed */
- gpr_mpscq workqueue_items;
- /* Count of items in workqueue_items */
- gpr_atm workqueue_item_count;
- /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
- grpc_wakeup_fd workqueue_wakeup_fd;
/* Is the epoll set shutdown */
gpr_atm is_shutdown;
@@ -166,7 +155,9 @@ struct grpc_pollset {
/*******************************************************************************
* Pollset-set Declarations
*/
-struct grpc_pollset_set {};
+struct grpc_pollset_set {
+ char unused;
+};
/*****************************************************************************
* Dedicated polling threads and pollsets - Declarations
@@ -220,8 +211,6 @@ static __thread epoll_set *g_current_thread_epoll_set;
/* Forward declaration */
static void epoll_set_delete(epoll_set *eps);
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -234,13 +223,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
-static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
- workqueue_enqueue, workqueue_enqueue, "workqueue"};
-
static void eps_add_ref(epoll_set *eps);
static void eps_unref(grpc_exec_ctx *exec_ctx, epoll_set *eps);
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+#ifdef EPS_REFCOUNT_DEBUG
static void eps_add_ref_dbg(epoll_set *eps, const char *reason,
const char *file, int line) {
long old_cnt = gpr_atm_acq_load(&eps->ref_count);
@@ -256,36 +242,6 @@ static void eps_unref_dbg(grpc_exec_ctx *exec_ctx, epoll_set *eps,
gpr_log(GPR_DEBUG, "Unref eps: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)eps, old_cnt, (old_cnt - 1), reason, file, line);
}
-
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- if (workqueue != NULL) {
- eps_add_ref_dbg((epoll_set *)workqueue, reason, file, line);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {
- if (workqueue != NULL) {
- eps_unref_dbg(exec_ctx, (epoll_set *)workqueue, reason, file, line);
- }
-}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- eps_add_ref((epoll_set *)workqueue);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- eps_unref(exec_ctx, (epoll_set *)workqueue);
- }
-}
#endif
static void eps_add_ref(epoll_set *eps) {
@@ -379,24 +335,15 @@ static epoll_set *epoll_set_create(grpc_error **error) {
*error = GRPC_ERROR_NONE;
eps = gpr_malloc(sizeof(*eps));
- eps->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
eps->epoll_fd = -1;
gpr_mu_init(&eps->mu);
- gpr_mu_init(&eps->workqueue_read_mu);
- gpr_mpscq_init(&eps->workqueue_items);
- gpr_atm_rel_store(&eps->workqueue_item_count, 0);
gpr_atm_rel_store(&eps->ref_count, 0);
gpr_atm_rel_store(&eps->poller_count, 0);
gpr_atm_rel_store(&eps->is_shutdown, false);
- if (!append_error(error, grpc_wakeup_fd_init(&eps->workqueue_wakeup_fd),
- err_desc)) {
- goto done;
- }
-
eps->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (eps->epoll_fd < 0) {
@@ -404,8 +351,6 @@ static epoll_set *epoll_set_create(grpc_error **error) {
goto done;
}
- epoll_set_add_wakeup_fd_locked(eps, &eps->workqueue_wakeup_fd, error);
-
done:
if (*error != GRPC_ERROR_NONE) {
epoll_set_delete(eps);
@@ -419,57 +364,11 @@ static void epoll_set_delete(epoll_set *eps) {
close(eps->epoll_fd);
}
- GPR_ASSERT(gpr_atm_no_barrier_load(&eps->workqueue_item_count) == 0);
gpr_mu_destroy(&eps->mu);
- gpr_mu_destroy(&eps->workqueue_read_mu);
- gpr_mpscq_destroy(&eps->workqueue_items);
- grpc_wakeup_fd_destroy(&eps->workqueue_wakeup_fd);
gpr_free(eps);
}
-static void workqueue_maybe_wakeup(epoll_set *eps) {
- /* If this thread is the current poller, then it may be that it's about to
- decrement the current poller count, so we need to look past this thread */
- bool is_current_poller = (g_current_thread_epoll_set == eps);
- gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
- gpr_atm current_pollers = gpr_atm_no_barrier_load(&eps->poller_count);
- /* Only issue a wakeup if it's likely that some poller could come in and take
- it right now. Note that since we do an anticipatory mpscq_pop every poll
- loop, it's ok if we miss the wakeup here, as we'll get the work item when
- the next poller enters anyway. */
- if (current_pollers > min_current_pollers_for_wakeup) {
- GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
- grpc_wakeup_fd_wakeup(&eps->workqueue_wakeup_fd));
- }
-}
-
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.enqueue", 0);
- grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
- /* take a ref to the workqueue: otherwise it can happen that whatever events
- * this kicks off ends up destroying the workqueue before this function
- * completes */
- GRPC_WORKQUEUE_REF(workqueue, "enqueue");
- epoll_set *eps = (epoll_set *)workqueue;
- gpr_atm last = gpr_atm_no_barrier_fetch_add(&eps->workqueue_item_count, 1);
- closure->error_data.error = error;
- gpr_mpscq_push(&eps->workqueue_items, &closure->next_data.atm_next);
- if (last == 0) {
- workqueue_maybe_wakeup(eps);
- }
-
- GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
- GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
- epoll_set *eps = (epoll_set *)workqueue;
- return workqueue == NULL ? grpc_schedule_on_exec_ctx
- : &eps->workqueue_scheduler;
-}
-
static grpc_error *epoll_set_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
@@ -665,8 +564,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
-
/*******************************************************************************
* Pollset Definitions
*/
@@ -850,32 +747,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
-static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, epoll_set *eps) {
- if (gpr_mu_trylock(&eps->workqueue_read_mu)) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&eps->workqueue_items);
- gpr_mu_unlock(&eps->workqueue_read_mu);
- if (n != NULL) {
- if (gpr_atm_full_fetch_add(&eps->workqueue_item_count, -1) > 1) {
- workqueue_maybe_wakeup(eps);
- }
- grpc_closure *c = (grpc_closure *)n;
- grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
- c->scheduled = false;
-#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- return true;
- } else if (gpr_atm_no_barrier_load(&eps->workqueue_item_count) > 0) {
- /* n == NULL might mean there's work but it's not available to be popped
- * yet - try to ensure another workqueue wakes up to check shortly if so
- */
- workqueue_maybe_wakeup(eps);
- }
- }
- return false;
-}
-
/* Blocking call */
static void acquire_epoll_lease(epoll_set *eps) {
if (g_num_threads_per_eps > 1) {
@@ -919,12 +790,7 @@ static void do_epoll_wait(grpc_exec_ctx *exec_ctx, int epoll_fd, epoll_set *eps,
for (int i = 0; i < ep_rv; ++i) {
void *data_ptr = ep_ev[i].data.ptr;
- if (data_ptr == &eps->workqueue_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&eps->workqueue_wakeup_fd),
- err_desc);
- maybe_do_workqueue_work(exec_ctx, eps);
- } else if (data_ptr == &epoll_set_wakeup_fd) {
+ if (data_ptr == &epoll_set_wakeup_fd) {
gpr_atm_rel_store(&eps->is_shutdown, 1);
gpr_log(GPR_INFO, "pollset poller: shutdown set");
} else {
@@ -951,18 +817,13 @@ static void epoll_set_work(grpc_exec_ctx *exec_ctx, epoll_set *eps,
epoll set. */
epoll_fd = eps->epoll_fd;
- /* If we get some workqueue work to do, it might end up completing an item on
- the completion queue, so there's no need to poll... so we skip that and
- redo the complete loop to verify */
- if (!maybe_do_workqueue_work(exec_ctx, eps)) {
- gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1);
- g_current_thread_epoll_set = eps;
+ gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1);
+ g_current_thread_epoll_set = eps;
- do_epoll_wait(exec_ctx, epoll_fd, eps, error);
+ do_epoll_wait(exec_ctx, epoll_fd, eps, error);
- g_current_thread_epoll_set = NULL;
- gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1);
- }
+ g_current_thread_epoll_set = NULL;
+ gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1);
GPR_TIMER_END("epoll_set_work", 0);
}
@@ -1105,7 +966,6 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -1123,10 +983,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_set_add_fd = pollset_set_add_fd,
.pollset_set_del_fd = pollset_set_del_fd,
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_scheduler = workqueue_scheduler,
-
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 096edb7fb9..c3627dc914 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -44,7 +44,6 @@
#include "src/core/lib/iomgr/sys_epoll_wrapper.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
#include "src/core/lib/support/spinlock.h"
@@ -124,17 +123,6 @@ struct grpc_fd {
Ref/Unref by two to avoid altering the orphaned bit */
gpr_atm refst;
- /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
- grpc_wakeup_fd workqueue_wakeup_fd;
- grpc_closure_scheduler workqueue_scheduler;
- /* Spinlock guarding the read end of the workqueue (must be held to pop from
- * workqueue_items) */
- gpr_spinlock workqueue_read_mu;
- /* Queue of closures to be executed */
- gpr_mpscq workqueue_items;
- /* Count of items in workqueue_items */
- gpr_atm workqueue_item_count;
-
/* The fd is either closed or we relinquished control of it. In either
cases, this indicates that the 'fd' on this structure is no longer
valid */
@@ -157,12 +145,6 @@ struct grpc_fd {
static void fd_global_init(void);
static void fd_global_shutdown(void);
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error);
-
-static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
- workqueue_enqueue, workqueue_enqueue, "workqueue"};
-
/*******************************************************************************
* Pollset Declarations
*/
@@ -332,13 +314,6 @@ static grpc_fd *fd_create(int fd, const char *name) {
grpc_lfev_init(&new_fd->write_closure);
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
- GRPC_LOG_IF_ERROR("fd_create",
- grpc_wakeup_fd_init(&new_fd->workqueue_wakeup_fd));
- new_fd->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
- new_fd->workqueue_read_mu = GPR_SPINLOCK_INITIALIZER;
- gpr_mpscq_init(&new_fd->workqueue_items);
- gpr_atm_no_barrier_store(&new_fd->workqueue_item_count, 0);
-
new_fd->freelist_next = NULL;
new_fd->on_done_closure = NULL;
@@ -431,91 +406,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
- REF_BY(fd, 2, "return_workqueue");
- return (grpc_workqueue *)fd;
-}
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- if (workqueue != NULL) {
- ref_by((grpc_fd *)workqueue, 2, file, line, reason);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {
- if (workqueue != NULL) {
- unref_by(exec_ctx, (grpc_fd *)workqueue, 2, file, line, reason);
- }
-}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- ref_by((grpc_fd *)workqueue, 2);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- unref_by(exec_ctx, (grpc_fd *)workqueue, 2);
- }
-}
-#endif
-
-static void workqueue_wakeup(grpc_fd *fd) {
- GRPC_LOG_IF_ERROR("workqueue_enqueue",
- grpc_wakeup_fd_wakeup(&fd->workqueue_wakeup_fd));
-}
-
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.enqueue", 0);
- grpc_fd *fd = (grpc_fd *)(((char *)closure->scheduler) -
- offsetof(grpc_fd, workqueue_scheduler));
- REF_BY(fd, 2, "workqueue_enqueue");
- gpr_atm last = gpr_atm_no_barrier_fetch_add(&fd->workqueue_item_count, 1);
- closure->error_data.error = error;
- gpr_mpscq_push(&fd->workqueue_items, &closure->next_data.atm_next);
- if (last == 0) {
- workqueue_wakeup(fd);
- }
- UNREF_BY(exec_ctx, fd, 2, "workqueue_enqueue");
-}
-
-static void fd_invoke_workqueue(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- /* handle spurious wakeups */
- if (!gpr_spinlock_trylock(&fd->workqueue_read_mu)) return;
- gpr_mpscq_node *n = gpr_mpscq_pop(&fd->workqueue_items);
- gpr_spinlock_unlock(&fd->workqueue_read_mu);
- if (n != NULL) {
- if (gpr_atm_full_fetch_add(&fd->workqueue_item_count, -1) > 1) {
- workqueue_wakeup(fd);
- }
- grpc_closure *c = (grpc_closure *)n;
- grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
- c->scheduled = false;
-#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- } else if (gpr_atm_no_barrier_load(&fd->workqueue_item_count) > 0) {
- /* n == NULL might mean there's work but it's not available to be popped
- * yet - try to ensure another workqueue wakes up to check shortly if so
- */
- workqueue_wakeup(fd);
- }
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
- return &((grpc_fd *)workqueue)->workqueue_scheduler;
-}
-
/*******************************************************************************
* Pollable Definitions
*/
@@ -581,22 +471,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
.data.ptr = fd};
if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
switch (errno) {
- case EEXIST: /* if this fd is already in the epoll set, the workqueue fd
- must also be - just return */
- gpr_mu_unlock(&fd->orphaned_mu);
- return GRPC_ERROR_NONE;
- default:
- append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
- }
- }
- struct epoll_event ev_wq = {
- .events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLEXCLUSIVE),
- .data.ptr = (void *)(1 + (intptr_t)fd)};
- if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->workqueue_wakeup_fd.read_fd, &ev_wq) !=
- 0) {
- switch (errno) {
- case EEXIST: /* if the workqueue fd is already in the epoll set we're ok
- - no need to do anything special */
+ case EEXIST:
break;
default:
append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
@@ -859,29 +734,21 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
append_error(&error, grpc_wakeup_fd_consume_wakeup(&p->wakeup), err_desc);
} else {
- grpc_fd *fd = (grpc_fd *)(((intptr_t)data_ptr) & ~(intptr_t)1);
- bool is_workqueue = (((intptr_t)data_ptr) & 1) != 0;
+ grpc_fd *fd = (grpc_fd *)data_ptr;
bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (events[i].events & EPOLLOUT) != 0;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
- "PS:%p poll %p got fd %p: is_wq=%d cancel=%d read=%d "
+ "PS:%p poll %p got fd %p: cancel=%d read=%d "
"write=%d",
- pollset, p, fd, is_workqueue, cancel, read_ev, write_ev);
+ pollset, p, fd, cancel, read_ev, write_ev);
}
- if (is_workqueue) {
- append_error(&error,
- grpc_wakeup_fd_consume_wakeup(&fd->workqueue_wakeup_fd),
- err_desc);
- fd_invoke_workqueue(exec_ctx, fd);
- } else {
- if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd, pollset);
- }
- if (write_ev || cancel) {
- fd_become_writable(exec_ctx, fd);
- }
+ if (read_ev || cancel) {
+ fd_become_readable(exec_ctx, fd, pollset);
+ }
+ if (write_ev || cancel) {
+ fd_become_writable(exec_ctx, fd);
}
}
}
@@ -1434,7 +1301,6 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -1452,10 +1318,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_set_add_fd = pollset_set_add_fd,
.pollset_set_del_fd = pollset_set_del_fd,
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_scheduler = workqueue_scheduler,
-
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index a58218bae3..29a86f73ff 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -44,7 +44,6 @@
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
@@ -162,7 +161,9 @@ static void fd_global_shutdown(void);
* Polling island Declarations
*/
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+//#define PI_REFCOUNT_DEBUG
+
+#ifdef PI_REFCOUNT_DEBUG
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
@@ -177,8 +178,6 @@ static void fd_global_shutdown(void);
/* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island {
- grpc_closure_scheduler workqueue_scheduler;
-
gpr_mu mu;
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
the refcount.
@@ -199,15 +198,6 @@ typedef struct polling_island {
/* Number of threads currently polling on this island */
gpr_atm poller_count;
- /* Mutex guarding the read end of the workqueue (must be held to pop from
- * workqueue_items) */
- gpr_mu workqueue_read_mu;
- /* Queue of closures to be executed */
- gpr_mpscq workqueue_items;
- /* Count of items in workqueue_items */
- gpr_atm workqueue_item_count;
- /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
- grpc_wakeup_fd workqueue_wakeup_fd;
/* The fd of the underlying epoll set */
int epoll_fd;
@@ -282,8 +272,6 @@ static __thread polling_island *g_current_thread_polling_island;
/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -296,13 +284,10 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
-static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
- workqueue_enqueue, workqueue_enqueue, "workqueue"};
-
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+#ifdef PI_REFCOUNT_DEBUG
static void pi_add_ref_dbg(polling_island *pi, const char *reason,
const char *file, int line) {
long old_cnt = gpr_atm_acq_load(&pi->ref_count);
@@ -318,36 +303,6 @@ static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
-
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- if (workqueue != NULL) {
- pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {
- if (workqueue != NULL) {
- pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
- }
-}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- pi_add_ref((polling_island *)workqueue);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- pi_unref(exec_ctx, (polling_island *)workqueue);
- }
-}
#endif
static void pi_add_ref(polling_island *pi) {
@@ -511,26 +466,16 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
*error = GRPC_ERROR_NONE;
pi = gpr_malloc(sizeof(*pi));
- pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
gpr_mu_init(&pi->mu);
pi->fd_cnt = 0;
pi->fd_capacity = 0;
pi->fds = NULL;
pi->epoll_fd = -1;
- gpr_mu_init(&pi->workqueue_read_mu);
- gpr_mpscq_init(&pi->workqueue_items);
- gpr_atm_rel_store(&pi->workqueue_item_count, 0);
-
gpr_atm_rel_store(&pi->ref_count, 0);
gpr_atm_rel_store(&pi->poller_count, 0);
gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
- if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
- err_desc)) {
- goto done;
- }
-
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) {
@@ -538,8 +483,6 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
goto done;
}
- polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
-
if (initial_fd != NULL) {
polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
}
@@ -558,11 +501,7 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
if (pi->epoll_fd >= 0) {
close(pi->epoll_fd);
}
- GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
- gpr_mu_destroy(&pi->workqueue_read_mu);
- gpr_mpscq_destroy(&pi->workqueue_items);
gpr_mu_destroy(&pi->mu);
- grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
gpr_free(pi->fds);
gpr_free(pi);
}
@@ -707,45 +646,6 @@ static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
}
}
-static void workqueue_maybe_wakeup(polling_island *pi) {
- /* If this thread is the current poller, then it may be that it's about to
- decrement the current poller count, so we need to look past this thread */
- bool is_current_poller = (g_current_thread_polling_island == pi);
- gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
- gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
- /* Only issue a wakeup if it's likely that some poller could come in and take
- it right now. Note that since we do an anticipatory mpscq_pop every poll
- loop, it's ok if we miss the wakeup here, as we'll get the work item when
- the next poller enters anyway. */
- if (current_pollers >= min_current_pollers_for_wakeup) {
- GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
- grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
- }
-}
-
-static void workqueue_move_items_to_parent(polling_island *q) {
- polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
- if (p == NULL) {
- return;
- }
- gpr_mu_lock(&q->workqueue_read_mu);
- int num_added = 0;
- while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
- if (n != NULL) {
- gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
- gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
- gpr_mpscq_push(&p->workqueue_items, n);
- num_added++;
- }
- }
- gpr_mu_unlock(&q->workqueue_read_mu);
- if (num_added > 0) {
- workqueue_maybe_wakeup(p);
- }
- workqueue_move_items_to_parent(p);
-}
-
static polling_island *polling_island_merge(polling_island *p,
polling_island *q,
grpc_error **error) {
@@ -770,8 +670,6 @@ static polling_island *polling_island_merge(polling_island *p,
/* Add the 'merged_to' link from p --> q */
gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
-
- workqueue_move_items_to_parent(p);
}
/* else if p == q, nothing needs to be done */
@@ -782,32 +680,6 @@ static polling_island *polling_island_merge(polling_island *p,
return q;
}
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.enqueue", 0);
- grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
- /* take a ref to the workqueue: otherwise it can happen that whatever events
- * this kicks off ends up destroying the workqueue before this function
- * completes */
- GRPC_WORKQUEUE_REF(workqueue, "enqueue");
- polling_island *pi = (polling_island *)workqueue;
- gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
- closure->error_data.error = error;
- gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
- if (last == 0) {
- workqueue_maybe_wakeup(pi);
- }
- workqueue_move_items_to_parent(pi);
- GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
- GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
- polling_island *pi = (polling_island *)workqueue;
- return workqueue == NULL ? grpc_schedule_on_exec_ctx
- : &pi->workqueue_scheduler;
-}
-
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
@@ -1069,14 +941,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
- gpr_mu_lock(&fd->po.mu);
- grpc_workqueue *workqueue =
- GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
- gpr_mu_unlock(&fd->po.mu);
- return workqueue;
-}
-
/*******************************************************************************
* Pollset Definitions
*/
@@ -1314,44 +1178,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->po.mu);
}
-static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
- polling_island *pi) {
- if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
- gpr_mu_unlock(&pi->workqueue_read_mu);
- if (n != NULL) {
- gpr_atm remaining =
- gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) - 1;
- GRPC_POLLING_TRACE(
- "maybe_do_workqueue_work: pi: %p: got closure %p, remaining = "
- "%" PRIdPTR,
- pi, n, remaining);
- if (remaining > 0) {
- workqueue_maybe_wakeup(pi);
- }
- grpc_closure *c = (grpc_closure *)n;
- grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
- c->scheduled = false;
-#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- return true;
- } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
- /* n == NULL might mean there's work but it's not available to be popped
- * yet - try to ensure another workqueue wakes up to check shortly if so
- */
- GRPC_POLLING_TRACE(
- "maybe_do_workqueue_work: pi: %p: more to do, but not yet", pi);
- workqueue_maybe_wakeup(pi);
- }
- } else {
- GRPC_POLLING_TRACE("maybe_do_workqueue_work: pi: %p: read already locked",
- pi);
- }
- return false;
-}
-
#define GRPC_EPOLL_MAX_EVENTS 100
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
@@ -1407,76 +1233,61 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
PI_ADD_REF(pi, "ps_work");
gpr_mu_unlock(&pollset->po.mu);
- /* If we get some workqueue work to do, it might end up completing an item on
- the completion queue, so there's no need to poll... so we skip that and
- redo the complete loop to verify */
- GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker %p, pi %p", pollset,
- worker, pi);
- if (!maybe_do_workqueue_work(exec_ctx, pi)) {
- GRPC_POLLING_TRACE("pollset_work: begins");
- gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
- g_current_thread_polling_island = pi;
-
- GRPC_SCHEDULING_START_BLOCKING_REGION;
- ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
- sig_mask);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
- if (ep_rv < 0) {
- if (errno != EINTR) {
- gpr_asprintf(&err_msg,
- "epoll_wait() epoll fd: %d failed with error: %d (%s)",
- epoll_fd, errno, strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- } else {
- /* We were interrupted. Save an interation by doing a zero timeout
- epoll_wait to see if there are any other events of interest */
- GRPC_POLLING_TRACE(
- "pollset_work: pollset: %p, worker: %p received kick",
- (void *)pollset, (void *)worker);
- ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
- }
+ gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
+ g_current_thread_polling_island = pi;
+
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ ep_rv =
+ epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+ if (ep_rv < 0) {
+ if (errno != EINTR) {
+ gpr_asprintf(&err_msg,
+ "epoll_wait() epoll fd: %d failed with error: %d (%s)",
+ epoll_fd, errno, strerror(errno));
+ append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
+ } else {
+ /* We were interrupted. Save an interation by doing a zero timeout
+ epoll_wait to see if there are any other events of interest */
+ GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p received kick",
+ (void *)pollset, (void *)worker);
+ ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
}
+ }
#ifdef GRPC_TSAN
- /* See the definition of g_poll_sync for more details */
- gpr_atm_acq_load(&g_epoll_sync);
+ /* See the definition of g_poll_sync for more details */
+ gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */
- for (int i = 0; i < ep_rv; ++i) {
- void *data_ptr = ep_ev[i].data.ptr;
- if (data_ptr == &pi->workqueue_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
- err_desc);
- maybe_do_workqueue_work(exec_ctx, pi);
- } else if (data_ptr == &polling_island_wakeup_fd) {
- GRPC_POLLING_TRACE(
- "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
- "%d) got merged",
- (void *)pollset, (void *)worker, epoll_fd);
- /* This means that our polling island is merged with a different
- island. We do not have to do anything here since the subsequent call
- to the function pollset_work_and_unlock() will pick up the correct
- epoll_fd */
- } else {
- grpc_fd *fd = data_ptr;
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write_ev = ep_ev[i].events & EPOLLOUT;
- if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd, pollset);
- }
- if (write_ev || cancel) {
- fd_become_writable(exec_ctx, fd);
- }
+ for (int i = 0; i < ep_rv; ++i) {
+ void *data_ptr = ep_ev[i].data.ptr;
+ if (data_ptr == &polling_island_wakeup_fd) {
+ GRPC_POLLING_TRACE(
+ "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
+ "%d) got merged",
+ (void *)pollset, (void *)worker, epoll_fd);
+ /* This means that our polling island is merged with a different
+ island. We do not have to do anything here since the subsequent call
+ to the function pollset_work_and_unlock() will pick up the correct
+ epoll_fd */
+ } else {
+ grpc_fd *fd = data_ptr;
+ int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+ int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+ int write_ev = ep_ev[i].events & EPOLLOUT;
+ if (read_ev || cancel) {
+ fd_become_readable(exec_ctx, fd, pollset);
+ }
+ if (write_ev || cancel) {
+ fd_become_writable(exec_ctx, fd);
}
}
-
- g_current_thread_polling_island = NULL;
- gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
- GRPC_POLLING_TRACE("pollset_work: ends");
}
+ g_current_thread_polling_island = NULL;
+ gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
+
GPR_ASSERT(pi != NULL);
/* Before leaving, release the extra ref we added to the polling island. It
@@ -1867,7 +1678,6 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -1885,10 +1695,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_set_add_fd = pollset_set_add_fd,
.pollset_set_del_fd = pollset_set_del_fd,
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_scheduler = workqueue_scheduler,
-
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 806f985229..6145c9ec3f 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -633,8 +633,6 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
GRPC_FD_UNREF(fd, "poll");
}
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
-
/*******************************************************************************
* pollset_posix.c
*/
@@ -1274,30 +1272,6 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
}
/*******************************************************************************
- * workqueue stubs
- */
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- return workqueue;
-}
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- return workqueue;
-}
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {}
-#endif
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
- return grpc_schedule_on_exec_ctx;
-}
-
-/*******************************************************************************
* Condition Variable polling extensions
*/
@@ -1514,7 +1488,6 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -1532,10 +1505,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_set_add_fd = pollset_set_add_fd,
.pollset_set_del_fd = pollset_set_del_fd,
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_scheduler = workqueue_scheduler,
-
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 4377b2a0f6..54960d1ecc 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -156,10 +156,6 @@ grpc_fd *grpc_fd_create(int fd, const char *name) {
return g_event_engine->fd_create(fd, name);
}
-grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd) {
- return g_event_engine->fd_get_workqueue(fd);
-}
-
int grpc_fd_wrapped_fd(grpc_fd *fd) {
return g_event_engine->fd_wrapped_fd(fd);
}
@@ -261,26 +257,4 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
g_event_engine->pollset_set_del_fd(exec_ctx, pollset_set, fd);
}
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
- int line, const char *reason) {
- return g_event_engine->workqueue_ref(workqueue, file, line, reason);
-}
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {
- g_event_engine->workqueue_unref(exec_ctx, workqueue, file, line, reason);
-}
-#else
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
- return g_event_engine->workqueue_ref(workqueue);
-}
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
- g_event_engine->workqueue_unref(exec_ctx, workqueue);
-}
-#endif
-
-grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
- return g_event_engine->workqueue_scheduler(workqueue);
-}
-
#endif // GRPC_POSIX_SOCKET
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index f87fe16901..54c4f2ee11 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -26,7 +26,6 @@
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
extern grpc_tracer_flag grpc_polling_trace; /* Disabled by default */
@@ -45,7 +44,6 @@ typedef struct grpc_event_engine_vtable {
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
bool (*fd_is_shutdown)(grpc_fd *fd);
- grpc_workqueue *(*fd_get_workqueue)(grpc_fd *fd);
grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx,
grpc_fd *fd);
@@ -82,17 +80,6 @@ typedef struct grpc_event_engine_vtable {
grpc_pollset_set *pollset_set, grpc_fd *fd);
void (*shutdown_engine)(void);
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
- grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue, const char *file,
- int line, const char *reason);
- void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason);
-#else
- grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue);
- void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
-#endif
- grpc_closure_scheduler *(*workqueue_scheduler)(grpc_workqueue *workqueue);
} grpc_event_engine_vtable;
void grpc_event_engine_init(void);
@@ -106,9 +93,6 @@ const char *grpc_get_poll_strategy_name();
This takes ownership of closing fd. */
grpc_fd *grpc_fd_create(int fd, const char *name);
-/* Get a workqueue that's associated with this fd */
-grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd);
-
/* Return the wrapped fd, or -1 if it has been released or closed. */
int grpc_fd_wrapped_fd(grpc_fd *fd);
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 7cef32adce..6699be3646 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -23,7 +23,6 @@
#include <grpc/support/thd.h>
#include "src/core/lib/iomgr/combiner.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) {
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 4ca23a2407..7621a7fe75 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -21,132 +21,172 @@
#include <string.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
+#include <grpc/support/tls.h>
+#include <grpc/support/useful.h>
+
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/support/spinlock.h"
+
+#define MAX_DEPTH 2
-typedef struct grpc_executor_data {
- int busy; /**< is the thread currently running? */
- int shutting_down; /**< has \a grpc_shutdown() been invoked? */
- int pending_join; /**< has the thread finished but not been joined? */
- grpc_closure_list closures; /**< collection of pending work */
- gpr_thd_id tid; /**< thread id of the thread, only valid if \a busy or \a
- pending_join are true */
- gpr_thd_options options;
+typedef struct {
gpr_mu mu;
-} grpc_executor;
+ gpr_cv cv;
+ grpc_closure_list elems;
+ size_t depth;
+ bool shutdown;
+ gpr_thd_id id;
+} thread_state;
-static grpc_executor g_executor;
+static thread_state *g_thread_state;
+static size_t g_max_threads;
+static gpr_atm g_cur_threads;
+static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
-void grpc_executor_init() {
- memset(&g_executor, 0, sizeof(grpc_executor));
- gpr_mu_init(&g_executor.mu);
- g_executor.options = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&g_executor.options);
-}
+GPR_TLS_DECL(g_this_thread_state);
-/* thread body */
-static void closure_exec_thread_func(void *ignored) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- while (1) {
- gpr_mu_lock(&g_executor.mu);
- if (g_executor.shutting_down != 0) {
- gpr_mu_unlock(&g_executor.mu);
- break;
- }
- if (grpc_closure_list_empty(g_executor.closures)) {
- /* no more work, time to die */
- GPR_ASSERT(g_executor.busy == 1);
- g_executor.busy = 0;
- gpr_mu_unlock(&g_executor.mu);
- break;
- } else {
- grpc_closure *c = g_executor.closures.head;
- grpc_closure_list_init(&g_executor.closures);
- gpr_mu_unlock(&g_executor.mu);
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error_data.error;
+static void executor_thread(void *arg);
+
+static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
+ size_t n = 0;
+
+ grpc_closure *c = list.head;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
#ifndef NDEBUG
- c->scheduled = false;
+ c->scheduled = false;
#endif
- c->cb(&exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
- }
- grpc_exec_ctx_flush(&exec_ctx);
- }
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ n++;
}
- grpc_exec_ctx_finish(&exec_ctx);
+
+ return n;
}
-/* Spawn the thread if new work has arrived a no thread is up */
-static void maybe_spawn_locked() {
- if (grpc_closure_list_empty(g_executor.closures) == 1) {
- return;
- }
- if (g_executor.shutting_down == 1) {
- return;
- }
+bool grpc_executor_is_threaded() {
+ return gpr_atm_no_barrier_load(&g_cur_threads) > 0;
+}
- if (g_executor.busy != 0) {
- /* Thread still working. New work will be picked up by already running
- * thread. Not spawning anything. */
- return;
- } else if (g_executor.pending_join != 0) {
- /* Pickup the remains of the previous incarnations of the thread. */
- gpr_thd_join(g_executor.tid);
- g_executor.pending_join = 0;
+void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
+ gpr_atm cur_threads = gpr_atm_no_barrier_load(&g_cur_threads);
+ if (threading) {
+ if (cur_threads > 0) return;
+ g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
+ gpr_atm_no_barrier_store(&g_cur_threads, 1);
+ gpr_tls_init(&g_this_thread_state);
+ g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads);
+ for (size_t i = 0; i < g_max_threads; i++) {
+ gpr_mu_init(&g_thread_state[i].mu);
+ gpr_cv_init(&g_thread_state[i].cv);
+ g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
+ }
+
+ gpr_thd_options opt = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&opt);
+ gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0],
+ &opt);
+ } else {
+ if (cur_threads == 0) return;
+ for (size_t i = 0; i < g_max_threads; i++) {
+ gpr_mu_lock(&g_thread_state[i].mu);
+ g_thread_state[i].shutdown = true;
+ gpr_cv_signal(&g_thread_state[i].cv);
+ gpr_mu_unlock(&g_thread_state[i].mu);
+ }
+ /* ensure no thread is adding a new thread... once this is past, then
+ no thread will try to add a new one either (since shutdown is true) */
+ gpr_spinlock_lock(&g_adding_thread_lock);
+ gpr_spinlock_unlock(&g_adding_thread_lock);
+ for (gpr_atm i = 0; i < g_cur_threads; i++) {
+ gpr_thd_join(g_thread_state[i].id);
+ }
+ gpr_atm_no_barrier_store(&g_cur_threads, 0);
+ for (size_t i = 0; i < g_max_threads; i++) {
+ gpr_mu_destroy(&g_thread_state[i].mu);
+ gpr_cv_destroy(&g_thread_state[i].cv);
+ run_closures(exec_ctx, g_thread_state[i].elems);
+ }
+ gpr_free(g_thread_state);
+ gpr_tls_destroy(&g_this_thread_state);
}
+}
- /* All previous instances of the thread should have been joined at this point.
- * Spawn time! */
- g_executor.busy = 1;
- GPR_ASSERT(gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL,
- &g_executor.options));
- g_executor.pending_join = 1;
+void grpc_executor_init(grpc_exec_ctx *exec_ctx) {
+ gpr_atm_no_barrier_store(&g_cur_threads, 0);
+ grpc_executor_set_threading(exec_ctx, true);
}
-static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- gpr_mu_lock(&g_executor.mu);
- if (g_executor.shutting_down == 0) {
- grpc_closure_list_append(&g_executor.closures, closure, error);
- maybe_spawn_locked();
+void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
+ grpc_executor_set_threading(exec_ctx, false);
+}
+
+static void executor_thread(void *arg) {
+ thread_state *ts = arg;
+ gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
+
+ grpc_exec_ctx exec_ctx =
+ GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
+
+ size_t subtract_depth = 0;
+ for (;;) {
+ gpr_mu_lock(&ts->mu);
+ ts->depth -= subtract_depth;
+ while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
+ gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ if (ts->shutdown) {
+ gpr_mu_unlock(&ts->mu);
+ break;
+ }
+ grpc_closure_list exec = ts->elems;
+ ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
+ gpr_mu_unlock(&ts->mu);
+
+ subtract_depth = run_closures(&exec_ctx, exec);
+ grpc_exec_ctx_flush(&exec_ctx);
}
- gpr_mu_unlock(&g_executor.mu);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
- int pending_join;
-
- gpr_mu_lock(&g_executor.mu);
- pending_join = g_executor.pending_join;
- g_executor.shutting_down = 1;
- gpr_mu_unlock(&g_executor.mu);
- /* we can release the lock at this point despite the access to the closure
- * list below because we aren't accepting new work */
-
- /* Execute pending callbacks, some may be performing cleanups */
- grpc_closure *c = g_executor.closures.head;
- grpc_closure_list_init(&g_executor.closures);
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
- c->scheduled = false;
-#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
+static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
+ if (cur_thread_count == 0) {
+ grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
+ return;
+ }
+ thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
+ if (ts == NULL) {
+ ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
}
- grpc_exec_ctx_flush(exec_ctx);
- GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
- if (pending_join) {
- gpr_thd_join(g_executor.tid);
+ gpr_mu_lock(&ts->mu);
+ if (grpc_closure_list_empty(ts->elems)) {
+ gpr_cv_signal(&ts->cv);
+ }
+ grpc_closure_list_append(&ts->elems, closure, error);
+ ts->depth++;
+ bool try_new_thread = ts->depth > MAX_DEPTH &&
+ cur_thread_count < g_max_threads && !ts->shutdown;
+ gpr_mu_unlock(&ts->mu);
+ if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
+ cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
+ if (cur_thread_count < g_max_threads) {
+ gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
+
+ gpr_thd_options opt = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&opt);
+ gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
+ &g_thread_state[cur_thread_count], &opt);
+ }
+ gpr_spinlock_unlock(&g_adding_thread_lock);
}
- gpr_mu_destroy(&g_executor.mu);
}
static const grpc_closure_scheduler_vtable executor_vtable = {
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index ace9d80b06..c3382a0a12 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -26,11 +26,18 @@
* This mechanism is meant to outsource work (grpc_closure instances) to a
* thread, for those cases where blocking isn't an option but there isn't a
* non-blocking solution available. */
-void grpc_executor_init();
+void grpc_executor_init(grpc_exec_ctx *exec_ctx);
extern grpc_closure_scheduler *grpc_executor_scheduler;
/** Shutdown the executor, running all pending work as part of the call */
void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx);
+/** Is the executor multi-threaded? */
+bool grpc_executor_is_threaded();
+
+/* enable/disable threading - must be called after grpc_executor_init and before
+ grpc_executor_shutdown */
+void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool enable);
+
#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index c8b784e4c0..3d19953eeb 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -29,6 +29,7 @@
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/timer.h"
@@ -41,11 +42,12 @@ static gpr_cv g_rcv;
static int g_shutdown;
static grpc_iomgr_object g_root_object;
-void grpc_iomgr_init(void) {
+void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) {
g_shutdown = 0;
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
grpc_exec_ctx_global_init();
+ grpc_executor_init(exec_ctx);
grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
@@ -53,7 +55,7 @@ void grpc_iomgr_init(void) {
grpc_iomgr_platform_init();
}
-void grpc_iomgr_start(void) { grpc_timer_manager_init(); }
+void grpc_iomgr_start(grpc_exec_ctx *exec_ctx) { grpc_timer_manager_init(); }
static size_t count_objects(void) {
grpc_iomgr_object *obj;
@@ -78,6 +80,7 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) {
grpc_timer_manager_shutdown();
grpc_iomgr_platform_flush();
+ grpc_executor_shutdown(exec_ctx);
gpr_mu_lock(&g_mu);
g_shutdown = 1;
diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h
index 45dedc2b0a..e3cd6ebe79 100644
--- a/src/core/lib/iomgr/iomgr.h
+++ b/src/core/lib/iomgr/iomgr.h
@@ -23,10 +23,10 @@
#include "src/core/lib/iomgr/port.h"
/** Initializes the iomgr. */
-void grpc_iomgr_init(void);
+void grpc_iomgr_init(grpc_exec_ctx *exec_ctx);
/** Starts any background threads for iomgr. */
-void grpc_iomgr_start(void);
+void grpc_iomgr_start(grpc_exec_ctx *exec_ctx);
/** Signals the intention to shutdown the iomgr. Expects to be able to flush
* exec_ctx. */
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index d3632c4cba..7c3fb93279 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -566,7 +566,7 @@ static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *rq,
grpc_resource_quota *grpc_resource_quota_create(const char *name) {
grpc_resource_quota *resource_quota = gpr_malloc(sizeof(*resource_quota));
gpr_ref_init(&resource_quota->refs, 1);
- resource_quota->combiner = grpc_combiner_create(NULL);
+ resource_quota->combiner = grpc_combiner_create();
resource_quota->free_pool = INT64_MAX;
resource_quota->size = INT64_MAX;
gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
@@ -579,12 +579,11 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
(intptr_t)resource_quota);
}
- grpc_closure_init(
- &resource_quota->rq_step_closure, rq_step, resource_quota,
- grpc_combiner_finally_scheduler(resource_quota->combiner, true));
+ grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota,
+ grpc_combiner_finally_scheduler(resource_quota->combiner));
grpc_closure_init(&resource_quota->rq_reclamation_done_closure,
rq_reclamation_done, resource_quota,
- grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_combiner_scheduler(resource_quota->combiner));
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_quota->roots[i] = NULL;
}
@@ -689,18 +688,18 @@ grpc_resource_user *grpc_resource_user_create(
grpc_resource_quota_ref_internal(resource_quota);
grpc_closure_init(&resource_user->allocate_closure, &ru_allocate,
resource_user,
- grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_combiner_scheduler(resource_quota->combiner));
grpc_closure_init(&resource_user->add_to_free_pool_closure,
&ru_add_to_free_pool, resource_user,
- grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_combiner_scheduler(resource_quota->combiner));
grpc_closure_init(&resource_user->post_reclaimer_closure[0],
&ru_post_benign_reclaimer, resource_user,
- grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_combiner_scheduler(resource_quota->combiner));
grpc_closure_init(&resource_user->post_reclaimer_closure[1],
&ru_post_destructive_reclaimer, resource_user,
- grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_combiner_scheduler(resource_quota->combiner));
grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user,
- grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_combiner_scheduler(resource_quota->combiner));
gpr_mu_init(&resource_user->mu);
gpr_atm_rel_store(&resource_user->refs, 1);
gpr_atm_rel_store(&resource_user->shutdown, 0);
@@ -757,12 +756,12 @@ void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
- grpc_closure_sched(exec_ctx,
- grpc_closure_create(
- ru_shutdown, resource_user,
- grpc_combiner_scheduler(
- resource_user->resource_quota->combiner, false)),
- GRPC_ERROR_NONE);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_create(
+ ru_shutdown, resource_user,
+ grpc_combiner_scheduler(resource_user->resource_quota->combiner)),
+ GRPC_ERROR_NONE);
}
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 36091803bb..a2404f97c1 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -543,26 +543,15 @@ static int tcp_get_fd(grpc_endpoint *ep) {
return tcp->fd;
}
-static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
- grpc_tcp *tcp = (grpc_tcp *)ep;
- return grpc_fd_get_workqueue(tcp->em_fd);
-}
-
static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return tcp->resource_user;
}
-static const grpc_endpoint_vtable vtable = {tcp_read,
- tcp_write,
- tcp_get_workqueue,
- tcp_add_to_pollset,
- tcp_add_to_pollset_set,
- tcp_shutdown,
- tcp_destroy,
- tcp_get_resource_user,
- tcp_get_peer,
- tcp_get_fd};
+static const grpc_endpoint_vtable vtable = {
+ tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
+ tcp_shutdown, tcp_destroy, tcp_get_resource_user, tcp_get_peer,
+ tcp_get_fd};
#define MAX_CHUNK_SIZE 32 * 1024 * 1024
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 5d3d315234..a37a75c8fa 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -318,15 +318,12 @@ static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) {
return tcp->resource_user;
}
-static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; }
-
static int uv_get_fd(grpc_endpoint *ep) { return -1; }
static grpc_endpoint_vtable vtable = {
- uv_endpoint_read, uv_endpoint_write, uv_get_workqueue,
- uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown,
- uv_destroy, uv_get_resource_user, uv_get_peer,
- uv_get_fd};
+ uv_endpoint_read, uv_endpoint_write, uv_add_to_pollset,
+ uv_add_to_pollset_set, uv_endpoint_shutdown, uv_destroy,
+ uv_get_resource_user, uv_get_peer, uv_get_fd};
grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
grpc_resource_quota *resource_quota,
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index a0a2563956..dbae42e936 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -395,8 +395,6 @@ static char *win_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
-static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; }
-
static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return tcp->resource_user;
@@ -404,16 +402,10 @@ static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) {
static int win_get_fd(grpc_endpoint *ep) { return -1; }
-static grpc_endpoint_vtable vtable = {win_read,
- win_write,
- win_get_workqueue,
- win_add_to_pollset,
- win_add_to_pollset_set,
- win_shutdown,
- win_destroy,
- win_get_resource_user,
- win_get_peer,
- win_get_fd};
+static grpc_endpoint_vtable vtable = {
+ win_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
+ win_shutdown, win_destroy, win_get_resource_user, win_get_peer,
+ win_get_fd};
grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
grpc_channel_args *channel_args,
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
deleted file mode 100644
index 558d4955d3..0000000000
--- a/src/core/lib/iomgr/workqueue.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_H
-#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_H
-
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
-#include "src/core/lib/iomgr/iomgr.h"
-#include "src/core/lib/iomgr/pollset.h"
-#include "src/core/lib/iomgr/pollset_set.h"
-#include "src/core/lib/iomgr/port.h"
-
-#ifdef GPR_WINDOWS
-#include "src/core/lib/iomgr/workqueue_windows.h"
-#endif
-
-/* grpc_workqueue is forward declared in exec_ctx.h */
-
-/* Reference counting functions. Use the macro's always
- (GRPC_WORKQUEUE_{REF,UNREF}).
-
- Pass in a descriptive reason string for reffing/unreffing as the last
- argument to each macro. When GRPC_WORKQUEUE_REFCOUNT_DEBUG is defined, that
- string will be printed alongside the refcount. When it is not defined, the
- string will be discarded at compilation time. */
-
-/*#define GRPC_WORKQUEUE_REFCOUNT_DEBUG*/
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-#define GRPC_WORKQUEUE_REF(p, r) \
- grpc_workqueue_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \
- grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
- int line, const char *reason);
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason);
-#else
-#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p))
-#define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p))
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue);
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
-#endif
-
-/** Fetch the workqueue closure scheduler. Items added to a work queue will be
- started in approximately the order they were enqueued, on some thread that
- may or may not be the current thread. Successive closures enqueued onto a
- workqueue MAY be executed concurrently.
-
- It is generally more expensive to add a closure to a workqueue than to the
- execution context, both in terms of CPU work and in execution latency.
-
- Use work queues when it's important that other threads be given a chance to
- tackle some workload. */
-grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue);
-
-#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */
diff --git a/src/core/lib/iomgr/workqueue_uv.c b/src/core/lib/iomgr/workqueue_uv.c
deleted file mode 100644
index 1900aa1ca6..0000000000
--- a/src/core/lib/iomgr/workqueue_uv.c
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Copyright 2016 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "src/core/lib/iomgr/port.h"
-
-#ifdef GRPC_UV
-
-#include "src/core/lib/iomgr/workqueue.h"
-
-// Minimal implementation of grpc_workqueue for libuv
-// Works by directly enqueuing workqueue items onto the current execution
-// context, which is at least correct, if not performant or in the spirit of
-// workqueues.
-
-void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
- int line, const char *reason) {
- return workqueue;
-}
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {}
-#else
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
- return workqueue;
-}
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
-#endif
-
-grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
- return grpc_schedule_on_exec_ctx;
-}
-
-#endif /* GPR_UV */
diff --git a/src/core/lib/iomgr/workqueue_uv.h b/src/core/lib/iomgr/workqueue_uv.h
deleted file mode 100644
index 5cac45ea2a..0000000000
--- a/src/core/lib/iomgr/workqueue_uv.h
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- *
- * Copyright 2016 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H
-#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H
-
-#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H */
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
deleted file mode 100644
index c7d650c767..0000000000
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_WINDOWS
-
-#include "src/core/lib/iomgr/workqueue.h"
-
-// Minimal implementation of grpc_workqueue for Windows
-// Works by directly enqueuing workqueue items onto the current execution
-// context, which is at least correct, if not performant or in the spirit of
-// workqueues.
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
- int line, const char *reason) {
- return workqueue;
-}
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {}
-#else
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
- return workqueue;
-}
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
-#endif
-
-grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
- return grpc_schedule_on_exec_ctx;
-}
-
-#endif /* GPR_WINDOWS */
diff --git a/src/core/lib/iomgr/workqueue_windows.h b/src/core/lib/iomgr/workqueue_windows.h
deleted file mode 100644
index 47ceed1110..0000000000
--- a/src/core/lib/iomgr/workqueue_windows.h
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_WINDOWS_H
-#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_WINDOWS_H
-
-#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_WINDOWS_H */
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index 537f6f922a..58112b04b4 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -50,7 +50,8 @@ typedef struct {
*/
grpc_polling_entity *pollent;
grpc_transport_stream_op_batch op;
- uint8_t security_context_set;
+ gpr_atm security_context_set;
+ gpr_mu security_context_mu;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
grpc_auth_metadata_context auth_md_context;
} call_data;
@@ -238,19 +239,26 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_linked_mdelem *l;
grpc_client_security_context *sec_ctx = NULL;
- if (!op->cancel_stream && calld->security_context_set == 0) {
- calld->security_context_set = 1;
- GPR_ASSERT(op->payload->context != NULL);
- if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
- op->payload->context[GRPC_CONTEXT_SECURITY].value =
- grpc_client_security_context_create();
- op->payload->context[GRPC_CONTEXT_SECURITY].destroy =
- grpc_client_security_context_destroy;
+ if (!op->cancel_stream) {
+ /* double checked lock over security context to ensure it's set once */
+ if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
+ gpr_mu_lock(&calld->security_context_mu);
+ if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
+ GPR_ASSERT(op->payload->context != NULL);
+ if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
+ op->payload->context[GRPC_CONTEXT_SECURITY].value =
+ grpc_client_security_context_create();
+ op->payload->context[GRPC_CONTEXT_SECURITY].destroy =
+ grpc_client_security_context_destroy;
+ }
+ sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value;
+ GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
+ sec_ctx->auth_context =
+ GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
+ gpr_atm_rel_store(&calld->security_context_set, 1);
+ }
+ gpr_mu_unlock(&calld->security_context_mu);
}
- sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value;
- GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
- sec_ctx->auth_context =
- GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
}
if (op->send_initial_metadata) {
@@ -297,6 +305,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(*calld));
+ gpr_mu_init(&calld->security_context_mu);
return GRPC_ERROR_NONE;
}
@@ -320,6 +329,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_slice_unref_internal(exec_ctx, calld->method);
}
reset_auth_metadata_context(&calld->auth_md_context);
+ gpr_mu_destroy(&calld->security_context_mu);
}
/* Constructor for channel_data */
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index 8823458da5..643e057229 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -365,11 +365,6 @@ static int endpoint_get_fd(grpc_endpoint *secure_ep) {
return grpc_endpoint_get_fd(ep->wrapped_ep);
}
-static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) {
- secure_endpoint *ep = (secure_endpoint *)secure_ep;
- return grpc_endpoint_get_workqueue(ep->wrapped_ep);
-}
-
static grpc_resource_user *endpoint_get_resource_user(
grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
@@ -378,7 +373,6 @@ static grpc_resource_user *endpoint_get_resource_user(
static const grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_write,
- endpoint_get_workqueue,
endpoint_add_to_pollset,
endpoint_add_to_pollset_set,
endpoint_shutdown,
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index e4d27ffbf6..fd4ed726b8 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1453,7 +1453,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op_batch *stream_op = &bctl->op;
grpc_transport_stream_op_batch_payload *stream_op_payload =
&call->stream_op_payload;
- stream_op->covered_by_poller = true;
/* rewrite batch ops into a transport op */
for (i = 0; i < nops; i++) {
@@ -1642,10 +1641,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come
- from server.c. In that case, it's coming from accept_stream, and in
- that case we're not necessarily covered by a poller. */
- stream_op->covered_by_poller = call->is_client;
call->received_initial_metadata = true;
call->buffered_metadata[0] =
op->data.recv_initial_metadata.recv_initial_metadata;
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 3575c5495c..01a1f33db2 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -113,6 +113,7 @@ void grpc_init(void) {
int i;
gpr_once_init(&g_basic_init, do_basic_init);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
gpr_time_init();
@@ -139,8 +140,7 @@ void grpc_init(void) {
grpc_register_tracer("pending_tags", &grpc_trace_pending_tags);
#endif
grpc_security_pre_init();
- grpc_iomgr_init();
- grpc_executor_init();
+ grpc_iomgr_init(&exec_ctx);
gpr_timers_global_init();
grpc_handshaker_factory_registry_init();
grpc_security_init();
@@ -156,19 +156,20 @@ void grpc_init(void) {
grpc_tracer_init("GRPC_TRACE");
/* no more changes to channel init pipelines */
grpc_channel_init_finalize();
- grpc_iomgr_start();
+ grpc_iomgr_start(&exec_ctx);
}
gpr_mu_unlock(&g_init_mu);
+ grpc_exec_ctx_finish(&exec_ctx);
GRPC_API_TRACE("grpc_init(void)", 0, ());
}
void grpc_shutdown(void) {
int i;
GRPC_API_TRACE("grpc_shutdown(void)", 0, ());
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_exec_ctx exec_ctx =
+ GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) {
- grpc_executor_shutdown(&exec_ctx);
grpc_iomgr_shutdown(&exec_ctx);
gpr_timers_global_destroy();
grpc_tracer_shutdown();
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 16f12c345b..d231157c87 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -112,10 +112,6 @@ typedef struct grpc_transport_stream_op_batch {
/** Values for the stream op (fields set are determined by flags above) */
grpc_transport_stream_op_batch_payload *payload;
- /** Is the completion of this op covered by a poller (if false: the op should
- complete independently of some pollset being polled) */
- bool covered_by_poller : 1;
-
/** Send initial metadata to the peer, from the provided metadata batch. */
bool send_initial_metadata : 1;
diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c
index b47b1e8306..7b18229ba6 100644
--- a/src/core/lib/transport/transport_op_string.c
+++ b/src/core/lib/transport/transport_op_string.c
@@ -64,9 +64,6 @@ char *grpc_transport_stream_op_batch_string(
gpr_strvec b;
gpr_strvec_init(&b);
- gpr_strvec_add(
- &b, gpr_strdup(op->covered_by_poller ? "[COVERED]" : "[UNCOVERED]"));
-
if (op->send_initial_metadata) {
gpr_strvec_add(&b, gpr_strdup(" "));
gpr_strvec_add(&b, gpr_strdup("SEND_INITIAL_METADATA{"));
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index e3b2ffaf9e..48782174a7 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -143,8 +143,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/wakeup_fd_nospecial.c',
'src/core/lib/iomgr/wakeup_fd_pipe.c',
'src/core/lib/iomgr/wakeup_fd_posix.c',
- 'src/core/lib/iomgr/workqueue_uv.c',
- 'src/core/lib/iomgr/workqueue_windows.c',
'src/core/lib/json/json.c',
'src/core/lib/json/json_reader.c',
'src/core/lib/json/json_string.c',