Diffstat (limited to 'src/core')
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.c | 365
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.h | 2
-rw-r--r--  src/core/ext/filters/client_channel/client_channel_plugin.c | 1
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c | 54
-rw-r--r--  src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c | 6
-rw-r--r--  src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c | 3
-rw-r--r--  src/core/ext/filters/deadline/deadline_filter.c | 26
-rw-r--r--  src/core/ext/filters/max_age/max_age_filter.c | 18
-rw-r--r--  src/core/ext/filters/message_size/message_size_filter.c | 22
-rw-r--r--  src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c | 8
-rw-r--r--  src/core/ext/filters/workarounds/workaround_utils.c | 3
-rw-r--r--  src/core/ext/transport/chttp2/transport/chttp2_transport.c | 31
-rw-r--r--  src/core/ext/transport/chttp2/transport/internal.h | 7
-rw-r--r--  src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c | 13
-rw-r--r--  src/core/lib/iomgr/ev_epollsig_linux.c | 13
-rw-r--r--  src/core/lib/iomgr/resolve_address_uv.c | 4
-rw-r--r--  src/core/lib/iomgr/tcp_client_uv.c | 3
-rw-r--r--  src/core/lib/iomgr/tcp_server_uv.c | 1
-rw-r--r--  src/core/lib/iomgr/tcp_uv.c | 15
-rw-r--r--  src/core/lib/iomgr/timer_manager.c | 101
-rw-r--r--  src/core/lib/security/credentials/jwt/jwt_verifier.c | 45
-rw-r--r--  src/core/lib/security/transport/client_auth_filter.c | 100
-rw-r--r--  src/core/lib/security/transport/server_auth_filter.c | 143
-rw-r--r--  src/core/lib/support/arena.c | 4
-rw-r--r--  src/core/lib/support/atm.c | 14
-rw-r--r--  src/core/lib/support/avl.c | 2
-rw-r--r--  src/core/lib/support/mpscq.c | 25
-rw-r--r--  src/core/lib/support/mpscq.h | 26
-rw-r--r--  src/core/lib/support/stack_lockfree.c | 137
-rw-r--r--  src/core/lib/support/stack_lockfree.h | 38
-rw-r--r--  src/core/lib/surface/server.c | 112
-rw-r--r--  src/core/lib/transport/static_metadata.c | 3
-rw-r--r--  src/core/tsi/ssl_transport_security.c | 10
33 files changed, 894 insertions, 461 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index de516ab4c9..7add432589 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -52,6 +52,8 @@
/* Client channel implementation */
+grpc_tracer_flag grpc_client_channel_trace = GRPC_TRACER_INITIALIZER(false);
+
/*************************************************************************
* METHOD-CONFIG TABLE
*/
@@ -241,6 +243,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_REF(error));
}
}
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand,
+ grpc_connectivity_state_name(state));
+ }
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
}
@@ -251,6 +257,10 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state publish_state = w->state;
/* check if the notification is for the latest policy */
if (w->lb_policy == w->chand->lb_policy) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
+ w->lb_policy, grpc_connectivity_state_name(w->state));
+ }
if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
@@ -263,7 +273,6 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
}
}
-
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy");
gpr_free(w);
}
@@ -273,7 +282,6 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
-
w->chand = chand;
GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
grpc_combiner_scheduler(chand->combiner));
@@ -283,6 +291,18 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
&w->on_changed);
}
+static void start_resolving_locked(grpc_exec_ctx *exec_ctx,
+ channel_data *chand) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand);
+ }
+ GPR_ASSERT(!chand->started_resolving);
+ chand->started_resolving = true;
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
+ grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result,
+ &chand->on_resolver_result_changed);
+}
+
typedef struct {
char *server_name;
grpc_server_retry_throttle_data *retry_throttle_data;
@@ -345,8 +365,13 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
void *arg, grpc_error *error) {
channel_data *chand = arg;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand,
+ grpc_error_string(error));
+ }
// Extract the following fields from the resolver result, if non-NULL.
char *lb_policy_name = NULL;
+ bool lb_policy_name_changed = false;
grpc_lb_policy *new_lb_policy = NULL;
char *service_config_json = NULL;
grpc_server_retry_throttle_data *retry_throttle_data = NULL;
@@ -394,10 +419,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
// taking a lock on chand->info_mu, because this function is the
// only thing that modifies its value, and it can only be invoked
// once at any given time.
- const bool lb_policy_type_changed =
+ lb_policy_name_changed =
chand->info_lb_policy_name == NULL ||
strcmp(chand->info_lb_policy_name, lb_policy_name) != 0;
- if (chand->lb_policy != NULL && !lb_policy_type_changed) {
+ if (chand->lb_policy != NULL && !lb_policy_name_changed) {
// Continue using the same LB policy. Update with new addresses.
grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
} else {
@@ -445,6 +470,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
chand->resolver_result = NULL;
}
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
+ "service_config=\"%s\"",
+ chand, lb_policy_name, lb_policy_name_changed ? " (changed)" : "",
+ service_config_json);
+ }
// Now swap out fields in chand. Note that the new values may still
// be NULL if (e.g.) the resolver failed to return results or the
// results did not contain the necessary data.
@@ -479,6 +511,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
if (new_lb_policy != NULL || error != GRPC_ERROR_NONE ||
chand->resolver == NULL) {
if (chand->lb_policy != NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand,
+ chand->lb_policy);
+ }
grpc_pollset_set_del_pollset_set(exec_ctx,
chand->lb_policy->interested_parties,
chand->interested_parties);
@@ -489,7 +525,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
// Now that we've swapped out the relevant fields of chand, check for
// error or shutdown.
if (error != GRPC_ERROR_NONE || chand->resolver == NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand);
+ }
if (chand->resolver != NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand);
+ }
grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
@@ -510,6 +552,9 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_error *state_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
if (new_lb_policy != NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand);
+ }
GRPC_ERROR_UNREF(state_error);
state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy,
&state_error);
@@ -772,7 +817,9 @@ typedef struct client_channel_call_data {
gpr_atm subchannel_call_or_error;
gpr_arena *arena;
- bool pick_pending;
+ grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending.
+ grpc_closure lb_pick_closure;
+
grpc_connected_subchannel *connected_subchannel;
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
grpc_polling_entity *pollent;
@@ -837,8 +884,15 @@ static void waiting_for_pick_batches_add_locked(
}
static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
- call_data *calld,
+ grpc_call_element *elem,
grpc_error *error) {
+ call_data *calld = elem->call_data;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s",
+ elem->channel_data, calld, calld->waiting_for_pick_batches_count,
+ grpc_error_string(error));
+ }
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error));
@@ -848,14 +902,21 @@ static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
}
static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx,
- call_data *calld) {
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
if (calld->waiting_for_pick_batches_count == 0) return;
call_or_error coe = get_call_or_error(calld);
if (coe.error != GRPC_ERROR_NONE) {
- waiting_for_pick_batches_fail_locked(exec_ctx, calld,
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem,
GRPC_ERROR_REF(coe.error));
return;
}
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR
+ " pending batches to subchannel_call=%p",
+ elem->channel_data, calld, calld->waiting_for_pick_batches_count,
+ coe.subchannel_call);
+ }
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call,
calld->waiting_for_pick_batches[i]);
@@ -869,6 +930,10 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call",
+ chand, calld);
+ }
if (chand->retry_throttle_data != NULL) {
calld->retry_throttle_data =
grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
@@ -895,7 +960,9 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
}
static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
- call_data *calld, grpc_error *error) {
+ grpc_call_element *elem,
+ grpc_error *error) {
+ call_data *calld = elem->call_data;
grpc_subchannel_call *subchannel_call = NULL;
const grpc_connected_subchannel_call_args call_args = {
.pollent = calld->pollent,
@@ -906,13 +973,18 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
.context = calld->subchannel_call_context};
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
+ elem->channel_data, calld, subchannel_call,
+ grpc_error_string(new_error));
+ }
GPR_ASSERT(set_call_or_error(
calld, (call_or_error){.subchannel_call = subchannel_call}));
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
- waiting_for_pick_batches_fail_locked(exec_ctx, calld, new_error);
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error);
} else {
- waiting_for_pick_batches_resume_locked(exec_ctx, calld);
+ waiting_for_pick_batches_resume_locked(exec_ctx, elem);
}
GRPC_ERROR_UNREF(error);
}
@@ -922,8 +994,6 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
grpc_error *error) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GPR_ASSERT(calld->pick_pending);
- calld->pick_pending = false;
grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
call_or_error coe = get_call_or_error(calld);
@@ -935,8 +1005,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
"Call dropped by load balancing policy")
: GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed to create subchannel", &error, 1);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: failed to create subchannel: error=%s", chand,
+ calld, grpc_error_string(failure));
+ }
set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
- waiting_for_pick_batches_fail_locked(exec_ctx, calld, failure);
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure);
} else if (coe.error != GRPC_ERROR_NONE) {
/* already cancelled before subchannel became ready */
grpc_error *child_errors[] = {error, coe.error};
@@ -950,10 +1025,15 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_DEADLINE_EXCEEDED);
}
- waiting_for_pick_batches_fail_locked(exec_ctx, calld, cancellation_error);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: cancelled before subchannel became ready: %s",
+ chand, calld, grpc_error_string(cancellation_error));
+ }
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error);
} else {
/* Create call on subchannel. */
- create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
+ create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
}
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
GRPC_ERROR_UNREF(error);
@@ -983,41 +1063,77 @@ typedef struct {
grpc_closure closure;
} pick_after_resolver_result_args;
-static void continue_picking_after_resolver_result_locked(
- grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error) {
pick_after_resolver_result_args *args = arg;
if (args->cancelled) {
/* cancelled, do nothing */
- } else if (error != GRPC_ERROR_NONE) {
- subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "call cancelled before resolver result");
+ }
} else {
- if (pick_subchannel_locked(exec_ctx, args->elem)) {
- subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
+ channel_data *chand = args->elem->channel_data;
+ call_data *calld = args->elem->call_data;
+ if (error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
+ chand, calld);
+ }
+ subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
+ } else {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
+ chand, calld);
+ }
+ if (pick_subchannel_locked(exec_ctx, args->elem)) {
+ subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
+ }
}
}
gpr_free(args);
}
-static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_error *error) {
+static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: deferring pick pending resolver result", chand,
+ calld);
+ }
+ pick_after_resolver_result_args *args =
+ (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
+ args->elem = elem;
+ GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked,
+ args, grpc_combiner_scheduler(chand->combiner));
+ grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
+ &args->closure, GRPC_ERROR_NONE);
+}
+
+static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_error *error) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- if (chand->lb_policy != NULL) {
- grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy,
- &calld->connected_subchannel,
- GRPC_ERROR_REF(error));
- }
// If we don't yet have a resolver result, then a closure for
- // continue_picking_after_resolver_result_locked() will have been added to
+ // pick_after_resolver_result_done_locked() will have been added to
// chand->waiting_for_resolver_result_closures, and it may not be invoked
// until after this call has been destroyed. We mark the operation as
- // cancelled, so that when continue_picking_after_resolver_result_locked()
+ // cancelled, so that when pick_after_resolver_result_done_locked()
// is called, it will be a no-op. We also immediately invoke
// subchannel_ready_locked() to propagate the error back to the caller.
for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
closure != NULL; closure = closure->next_data.next) {
pick_after_resolver_result_args *args = closure->cb_arg;
if (!args->cancelled && args->elem == elem) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: "
+ "cancelling pick waiting for resolver result",
+ chand, calld);
+ }
args->cancelled = true;
subchannel_ready_locked(exec_ctx, elem,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
@@ -1027,24 +1143,21 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
GRPC_ERROR_UNREF(error);
}
-// State for pick callback that holds a reference to the LB policy
-// from which the pick was requested.
-typedef struct {
- grpc_lb_policy *lb_policy;
- grpc_call_element *elem;
- grpc_closure closure;
-} pick_callback_args;
-
// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
// Unrefs the LB policy after invoking subchannel_ready_locked().
static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- pick_callback_args *args = arg;
- GPR_ASSERT(args != NULL);
- GPR_ASSERT(args->lb_policy != NULL);
- subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
- GRPC_LB_POLICY_UNREF(exec_ctx, args->lb_policy, "pick_subchannel");
- gpr_free(args);
+ grpc_call_element *elem = arg;
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
+ chand, calld);
+ }
+ GPR_ASSERT(calld->lb_policy != NULL);
+ GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
+ calld->lb_policy = NULL;
+ subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
}
// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
@@ -1055,23 +1168,44 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
const grpc_lb_policy_pick_args *inputs) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- pick_callback_args *pick_args = gpr_zalloc(sizeof(*pick_args));
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
+ chand, calld, chand->lb_policy);
+ }
+ // Keep a ref to the LB policy in calld while the pick is pending.
GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel");
- pick_args->lb_policy = chand->lb_policy;
- pick_args->elem = elem;
- GRPC_CLOSURE_INIT(&pick_args->closure, pick_callback_done_locked, pick_args,
+ calld->lb_policy = chand->lb_policy;
+ GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem,
grpc_combiner_scheduler(chand->combiner));
const bool pick_done = grpc_lb_policy_pick_locked(
exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel,
- calld->subchannel_call_context, NULL, &pick_args->closure);
+ calld->subchannel_call_context, NULL, &calld->lb_pick_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the LB policy. */
- GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "pick_subchannel");
- gpr_free(pick_args);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously",
+ chand, calld);
+ }
+ GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
+ calld->lb_policy = NULL;
}
return pick_done;
}
+static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_error *error) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ GPR_ASSERT(calld->lb_policy != NULL);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
+ chand, calld, calld->lb_policy);
+ }
+ grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
+ &calld->connected_subchannel, error);
+}
+
static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
GPR_TIMER_BEGIN("pick_subchannel", 0);
@@ -1107,20 +1241,9 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs);
} else if (chand->resolver != NULL) {
if (!chand->started_resolving) {
- chand->started_resolving = true;
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next_locked(exec_ctx, chand->resolver,
- &chand->resolver_result,
- &chand->on_resolver_result_changed);
+ start_resolving_locked(exec_ctx, chand);
}
- pick_after_resolver_result_args *args =
- (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
- args->elem = elem;
- GRPC_CLOSURE_INIT(&args->closure,
- continue_picking_after_resolver_result_locked, args,
- grpc_combiner_scheduler(chand->combiner));
- grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
- &args->closure, GRPC_ERROR_NONE);
+ pick_after_resolver_result_start_locked(exec_ctx, elem);
} else {
subchannel_ready_locked(
exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
@@ -1133,63 +1256,77 @@ static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
void *arg,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
- grpc_transport_stream_op_batch *op = arg;
- grpc_call_element *elem = op->handler_private.extra_arg;
+ grpc_transport_stream_op_batch *batch = arg;
+ grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
/* need to recheck that another thread hasn't set the call */
call_or_error coe = get_call_or_error(calld);
if (coe.error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
+ chand, calld, grpc_error_string(coe.error));
+ }
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_REF(coe.error));
+ exec_ctx, batch, GRPC_ERROR_REF(coe.error));
goto done;
}
if (coe.subchannel_call != NULL) {
- grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
+ calld, coe.subchannel_call);
+ }
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
goto done;
}
// Add to waiting-for-pick list. If we succeed in getting a
// subchannel call below, we'll handle this batch (along with any
// other waiting batches) in waiting_for_pick_batches_resume_locked().
- waiting_for_pick_batches_add_locked(calld, op);
- /* if this is a cancellation, then we can raise our cancelled flag */
- if (op->cancel_stream) {
- grpc_error *error = op->payload->cancel_stream.cancel_error;
+ waiting_for_pick_batches_add_locked(calld, batch);
+ // If this is a cancellation, cancel the pending pick (if any) and
+ // fail any pending batches.
+ if (batch->cancel_stream) {
+ grpc_error *error = batch->payload->cancel_stream.cancel_error;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
+ calld, grpc_error_string(error));
+ }
/* Stash a copy of cancel_error in our call data, so that we can use
it for subsequent operations. This ensures that if the call is
- cancelled before any ops are passed down (e.g., if the deadline
+ cancelled before any batches are passed down (e.g., if the deadline
is in the past when the call starts), we can return the right
- error to the caller when the first op does get passed down. */
+ error to the caller when the first batch does get passed down. */
set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
- if (calld->pick_pending) {
- cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
+ if (calld->lb_policy != NULL) {
+ pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
+ } else {
+ pick_after_resolver_result_cancel_locked(exec_ctx, elem,
+ GRPC_ERROR_REF(error));
}
- waiting_for_pick_batches_fail_locked(exec_ctx, calld,
- GRPC_ERROR_REF(error));
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
goto done;
}
/* if we don't have a subchannel, try to get one */
- if (!calld->pick_pending && calld->connected_subchannel == NULL &&
- op->send_initial_metadata) {
- calld->initial_metadata_payload = op->payload;
- calld->pick_pending = true;
+ if (batch->send_initial_metadata) {
+ GPR_ASSERT(calld->connected_subchannel == NULL);
+ calld->initial_metadata_payload = batch->payload;
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data's interested_parties, so
that IO of the lb_policy and resolver could be done under it. */
if (pick_subchannel_locked(exec_ctx, elem)) {
// Pick was returned synchronously.
- calld->pick_pending = false;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
if (calld->connected_subchannel == NULL) {
grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy");
set_call_or_error(calld,
(call_or_error){.error = GRPC_ERROR_REF(error)});
- waiting_for_pick_batches_fail_locked(exec_ctx, calld, error);
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, error);
} else {
// Create subchannel call.
- create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_NONE);
+ create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE);
}
} else {
grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
@@ -1232,47 +1369,59 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
If it has, we proceed on the fast path. */
static void cc_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace) ||
+ GRPC_TRACER_ON(grpc_trace_channel)) {
+ grpc_call_log_op(GPR_INFO, elem, batch);
+ }
if (chand->deadline_checking_enabled) {
grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
- op);
+ batch);
}
// Intercept on_complete for recv_trailing_metadata so that we can
// check retry throttle status.
- if (op->recv_trailing_metadata) {
- GPR_ASSERT(op->on_complete != NULL);
- calld->original_on_complete = op->on_complete;
+ if (batch->recv_trailing_metadata) {
+ GPR_ASSERT(batch->on_complete != NULL);
+ calld->original_on_complete = batch->on_complete;
GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
grpc_schedule_on_exec_ctx);
- op->on_complete = &calld->on_complete;
+ batch->on_complete = &calld->on_complete;
}
/* try to (atomically) get the call */
call_or_error coe = get_call_or_error(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
if (coe.error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
+ chand, calld, grpc_error_string(coe.error));
+ }
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_REF(coe.error));
- GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
- /* early out */
- return;
+ exec_ctx, batch, GRPC_ERROR_REF(coe.error));
+ goto done;
}
if (coe.subchannel_call != NULL) {
- grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
- GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
- /* early out */
- return;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
+ calld, coe.subchannel_call);
+ }
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
+ goto done;
}
/* we failed; lock and figure out what to do */
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld);
+ }
GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
- op->handler_private.extra_arg = elem;
+ batch->handler_private.extra_arg = elem;
GRPC_CLOSURE_SCHED(
- exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure,
- start_transport_stream_op_batch_locked, op,
+ exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ start_transport_stream_op_batch_locked, batch,
grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
+done:
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}
@@ -1317,7 +1466,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call,
"client_channel_destroy_call");
}
- GPR_ASSERT(!calld->pick_pending);
+ GPR_ASSERT(calld->lb_policy == NULL);
GPR_ASSERT(calld->waiting_for_pick_batches_count == 0);
if (calld->connected_subchannel != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
@@ -1366,11 +1515,7 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
} else {
chand->exit_idle_when_lb_policy_arrives = true;
if (!chand->started_resolving && chand->resolver != NULL) {
- GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- chand->started_resolving = true;
- grpc_resolver_next_locked(exec_ctx, chand->resolver,
- &chand->resolver_result,
- &chand->on_resolver_result_changed);
+ start_resolving_locked(exec_ctx, chand);
}
}
GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
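The client_channel.c changes above replace the old boolean pick_pending flag with a calld->lb_policy field that holds a ref to the LB policy for as long as a pick is in flight. A minimal sketch of that ownership pattern, with hypothetical names and the surrounding machinery stripped away:

/* Sketch only: holding a ref while the async pick is pending lets the
   completion and cancellation paths target the same policy object, even
   if chand->lb_policy has since been swapped by a resolver update. */
typedef struct {
  grpc_lb_policy *lb_policy; /* non-NULL iff a pick is pending */
} pending_pick_state;

static void pick_start(pending_pick_state *st, grpc_lb_policy *policy) {
  GRPC_LB_POLICY_REF(policy, "pick_subchannel"); /* keep policy alive */
  st->lb_policy = policy;
}

static void pick_finish(grpc_exec_ctx *exec_ctx, pending_pick_state *st) {
  GRPC_LB_POLICY_UNREF(exec_ctx, st->lb_policy, "pick_subchannel");
  st->lb_policy = NULL; /* doubles as the "no pick pending" flag */
}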
diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index 63f7c29940..c99f0092e9 100644
--- a/src/core/ext/filters/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -23,6 +23,8 @@
#include "src/core/ext/filters/client_channel/resolver.h"
#include "src/core/lib/channel/channel_stack.h"
+extern grpc_tracer_flag grpc_client_channel_trace;
+
// Channel arg key for server URI string.
#define GRPC_ARG_SERVER_URI "grpc.server_uri"
diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.c b/src/core/ext/filters/client_channel/client_channel_plugin.c
index 60e77d6268..6f133a648b 100644
--- a/src/core/ext/filters/client_channel/client_channel_plugin.c
+++ b/src/core/ext/filters/client_channel/client_channel_plugin.c
@@ -78,6 +78,7 @@ void grpc_client_channel_init(void) {
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter,
(void *)&grpc_client_channel_filter);
grpc_http_connect_register_handshaker_factory();
+ grpc_register_tracer("client_channel", &grpc_client_channel_trace);
#ifndef NDEBUG
grpc_register_tracer("resolver_refcount", &grpc_trace_resolver_refcount);
#endif
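The plugin registration above wires the new flag into gRPC's standard tracer idiom: declare a grpc_tracer_flag, register it under a name, and gate debug logging on it. Condensed from the diff (surrounding details omitted), with the flag then controllable by name at runtime, e.g. GRPC_TRACE=client_channel GRPC_VERBOSITY=DEBUG in the environment:

grpc_tracer_flag grpc_client_channel_trace = GRPC_TRACER_INITIALIZER(false);

void plugin_init(void) {
  grpc_register_tracer("client_channel", &grpc_client_channel_trace);
}

/* At each site of interest, logging is gated on the flag: */
void log_state_change(channel_data *chand, grpc_connectivity_state state) {
  if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
    gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand,
            grpc_connectivity_state_name(state));
  }
}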
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 8e9d6b0f47..56d340b8c2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -195,11 +195,28 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx,
rr_subchannel_list *subchannel_list,
const char *reason) {
+ if (subchannel_list->shutting_down) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "Subchannel list %p already shutting down",
+ (void *)subchannel_list);
+ }
+ return;
+ };
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p",
+ (void *)subchannel_list);
+ }
GPR_ASSERT(!subchannel_list->shutting_down);
subchannel_list->shutting_down = true;
for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
subchannel_data *sd = &subchannel_list->subchannels[i];
if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe.
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "Unsubscribing from subchannel %p as part of shutting down "
+ "subchannel_list %p",
+ (void *)sd->subchannel, (void *)subchannel_list);
+ }
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
NULL,
&sd->connectivity_changed_closure);
@@ -228,13 +245,14 @@ static size_t get_next_ready_subchannel_index_locked(
const size_t index = (i + p->last_ready_subchannel_index + 1) %
p->subchannel_list->num_subchannels;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
- "state=%d",
- (void *)p,
- (void *)p->subchannel_list->subchannels[index].subchannel,
- (void *)p->subchannel_list, (unsigned long)index,
- p->subchannel_list->subchannels[index].curr_connectivity_state);
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: "
+ "state=%s",
+ (void *)p, (void *)p->subchannel_list->subchannels[index].subchannel,
+ (void *)p->subchannel_list, (unsigned long)index,
+ grpc_connectivity_state_name(
+ p->subchannel_list->subchannels[index].curr_connectivity_state));
}
if (p->subchannel_list->subchannels[index].curr_connectivity_state ==
GRPC_CHANNEL_READY) {
@@ -511,16 +529,27 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->subchannel_list->policy;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(
+ GPR_DEBUG,
+ "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: "
+ "prev_state=%s new_state=%s p->shutdown=%d "
+ "sd->subchannel_list->shutting_down=%d error=%s",
+ (void *)p, (void *)sd->subchannel, (void *)sd->subchannel_list,
+ grpc_connectivity_state_name(sd->prev_connectivity_state),
+ grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe),
+ p->shutdown, sd->subchannel_list->shutting_down,
+ grpc_error_string(error));
+ }
// If the policy is shutting down, unref and return.
if (p->shutdown) {
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown");
return;
}
- if (sd->subchannel_list->shutting_down) {
+ if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) {
// the subchannel list associated with sd has been discarded. This callback
// corresponds to the unsubscription.
- GPR_ASSERT(error == GRPC_ERROR_CANCELLED);
rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown");
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown");
return;
@@ -536,13 +565,6 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[RR %p] connectivity changed for subchannel %p: "
- "prev_state=%d new_state=%d",
- (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state,
- sd->curr_connectivity_state);
- }
// Update state counters and determine new overall state.
update_state_counters_locked(sd);
sd->prev_connectivity_state = sd->curr_connectivity_state;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
index af3391a731..5ea75f0554 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c
@@ -132,7 +132,7 @@ static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- dns_resolver *r = arg;
+ dns_resolver *r = (dns_resolver *)arg;
r->have_retry_timer = false;
if (error == GRPC_ERROR_NONE) {
@@ -146,7 +146,7 @@ static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- dns_resolver *r = arg;
+ dns_resolver *r = (dns_resolver *)arg;
grpc_channel_args *result = NULL;
GPR_ASSERT(r->resolving);
r->resolving = false;
@@ -241,7 +241,7 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx,
char *path = args->uri->path;
if (path[0] == '/') ++path;
// Create resolver.
- dns_resolver *r = gpr_zalloc(sizeof(dns_resolver));
+ dns_resolver *r = (dns_resolver *)gpr_zalloc(sizeof(dns_resolver));
grpc_resolver_init(&r->base, &dns_resolver_vtable, args->combiner);
r->name_to_resolve = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
index 7b4fe38272..7ceb8f40a1 100644
--- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c
@@ -177,7 +177,8 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx,
return NULL;
}
/* Instantiate resolver. */
- sockaddr_resolver *r = gpr_zalloc(sizeof(sockaddr_resolver));
+ sockaddr_resolver *r =
+ (sockaddr_resolver *)gpr_zalloc(sizeof(sockaddr_resolver));
r->addresses = addresses;
r->channel_args = grpc_channel_args_copy(args->args);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable, args->combiner);
diff --git a/src/core/ext/filters/deadline/deadline_filter.c b/src/core/ext/filters/deadline/deadline_filter.c
index ced025e2e2..6789903c95 100644
--- a/src/core/ext/filters/deadline/deadline_filter.c
+++ b/src/core/ext/filters/deadline/deadline_filter.c
@@ -37,8 +37,8 @@
// Timer callback.
static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- grpc_call_element* elem = arg;
- grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_call_element* elem = (grpc_call_element*)arg;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
if (error != GRPC_ERROR_CANCELLED) {
grpc_call_element_signal_error(
exec_ctx, elem,
@@ -57,7 +57,7 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) {
return;
}
- grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
grpc_deadline_timer_state cur_state;
grpc_closure* closure = NULL;
retry:
@@ -112,7 +112,7 @@ static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
// Callback run when the call is complete.
static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
- grpc_deadline_state* deadline_state = arg;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)arg;
cancel_timer_if_needed(exec_ctx, deadline_state);
// Invoke the next callback.
GRPC_CLOSURE_RUN(exec_ctx, deadline_state->next_on_complete,
@@ -145,7 +145,7 @@ static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_stack* call_stack,
gpr_timespec deadline) {
- grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
deadline_state->call_stack = call_stack;
// Deadline will always be infinite on servers, so the timer will only be
// set on clients with a finite deadline.
@@ -169,13 +169,13 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem) {
- grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
cancel_timer_if_needed(exec_ctx, deadline_state);
}
void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
gpr_timespec new_deadline) {
- grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
cancel_timer_if_needed(exec_ctx, deadline_state);
start_timer_if_needed(exec_ctx, elem, new_deadline);
}
@@ -183,7 +183,7 @@ void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
- grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
if (op->cancel_stream) {
cancel_timer_if_needed(exec_ctx, deadline_state);
} else {
@@ -256,8 +256,8 @@ static void client_start_transport_stream_op_batch(
// Callback for receiving initial metadata on the server.
static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- grpc_call_element* elem = arg;
- server_call_data* calld = elem->call_data;
+ grpc_call_element* elem = (grpc_call_element*)arg;
+ server_call_data* calld = (server_call_data*)elem->call_data;
// Get deadline from metadata and start the timer if needed.
start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline);
// Invoke the next callback.
@@ -269,7 +269,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
static void server_start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
- server_call_data* calld = elem->call_data;
+ server_call_data* calld = (server_call_data*)elem->call_data;
if (op->cancel_stream) {
cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
} else {
@@ -341,8 +341,8 @@ static bool maybe_add_deadline_filter(grpc_exec_ctx* exec_ctx,
void* arg) {
return grpc_deadline_checking_enabled(
grpc_channel_stack_builder_get_channel_arguments(builder))
- ? grpc_channel_stack_builder_prepend_filter(builder, arg, NULL,
- NULL)
+ ? grpc_channel_stack_builder_prepend_filter(
+ builder, (const grpc_channel_filter*)arg, NULL, NULL)
: true;
}
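The hunks in this file, and the similar ones in the filters that follow, are part of a mechanical sweep: values that arrive as void * (closure args, elem->call_data, elem->channel_data) are now cast explicitly to their concrete types. Implicit void * conversion is valid C but ill-formed C++, so the explicit casts keep the tree compilable as both. The pattern, reduced to a sketch with a hypothetical call-data type:

typedef struct {
  int bytes_seen; /* hypothetical field */
} my_call_data;

static void my_callback(grpc_exec_ctx *exec_ctx, void *arg,
                        grpc_error *error) {
  grpc_call_element *elem = (grpc_call_element *)arg;    /* explicit cast */
  my_call_data *calld = (my_call_data *)elem->call_data; /* explicit cast */
  calld->bytes_seen++;
}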
diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c
index 35304f8150..7d748b9c32 100644
--- a/src/core/ext/filters/max_age/max_age_filter.c
+++ b/src/core/ext/filters/max_age/max_age_filter.c
@@ -108,7 +108,7 @@ static void decrease_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) {
static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
/* Decrease call_count. If there are no active calls at this time,
max_idle_timer will start here. If the number of active calls is not 0,
max_idle_timer will start after all the active calls end. */
@@ -119,7 +119,7 @@ static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_timer_pending = true;
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer");
@@ -140,7 +140,7 @@ static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx,
void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_grace_timer_pending = true;
GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer");
@@ -156,7 +156,7 @@ static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx,
static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
if (error == GRPC_ERROR_NONE) {
/* Prevent the max idle timer from being set again */
gpr_atm_no_barrier_fetch_add(&chand->call_count, 1);
@@ -176,7 +176,7 @@ static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg,
static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_timer_pending = false;
gpr_mu_unlock(&chand->max_age_timer_mu);
@@ -200,7 +200,7 @@ static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
gpr_mu_lock(&chand->max_age_timer_mu);
chand->max_age_grace_timer_pending = false;
gpr_mu_unlock(&chand->max_age_timer_mu);
@@ -220,7 +220,7 @@ static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- channel_data* chand = arg;
+ channel_data* chand = (channel_data*)arg;
if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
grpc_transport_op* op = grpc_make_transport_op(NULL);
op->on_connectivity_state_change = &chand->channel_connectivity_changed,
@@ -264,7 +264,7 @@ static int add_random_max_connection_age_jitter(int value) {
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
const grpc_call_element_args* args) {
- channel_data* chand = elem->channel_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
increase_call_count(exec_ctx, chand);
return GRPC_ERROR_NONE;
}
@@ -281,7 +281,7 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem,
grpc_channel_element_args* args) {
- channel_data* chand = elem->channel_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
gpr_mu_init(&chand->max_age_timer_mu);
chand->max_age_timer_pending = false;
chand->max_age_grace_timer_pending = false;
diff --git a/src/core/ext/filters/message_size/message_size_filter.c b/src/core/ext/filters/message_size/message_size_filter.c
index 9bb565ed6d..846c7df69a 100644
--- a/src/core/ext/filters/message_size/message_size_filter.c
+++ b/src/core/ext/filters/message_size/message_size_filter.c
@@ -60,7 +60,8 @@ static void* message_size_limits_create_from_json(const grpc_json* json) {
if (max_response_message_bytes == -1) return NULL;
}
}
- message_size_limits* value = gpr_malloc(sizeof(message_size_limits));
+ message_size_limits* value =
+ (message_size_limits*)gpr_malloc(sizeof(message_size_limits));
value->max_send_size = max_request_message_bytes;
value->max_recv_size = max_response_message_bytes;
return value;
@@ -88,8 +89,8 @@ typedef struct channel_data {
// receive message size.
static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
grpc_error* error) {
- grpc_call_element* elem = user_data;
- call_data* calld = elem->call_data;
+ grpc_call_element* elem = (grpc_call_element*)user_data;
+ call_data* calld = (call_data*)elem->call_data;
if (*calld->recv_message != NULL && calld->limits.max_recv_size >= 0 &&
(*calld->recv_message)->length > (size_t)calld->limits.max_recv_size) {
char* message_string;
@@ -117,7 +118,7 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
static void start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
- call_data* calld = elem->call_data;
+ call_data* calld = (call_data*)elem->call_data;
// Check max send message size.
if (op->send_message && calld->limits.max_send_size >= 0 &&
op->payload->send_message.send_message->length >
@@ -149,8 +150,8 @@ static void start_transport_stream_op_batch(
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
const grpc_call_element_args* args) {
- channel_data* chand = elem->channel_data;
- call_data* calld = elem->call_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
calld->next_recv_message_ready = NULL;
GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -160,8 +161,9 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
// size to the receive limit.
calld->limits = chand->limits;
if (chand->method_limit_table != NULL) {
- message_size_limits* limits = grpc_method_config_table_get(
- exec_ctx, chand->method_limit_table, args->path);
+ message_size_limits* limits =
+ (message_size_limits*)grpc_method_config_table_get(
+ exec_ctx, chand->method_limit_table, args->path);
if (limits != NULL) {
if (limits->max_send_size >= 0 &&
(limits->max_send_size < calld->limits.max_send_size ||
@@ -220,7 +222,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
- channel_data* chand = elem->channel_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
chand->limits = get_message_size_limits(args->channel_args);
// Get method config table from channel args.
const grpc_arg* channel_arg =
@@ -243,7 +245,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
// Destructor for channel_data.
static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem) {
- channel_data* chand = elem->channel_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
grpc_slice_hash_table_unref(exec_ctx, chand->method_limit_table);
}
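Worth noting for the init_call_elem hunk above: per-method limits from the service config only tighten the channel-level defaults — a method limit applies when it is set (>= 0) and either smaller than the channel limit or the channel has none (< 0). A plausible distillation of that merge rule (hypothetical helper, not part of the diff):

static int merge_limit(int channel_limit, int method_limit) {
  if (method_limit < 0) return channel_limit; /* no per-method override */
  if (channel_limit < 0) return method_limit; /* no channel-level default */
  return method_limit < channel_limit ? method_limit : channel_limit;
}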
diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c
index 8b3fff5fa3..b4d2cb4b8c 100644
--- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c
+++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c
@@ -52,8 +52,8 @@ static bool get_user_agent_mdelem(const grpc_metadata_batch* batch,
// Callback invoked when we receive an initial metadata.
static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx,
void* user_data, grpc_error* error) {
- grpc_call_element* elem = user_data;
- call_data* calld = elem->call_data;
+ grpc_call_element* elem = (grpc_call_element*)user_data;
+ call_data* calld = (call_data*)elem->call_data;
if (GRPC_ERROR_NONE == error) {
grpc_mdelem md;
@@ -75,7 +75,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx,
static void start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
- call_data* calld = elem->call_data;
+ call_data* calld = (call_data*)elem->call_data;
// Inject callback for receiving initial metadata
if (op->recv_initial_metadata) {
@@ -103,7 +103,7 @@ static void start_transport_stream_op_batch(
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
const grpc_call_element_args* args) {
- call_data* calld = elem->call_data;
+ call_data* calld = (call_data*)elem->call_data;
calld->next_recv_initial_metadata_ready = NULL;
calld->workaround_active = false;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
diff --git a/src/core/ext/filters/workarounds/workaround_utils.c b/src/core/ext/filters/workarounds/workaround_utils.c
index bc76753a8a..e600fbee67 100644
--- a/src/core/ext/filters/workarounds/workaround_utils.c
+++ b/src/core/ext/filters/workarounds/workaround_utils.c
@@ -33,7 +33,8 @@ grpc_workaround_user_agent_md *grpc_parse_user_agent(grpc_mdelem md) {
if (NULL != user_agent_md) {
return user_agent_md;
}
- user_agent_md = gpr_malloc(sizeof(grpc_workaround_user_agent_md));
+ user_agent_md = (grpc_workaround_user_agent_md *)gpr_malloc(
+ sizeof(grpc_workaround_user_agent_md));
for (int i = 0; i < GRPC_MAX_WORKAROUND_ID; i++) {
if (ua_parser[i]) {
user_agent_md->workaround_active[i] = ua_parser[i](md);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 0bdc0603a0..f790267944 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -34,6 +34,7 @@
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -280,8 +281,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_slice_buffer_init(&t->outbuf);
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor);
- GRPC_CLOSURE_INIT(&t->write_action, write_action, t,
- grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
@@ -399,6 +398,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls;
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
+
if (channel_args) {
for (i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key,
@@ -487,6 +488,23 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_permit_without_calls =
(uint32_t)grpc_channel_arg_get_integer(
&channel_args->args[i], (grpc_integer_options){0, 0, 1});
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_OPTIMIZATION_TARGET)) {
+ if (channel_args->args[i].type != GRPC_ARG_STRING) {
+ gpr_log(GPR_ERROR, "%s should be a string",
+ GRPC_ARG_OPTIMIZATION_TARGET);
+ } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) {
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
+ } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) {
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
+ } else if (0 ==
+ strcmp(channel_args->args[i].value.string, "throughput")) {
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT;
+ } else {
+ gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'",
+ GRPC_ARG_OPTIMIZATION_TARGET,
+ channel_args->args[i].value.string);
+ }
} else {
static const struct {
const char *channel_arg_name;
@@ -540,6 +558,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
+ GRPC_CLOSURE_INIT(&t->write_action, write_action, t,
+ t->opt_target == GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT
+ ? grpc_executor_scheduler
+ : grpc_schedule_on_exec_ctx);
+
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
t->ping_state.is_delayed_ping_timer_set = false;
@@ -2144,8 +2167,8 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
double bw_dbl, double bdp_dbl) {
- int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX);
- int32_t target = GPR_MAX((int32_t)bw_dbl / 1000, bdp);
+ int32_t bdp = (int32_t)GPR_CLAMP(bdp_dbl, 128.0, INT32_MAX);
+ int32_t target = (int32_t)GPR_MAX(bw_dbl / 1000, bdp);
// frame size is bounded [2^14,2^24-1]
int32_t frame_size = GPR_CLAMP(target, 16384, 16777215);
int64_t delta = (int64_t)frame_size -
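
The hunks above wire a new channel arg, GRPC_ARG_OPTIMIZATION_TARGET, into transport setup: "throughput" moves write_action onto the executor scheduler, while "latency" (and, for now, "blend") keeps writes on the current exec_ctx. A minimal client-side usage sketch, assuming the standard C-core channel creation API (the target address is a placeholder):

    #include <grpc/grpc.h>
    #include <string.h>

    /* Sketch: ask the chttp2 transport to optimize for throughput by passing
       the GRPC_ARG_OPTIMIZATION_TARGET channel arg at channel creation. */
    static grpc_channel *make_throughput_channel(void) {
      grpc_arg arg;
      memset(&arg, 0, sizeof(arg));
      arg.type = GRPC_ARG_STRING;
      arg.key = (char *)GRPC_ARG_OPTIMIZATION_TARGET;
      arg.value.string = (char *)"throughput"; /* or "latency" / "blend" */
      grpc_channel_args args = {1, &arg};
      return grpc_insecure_channel_create("localhost:50051", &args, NULL);
    }
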
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 9fa72ddbdf..a5b67e5aba 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -67,6 +67,11 @@ typedef enum {
} grpc_chttp2_ping_type;
typedef enum {
+ GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY,
+ GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT,
+} grpc_chttp2_optimization_target;
+
+typedef enum {
GRPC_CHTTP2_PCL_INITIATE = 0,
GRPC_CHTTP2_PCL_NEXT,
GRPC_CHTTP2_PCL_INFLIGHT,
@@ -229,6 +234,8 @@ struct grpc_chttp2_transport {
/** should we probe bdp? */
bool enable_bdp_probe;
+ grpc_chttp2_optimization_target opt_target;
+
/** various lists of streams */
grpc_chttp2_stream_list lists[STREAM_LIST_COUNT];
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
index 2c91ad357c..9f82c480bc 100644
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
@@ -57,9 +57,6 @@
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
-/* Uncomment the following to enable extra checks on poll_object operations */
-/* #define PO_DEBUG */
-
/* The maximum number of polling threads per polling island. By default no
limit */
static int g_max_pollers_per_pi = INT_MAX;
@@ -92,7 +89,7 @@ typedef enum {
} poll_obj_type;
typedef struct poll_obj {
-#ifdef PO_DEBUG
+#ifndef NDEBUG
poll_obj_type obj_type;
#endif
gpr_mu mu;
@@ -893,7 +890,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
* would be holding a lock to it anyway. */
gpr_mu_lock(&new_fd->po.mu);
new_fd->po.pi = NULL;
-#ifdef PO_DEBUG
+#ifndef NDEBUG
new_fd->po.obj_type = POLL_OBJ_FD;
#endif
@@ -1171,7 +1168,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
gpr_mu_init(&pollset->po.mu);
*mu = &pollset->po.mu;
pollset->po.pi = NULL;
-#ifdef PO_DEBUG
+#ifndef NDEBUG
pollset->po.obj_type = POLL_OBJ_POLLSET;
#endif
@@ -1625,7 +1622,7 @@ static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
poll_obj_type item_type) {
GPR_TIMER_BEGIN("add_poll_object", 0);
-#ifdef PO_DEBUG
+#ifndef NDEBUG
GPR_ASSERT(item->obj_type == item_type);
GPR_ASSERT(bag->obj_type == bag_type);
#endif
@@ -1784,7 +1781,7 @@ static grpc_pollset_set *pollset_set_create(void) {
grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
gpr_mu_init(&pss->po.mu);
pss->po.pi = NULL;
-#ifdef PO_DEBUG
+#ifndef NDEBUG
pss->po.obj_type = POLL_OBJ_POLLSET_SET;
#endif
return pss;
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index 255e07010b..3c4ca9c7c5 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -54,9 +54,6 @@
gpr_log(GPR_INFO, __VA_ARGS__); \
}
-/* Uncomment the following to enable extra checks on poll_object operations */
-/* #define PO_DEBUG */
-
static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;
@@ -85,7 +82,7 @@ typedef enum {
} poll_obj_type;
typedef struct poll_obj {
-#ifdef PO_DEBUG
+#ifndef NDEBUG
poll_obj_type obj_type;
#endif
gpr_mu mu;
@@ -821,7 +818,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
* would be holding a lock to it anyway. */
gpr_mu_lock(&new_fd->po.mu);
new_fd->po.pi = NULL;
-#ifdef PO_DEBUG
+#ifndef NDEBUG
new_fd->po.obj_type = POLL_OBJ_FD;
#endif
@@ -1079,7 +1076,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
gpr_mu_init(&pollset->po.mu);
*mu = &pollset->po.mu;
pollset->po.pi = NULL;
-#ifdef PO_DEBUG
+#ifndef NDEBUG
pollset->po.obj_type = POLL_OBJ_POLLSET;
#endif
@@ -1416,7 +1413,7 @@ static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
poll_obj_type item_type) {
GPR_TIMER_BEGIN("add_poll_object", 0);
-#ifdef PO_DEBUG
+#ifndef NDEBUG
GPR_ASSERT(item->obj_type == item_type);
GPR_ASSERT(bag->obj_type == bag_type);
#endif
@@ -1575,7 +1572,7 @@ static grpc_pollset_set *pollset_set_create(void) {
grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
gpr_mu_init(&pss->po.mu);
pss->po.pi = NULL;
-#ifdef PO_DEBUG
+#ifndef NDEBUG
pss->po.obj_type = POLL_OBJ_POLLSET_SET;
#endif
return pss;
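
Both pollers drop the opt-in PO_DEBUG macro in favor of the conventional NDEBUG guard, so the poll-object type tags are compiled into every debug build and stripped from optimized builds automatically. A minimal sketch of the pattern (names are illustrative, not from the tree):

    #include <assert.h>

    typedef enum { OBJ_FD, OBJ_POLLSET, OBJ_POLLSET_SET } obj_type;

    typedef struct {
    #ifndef NDEBUG
      obj_type type; /* present only when NDEBUG is not defined */
    #endif
      int fd; /* stand-in for the object's real state */
    } poll_obj_sketch;

    static void check_obj(poll_obj_sketch *o, obj_type expected) {
    #ifndef NDEBUG
      assert(o->type == expected); /* cheap sanity check in debug builds */
    #else
      (void)o;
      (void)expected;
    #endif
    }
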
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index 45de289e45..a98b8e62db 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -54,7 +54,7 @@ static int retry_named_port_failure(int status, request *r,
int retry_status;
uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t));
req->data = r;
- r->port = svc[i][1];
+ r->port = gpr_strdup(svc[i][1]);
retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb,
r->host, r->port, r->hints);
if (retry_status < 0 || getaddrinfo_cb == NULL) {
@@ -127,6 +127,8 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
GRPC_CLOSURE_SCHED(&exec_ctx, r->on_done, error);
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(r->hints);
+ gpr_free(r->host);
+ gpr_free(r->port);
gpr_free(r);
uv_freeaddrinfo(res);
}
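
The resolver fix is an ownership correction: on the retry path r->port previously aliased an entry in the static svc table, so the unconditional gpr_free() added to the callback would have freed memory the request didn't own. Duplicating with gpr_strdup keeps the invariant that host and port are always heap-owned. A sketch of that convention, with hypothetical names:

    #include <grpc/support/alloc.h>
    #include <grpc/support/string_util.h>

    typedef struct {
      char *host; /* always heap-owned */
      char *port; /* always heap-owned, even when taken from a static table */
    } lookup_request;

    static void lookup_request_set_port(lookup_request *r, const char *port) {
      gpr_free(r->port);
      r->port = gpr_strdup(port); /* copy so the destructor can free blindly */
    }

    static void lookup_request_destroy(lookup_request *r) {
      gpr_free(r->host);
      gpr_free(r->port);
      gpr_free(r);
    }
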
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index ab6832932f..2f1d237c07 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -48,6 +48,7 @@ typedef struct grpc_uv_tcp_connect {
static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx,
grpc_uv_tcp_connect *connect) {
grpc_resource_quota_unref_internal(exec_ctx, connect->resource_quota);
+ gpr_free(connect->addr_name);
gpr_free(connect);
}
@@ -105,6 +106,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
}
done = (--connect->refs == 0);
if (done) {
+ grpc_exec_ctx_flush(&exec_ctx);
uv_tcp_connect_cleanup(&exec_ctx, connect);
}
GRPC_CLOSURE_SCHED(&exec_ctx, closure, error);
@@ -140,6 +142,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
connect->resource_quota = resource_quota;
uv_tcp_init(uv_default_loop(), connect->tcp_handle);
connect->connect_req.data = connect;
+ connect->refs = 1;
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index 2de0ea90e7..2ab836cc34 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -234,6 +234,7 @@ static void on_connect(uv_stream_t *server, int status) {
sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
acceptor);
grpc_exec_ctx_finish(&exec_ctx);
+ gpr_free(peer_name_string);
}
}
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 7c21b44e76..ff5fd3edc8 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -65,7 +65,10 @@ typedef struct {
} grpc_tcp;
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
+ gpr_free(tcp->handle);
+ gpr_free(tcp->peer_string);
gpr_free(tcp);
}
@@ -115,13 +118,17 @@ static void uv_close_callback(uv_handle_t *handle) {
grpc_exec_ctx_finish(&exec_ctx);
}
+static grpc_slice alloc_read_slice(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user) {
+ return grpc_resource_user_slice_malloc(exec_ctx, resource_user,
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+}
+
static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size,
uv_buf_t *buf) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp *tcp = handle->data;
(void)suggested_size;
- tcp->read_slice = grpc_resource_user_slice_malloc(
- &exec_ctx, tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice);
buf->len = GRPC_SLICE_LENGTH(tcp->read_slice);
grpc_exec_ctx_finish(&exec_ctx);
@@ -148,6 +155,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
// Successful read
sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread);
grpc_slice_buffer_add(tcp->read_slices, sub);
+ tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user);
error = GRPC_ERROR_NONE;
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
size_t i;
@@ -334,6 +342,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
grpc_resource_quota *resource_quota,
char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp);
@@ -350,6 +359,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
tcp->peer_string = gpr_strdup(peer_string);
tcp->shutting_down = false;
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
+ tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user);
/* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
@@ -357,6 +367,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
uv_unref((uv_handle_t *)handle);
#endif
+ grpc_exec_ctx_finish(&exec_ctx);
return &tcp->base;
}
diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c
index 520d4a3252..cb7998db97 100644
--- a/src/core/lib/iomgr/timer_manager.c
+++ b/src/core/lib/iomgr/timer_manager.c
@@ -50,6 +50,9 @@ static completed_thread *g_completed_threads;
static bool g_kicked;
// is there a thread waiting until the next timer should fire?
static bool g_has_timed_waiter;
+// the deadline of the current timed waiter thread (only relevant if
+// g_has_timed_waiter is true)
+static gpr_timespec g_timed_waiter_deadline;
// generation counter to track which thread is waiting for the next timer
static uint64_t g_timed_waiter_generation;
@@ -101,8 +104,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) {
start_timer_thread_and_unlock();
} else {
// if there's no thread waiting with a timeout, kick an existing
- // waiter
- // so that the next deadline is not missed
+ // waiter so that the next deadline is not missed
if (!g_has_timed_waiter) {
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "kick untimed waiter");
@@ -132,44 +134,79 @@ static bool wait_until(gpr_timespec next) {
gpr_mu_unlock(&g_mu);
return false;
}
- // if there's no timed waiter, we should become one: that waiter waits
- // only until the next timer should expire
- // all other timers wait forever
- uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
- if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) {
- g_has_timed_waiter = true;
- // we use a generation counter to track the timed waiter so we can
- // cancel an existing one quickly (and when it actually times out it'll
- // figure stuff out instead of incurring a wakeup)
- my_timed_waiter_generation = ++g_timed_waiter_generation;
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_timespec wait_time = gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC));
- gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds",
- wait_time.tv_sec, wait_time.tv_nsec);
+
+  // If g_kicked is true at this point, a kick from the timer system arrived
+  // before this thread could observe it, and 'next' can no longer be trusted
+  // (there might be an earlier deadline). In that case, skip the wait below
+  // and get the next deadline from the timer system.
+
+ if (!g_kicked) {
+    // If there's no timed waiter, we should become one: that waiter waits
+    // only until the next timer should expire. All other timer threads wait
+    // forever, unless their 'next' is earlier than the current timed waiter's
+    // deadline (in which case the thread with the earlier 'next' takes over
+    // as the new timed waiter).
+    //
+    // 'g_timed_waiter_generation' is a global generation counter. The idea
+    // here is that the thread becoming the timed waiter increments and stores
+    // this global counter locally in 'my_timed_waiter_generation' before
+    // going to sleep. After waking up, if my_timed_waiter_generation ==
+    // g_timed_waiter_generation, it can be sure that it was the timed-waiter
+    // thread (and that no other thread took over while it was asleep).
+    //
+    // Initialize my_timed_waiter_generation to some value that is NOT equal
+    // to g_timed_waiter_generation.
+    uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
+
+ if (gpr_time_cmp(next, inf_future) != 0) {
+ if (!g_has_timed_waiter ||
+ (gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) {
+ my_timed_waiter_generation = ++g_timed_waiter_generation;
+ g_has_timed_waiter = true;
+ g_timed_waiter_deadline = next;
+
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_timespec wait_time =
+ gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC));
+          gpr_log(GPR_DEBUG, "sleep for %" PRId64 ".%09d seconds",
+ wait_time.tv_sec, wait_time.tv_nsec);
+ }
+      } else { // g_has_timed_waiter == true && next >= g_timed_waiter_deadline
+ next = inf_future;
+ }
}
- } else {
- next = inf_future;
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+
+ if (GRPC_TRACER_ON(grpc_timer_check_trace) &&
+ gpr_time_cmp(next, inf_future) == 0) {
gpr_log(GPR_DEBUG, "sleep until kicked");
}
+
+ gpr_cv_wait(&g_cv_wait, &g_mu, next);
+
+ if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
+ gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
+ my_timed_waiter_generation == g_timed_waiter_generation,
+ g_kicked);
+ }
+ // if this was the timed waiter, then we need to check timers, and flag
+ // that there's now no timed waiter... we'll look for a replacement if
+ // there's work to do after checking timers (code above)
+ if (my_timed_waiter_generation == g_timed_waiter_generation) {
+ g_has_timed_waiter = false;
+ g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ }
}
- gpr_cv_wait(&g_cv_wait, &g_mu, next);
- if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
- gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d",
- my_timed_waiter_generation == g_timed_waiter_generation, g_kicked);
- }
- // if this was the timed waiter, then we need to check timers, and flag
- // that there's now no timed waiter... we'll look for a replacement if
- // there's work to do after checking timers (code above)
- if (my_timed_waiter_generation == g_timed_waiter_generation) {
- g_has_timed_waiter = false;
- }
+
// if this was a kick from the timer system, consume it (and don't stop
// this thread yet)
if (g_kicked) {
grpc_timer_consume_kick();
g_kicked = false;
}
+
gpr_mu_unlock(&g_mu);
return true;
}
@@ -257,6 +294,9 @@ void grpc_timer_manager_init(void) {
g_waiter_count = 0;
g_completed_threads = NULL;
+ g_has_timed_waiter = false;
+ g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+
start_threads();
}
@@ -302,6 +342,7 @@ void grpc_kick_poller(void) {
gpr_mu_lock(&g_mu);
g_kicked = true;
g_has_timed_waiter = false;
+ g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
++g_timed_waiter_generation;
gpr_cv_signal(&g_cv_wait);
gpr_mu_unlock(&g_mu);
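
Taken together, the timer-manager changes make the timed-waiter role preemptible: a thread with an earlier deadline can take over the role, and g_timed_waiter_deadline is reset whenever the role is vacated or a kick invalidates it. A condensed sketch of the claim/verify cycle (assumes g_mu is held, and reuses the globals declared above):

    static bool timed_wait_sketch(gpr_timespec next) {
      gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC);
      uint64_t my_gen = g_timed_waiter_generation - 1; /* != current gen */
      if (gpr_time_cmp(next, inf_future) != 0 &&
          (!g_has_timed_waiter ||
           gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) {
        my_gen = ++g_timed_waiter_generation; /* claim the timed-waiter role */
        g_has_timed_waiter = true;
        g_timed_waiter_deadline = next;
      } else {
        next = inf_future; /* someone else wakes earlier; wait untimed */
      }
      gpr_cv_wait(&g_cv_wait, &g_mu, next);
      bool was_timed_waiter = (my_gen == g_timed_waiter_generation);
      if (was_timed_waiter) { /* vacate the role for a successor */
        g_has_timed_waiter = false;
        g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
      }
      return was_timed_waiter;
    }
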
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c
index 8c747085bb..6cd558d123 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.c
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c
@@ -462,6 +462,35 @@ static BIGNUM *bignum_from_base64(grpc_exec_ctx *exec_ctx, const char *b64) {
return result;
}
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+
+// Provide compatibility across OpenSSL 1.0.2 and 1.1.
+static int RSA_set0_key(RSA *r, BIGNUM *n, BIGNUM *e, BIGNUM *d) {
+ /* If the fields n and e in r are NULL, the corresponding input
+ * parameters MUST be non-NULL for n and e. d may be
+ * left NULL (in case only the public key is used).
+ */
+ if ((r->n == NULL && n == NULL) || (r->e == NULL && e == NULL)) {
+ return 0;
+ }
+
+ if (n != NULL) {
+ BN_free(r->n);
+ r->n = n;
+ }
+ if (e != NULL) {
+ BN_free(r->e);
+ r->e = e;
+ }
+ if (d != NULL) {
+ BN_free(r->d);
+ r->d = d;
+ }
+
+ return 1;
+}
+#endif // OPENSSL_VERSION_NUMBER < 0x10100000L
+
static EVP_PKEY *pkey_from_jwk(grpc_exec_ctx *exec_ctx, const grpc_json *json,
const char *kty) {
const grpc_json *key_prop;
@@ -478,21 +507,27 @@ static EVP_PKEY *pkey_from_jwk(grpc_exec_ctx *exec_ctx, const grpc_json *json,
gpr_log(GPR_ERROR, "Could not create rsa key.");
goto end;
}
+ BIGNUM *tmp_n = NULL;
+ BIGNUM *tmp_e = NULL;
for (key_prop = json->child; key_prop != NULL; key_prop = key_prop->next) {
if (strcmp(key_prop->key, "n") == 0) {
- rsa->n =
+ tmp_n =
bignum_from_base64(exec_ctx, validate_string_field(key_prop, "n"));
- if (rsa->n == NULL) goto end;
+ if (tmp_n == NULL) goto end;
} else if (strcmp(key_prop->key, "e") == 0) {
- rsa->e =
+ tmp_e =
bignum_from_base64(exec_ctx, validate_string_field(key_prop, "e"));
- if (rsa->e == NULL) goto end;
+ if (tmp_e == NULL) goto end;
}
}
- if (rsa->e == NULL || rsa->n == NULL) {
+ if (tmp_e == NULL || tmp_n == NULL) {
gpr_log(GPR_ERROR, "Missing RSA public key field.");
goto end;
}
+ if (!RSA_set0_key(rsa, tmp_n, tmp_e, NULL)) {
+ gpr_log(GPR_ERROR, "Cannot set RSA key from inputs.");
+ goto end;
+ }
result = EVP_PKEY_new();
EVP_PKEY_set1_RSA(result, rsa); /* uprefs rsa. */
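
With the shim above, the verifier builds keys the same way against OpenSSL 1.0.2 (where the RSA struct's fields are still directly accessible) and 1.1.x (where the struct is opaque and RSA_set0_key is the library's own accessor). One detail worth noting: RSA_set0_key takes ownership of the BIGNUMs on success, so the caller frees them only on failure. A usage sketch with placeholder values:

    #include <openssl/bn.h>
    #include <openssl/rsa.h>

    static RSA *make_key_sketch(void) {
      RSA *rsa = RSA_new();
      BIGNUM *n = BN_new();
      BIGNUM *e = BN_new();
      if (rsa == NULL || n == NULL || e == NULL) goto fail;
      BN_set_word(n, 0xC0FFEE); /* placeholder modulus, not a real key */
      BN_set_word(e, 65537);    /* common public exponent */
      if (!RSA_set0_key(rsa, n, e, NULL)) goto fail;
      return rsa; /* rsa now owns n and e */
    fail:
      BN_free(n); /* freed only when RSA never took ownership */
      BN_free(e);
      RSA_free(rsa);
      return NULL;
    }
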
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index 58112b04b4..50a51b31cd 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -49,7 +49,6 @@ typedef struct {
pollset_set so that work can progress when this call wants work to progress
*/
grpc_polling_entity *pollent;
- grpc_transport_stream_op_batch op;
gpr_atm security_context_set;
gpr_mu security_context_mu;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
@@ -92,11 +91,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
size_t num_md,
grpc_credentials_status status,
const char *error_details) {
- grpc_call_element *elem = (grpc_call_element *)user_data;
+ grpc_transport_stream_op_batch *batch =
+ (grpc_transport_stream_op_batch *)user_data;
+ grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
- grpc_transport_stream_op_batch *op = &calld->op;
- grpc_metadata_batch *mdb;
- size_t i;
reset_auth_metadata_context(&calld->auth_md_context);
grpc_error *error = GRPC_ERROR_NONE;
if (status != GRPC_CREDENTIALS_OK) {
@@ -108,9 +106,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED);
} else {
GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
- GPR_ASSERT(op->send_initial_metadata);
- mdb = op->payload->send_initial_metadata.send_initial_metadata;
- for (i = 0; i < num_md; i++) {
+ GPR_ASSERT(batch->send_initial_metadata);
+ grpc_metadata_batch *mdb =
+ batch->payload->send_initial_metadata.send_initial_metadata;
+ for (size_t i = 0; i < num_md; i++) {
add_error(&error,
grpc_metadata_batch_add_tail(
exec_ctx, mdb, &calld->md_links[i],
@@ -120,9 +119,9 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
}
}
if (error == GRPC_ERROR_NONE) {
- grpc_call_next_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, batch);
} else {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error);
}
}
@@ -158,11 +157,11 @@ void build_auth_metadata_context(grpc_security_connector *sc,
static void send_security_metadata(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_client_security_context *ctx =
- (grpc_client_security_context *)op->payload
+ (grpc_client_security_context *)batch->payload
->context[GRPC_CONTEXT_SECURITY]
.value;
grpc_call_credentials *channel_call_creds =
@@ -171,7 +170,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
if (channel_call_creds == NULL && !call_creds_has_md) {
/* Skip sending metadata altogether. */
- grpc_call_next_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, batch);
return;
}
@@ -180,7 +179,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
ctx->creds, NULL);
if (calld->creds == NULL) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op,
+ exec_ctx, batch,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Incompatible credentials set on channel and call."),
@@ -194,28 +193,29 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
build_auth_metadata_context(&chand->security_connector->base,
chand->auth_context, calld);
- calld->op = *op; /* Copy op (originates from the caller's stack). */
GPR_ASSERT(calld->pollent != NULL);
grpc_call_credentials_get_request_metadata(
exec_ctx, calld->creds, calld->pollent, calld->auth_md_context,
- on_credentials_metadata, elem);
+ on_credentials_metadata, batch);
}
static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_security_status status) {
- grpc_call_element *elem = (grpc_call_element *)user_data;
+ grpc_transport_stream_op_batch *batch =
+ (grpc_transport_stream_op_batch *)user_data;
+ grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
if (status == GRPC_SECURITY_OK) {
- send_security_metadata(exec_ctx, elem, &calld->op);
+ send_security_metadata(exec_ctx, elem, batch);
} else {
char *error_msg;
char *host = grpc_slice_to_c_string(calld->host);
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
host);
gpr_free(host);
- grpc_call_element_signal_error(
- exec_ctx, elem,
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAUTHENTICATED));
@@ -223,35 +223,29 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data,
}
}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- GPR_TIMER_BEGIN("auth_start_transport_op", 0);
+static void auth_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *batch) {
+ GPR_TIMER_BEGIN("auth_start_transport_stream_op_batch", 0);
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- grpc_linked_mdelem *l;
- grpc_client_security_context *sec_ctx = NULL;
- if (!op->cancel_stream) {
+ if (!batch->cancel_stream) {
/* double checked lock over security context to ensure it's set once */
if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
gpr_mu_lock(&calld->security_context_mu);
if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
- GPR_ASSERT(op->payload->context != NULL);
- if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
- op->payload->context[GRPC_CONTEXT_SECURITY].value =
+ GPR_ASSERT(batch->payload->context != NULL);
+ if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value =
grpc_client_security_context_create();
- op->payload->context[GRPC_CONTEXT_SECURITY].destroy =
+ batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
grpc_client_security_context_destroy;
}
- sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value;
+ grpc_client_security_context *sec_ctx =
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value;
GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
sec_ctx->auth_context =
GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
@@ -261,9 +255,9 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
}
}
- if (op->send_initial_metadata) {
- for (l = op->payload->send_initial_metadata.send_initial_metadata->list
- .head;
+ if (batch->send_initial_metadata) {
+ for (grpc_linked_mdelem *l = batch->payload->send_initial_metadata
+ .send_initial_metadata->list.head;
l != NULL; l = l->next) {
grpc_mdelem md = l->md;
/* Pointer comparison is OK for md_elems created from the same context.
@@ -284,19 +278,19 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
}
if (calld->have_host) {
char *call_host = grpc_slice_to_c_string(calld->host);
- calld->op = *op; /* Copy op (originates from the caller's stack). */
+ batch->handler_private.extra_arg = elem;
grpc_channel_security_connector_check_call_host(
exec_ctx, chand->security_connector, call_host, chand->auth_context,
- on_host_checked, elem);
+ on_host_checked, batch);
gpr_free(call_host);
- GPR_TIMER_END("auth_start_transport_op", 0);
+ GPR_TIMER_END("auth_start_transport_stream_op_batch", 0);
return; /* early exit */
}
}
/* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, op);
- GPR_TIMER_END("auth_start_transport_op", 0);
+ grpc_call_next_op(exec_ctx, elem, batch);
+ GPR_TIMER_END("auth_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
@@ -379,7 +373,15 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
const grpc_channel_filter grpc_client_auth_filter = {
- auth_start_transport_op, grpc_channel_next_op, sizeof(call_data),
- init_call_elem, set_pollset_or_pollset_set, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- grpc_call_next_get_peer, grpc_channel_next_get_info, "client-auth"};
+ auth_start_transport_stream_op_batch,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ set_pollset_or_pollset_set,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
+ "client-auth"};
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 4e6914be7b..9bf3f0ca0f 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -27,14 +27,9 @@
#include "src/core/lib/slice/slice_internal.h"
typedef struct call_data {
- grpc_metadata_batch *recv_initial_metadata;
- /* Closure to call when finished with the auth_on_recv hook. */
- grpc_closure *on_done_recv;
- /* Receive closures are chained: we inject this closure as the on_done_recv
- up-call on transport_op, and remember to call our on_done_recv member after
- handling it. */
- grpc_closure auth_on_recv;
- grpc_transport_stream_op_batch *transport_op;
+ grpc_transport_stream_op_batch *recv_initial_metadata_batch;
+ grpc_closure *original_recv_initial_metadata_ready;
+ grpc_closure recv_initial_metadata_ready;
grpc_metadata_array md;
const grpc_metadata *consumed_md;
size_t num_consumed_md;
@@ -90,125 +85,96 @@ static void on_md_processing_done(
grpc_status_code status, const char *error_details) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
+ grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-
/* TODO(jboeuf): Implement support for response_md. */
if (response_md != NULL && num_response_md > 0) {
gpr_log(GPR_INFO,
"response_md in auth metadata processing not supported for now. "
"Ignoring...");
}
-
+ grpc_error *error = GRPC_ERROR_NONE;
if (status == GRPC_STATUS_OK) {
calld->consumed_md = consumed_md;
calld->num_consumed_md = num_consumed_md;
- /* TODO(ctiller): propagate error */
- GRPC_LOG_IF_ERROR(
- "grpc_metadata_batch_filter",
- grpc_metadata_batch_filter(&exec_ctx, calld->recv_initial_metadata,
- remove_consumed_md, elem,
- "Response metadata filtering error"));
- for (size_t i = 0; i < calld->md.count; i++) {
- grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
- grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
- }
- grpc_metadata_array_destroy(&calld->md);
- GRPC_CLOSURE_SCHED(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE);
+ error = grpc_metadata_batch_filter(
+ &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
+ remove_consumed_md, elem, "Response metadata filtering error");
} else {
- for (size_t i = 0; i < calld->md.count; i++) {
- grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
- grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
- }
- grpc_metadata_array_destroy(&calld->md);
- error_details = error_details != NULL
- ? error_details
- : "Authentication metadata processing failed.";
- if (calld->transport_op->send_message) {
- grpc_byte_stream_destroy(
- &exec_ctx, calld->transport_op->payload->send_message.send_message);
- calld->transport_op->payload->send_message.send_message = NULL;
+ if (error_details == NULL) {
+ error_details = "Authentication metadata processing failed.";
}
- GRPC_CLOSURE_SCHED(
- &exec_ctx, calld->on_done_recv,
+ error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
- GRPC_ERROR_INT_GRPC_STATUS, status));
+ GRPC_ERROR_INT_GRPC_STATUS, status);
}
-
+ for (size_t i = 0; i < calld->md.count; i++) {
+ grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
+ grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
+ }
+ grpc_metadata_array_destroy(&calld->md);
+ GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready,
+ error);
grpc_exec_ctx_finish(&exec_ctx);
}
-static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_error *error) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
+static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = arg;
channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
if (error == GRPC_ERROR_NONE) {
if (chand->creds != NULL && chand->creds->processor.process != NULL) {
- calld->md = metadata_batch_to_md_array(calld->recv_initial_metadata);
+ calld->md = metadata_batch_to_md_array(
+ batch->payload->recv_initial_metadata.recv_initial_metadata);
chand->creds->processor.process(
chand->creds->processor.state, calld->auth_context,
calld->md.metadata, calld->md.count, on_md_processing_done, elem);
return;
}
}
- GRPC_CLOSURE_SCHED(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready,
+ GRPC_ERROR_REF(error));
}
-static void set_recv_ops_md_callbacks(grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+static void auth_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
-
- if (op->recv_initial_metadata) {
- /* substitute our callback for the higher callback */
- calld->recv_initial_metadata =
- op->payload->recv_initial_metadata.recv_initial_metadata;
- calld->on_done_recv =
- op->payload->recv_initial_metadata.recv_initial_metadata_ready;
- op->payload->recv_initial_metadata.recv_initial_metadata_ready =
- &calld->auth_on_recv;
- calld->transport_op = op;
+ if (batch->recv_initial_metadata) {
+ // Inject our callback.
+ calld->recv_initial_metadata_batch = batch;
+ calld->original_recv_initial_metadata_ready =
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
+ &calld->recv_initial_metadata_ready;
}
-}
-
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- set_recv_ops_md_callbacks(elem, op);
- grpc_call_next_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, batch);
}
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
- /* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- grpc_server_security_context *server_ctx = NULL;
-
- /* initialize members */
- memset(calld, 0, sizeof(*calld));
- GRPC_CLOSURE_INIT(&calld->auth_on_recv, auth_on_recv, elem,
+ GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
+ recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
-
+ // Create server security context. Set its auth context from channel
+ // data and save it in the call context.
+ grpc_server_security_context *server_ctx =
+ grpc_server_security_context_create();
+ server_ctx->auth_context = grpc_auth_context_create(chand->auth_context);
+ calld->auth_context = server_ctx->auth_context;
if (args->context[GRPC_CONTEXT_SECURITY].value != NULL) {
args->context[GRPC_CONTEXT_SECURITY].destroy(
args->context[GRPC_CONTEXT_SECURITY].value);
}
-
- server_ctx = grpc_server_security_context_create();
- server_ctx->auth_context = grpc_auth_context_create(chand->auth_context);
- calld->auth_context = server_ctx->auth_context;
-
args->context[GRPC_CONTEXT_SECURITY].value = server_ctx;
args->context[GRPC_CONTEXT_SECURITY].destroy =
grpc_server_security_context_destroy;
-
return GRPC_ERROR_NONE;
}
@@ -221,19 +187,15 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
+ GPR_ASSERT(!args->is_last);
+ channel_data *chand = elem->channel_data;
grpc_auth_context *auth_context =
grpc_find_auth_context_in_args(args->channel_args);
- grpc_server_credentials *creds =
- grpc_find_server_credentials_in_args(args->channel_args);
- /* grab pointers to our data from the channel element */
- channel_data *chand = elem->channel_data;
-
- GPR_ASSERT(!args->is_last);
GPR_ASSERT(auth_context != NULL);
-
- /* initialize members */
chand->auth_context =
GRPC_AUTH_CONTEXT_REF(auth_context, "server_auth_filter");
+ grpc_server_credentials *creds =
+ grpc_find_server_credentials_in_args(args->channel_args);
chand->creds = grpc_server_credentials_ref(creds);
return GRPC_ERROR_NONE;
}
@@ -241,14 +203,13 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for channel data */
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
- /* grab pointers to our data from the channel element */
channel_data *chand = elem->channel_data;
GRPC_AUTH_CONTEXT_UNREF(chand->auth_context, "server_auth_filter");
grpc_server_credentials_unref(exec_ctx, chand->creds);
}
const grpc_channel_filter grpc_server_auth_filter = {
- auth_start_transport_op,
+ auth_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c
index b433c61b4c..9e0f73ae3d 100644
--- a/src/core/lib/support/arena.c
+++ b/src/core/lib/support/arena.c
@@ -38,7 +38,7 @@ struct gpr_arena {
gpr_arena *gpr_arena_create(size_t initial_size) {
initial_size = ROUND_UP_TO_ALIGNMENT_SIZE(initial_size);
- gpr_arena *a = gpr_zalloc(sizeof(gpr_arena) + initial_size);
+ gpr_arena *a = (gpr_arena *)gpr_zalloc(sizeof(gpr_arena) + initial_size);
a->initial_zone.size_end = initial_size;
return a;
}
@@ -64,7 +64,7 @@ void *gpr_arena_alloc(gpr_arena *arena, size_t size) {
zone *next_z = (zone *)gpr_atm_acq_load(&z->next_atm);
if (next_z == NULL) {
size_t next_z_size = (size_t)gpr_atm_no_barrier_load(&arena->size_so_far);
- next_z = gpr_zalloc(sizeof(zone) + next_z_size);
+ next_z = (zone *)gpr_zalloc(sizeof(zone) + next_z_size);
next_z->size_begin = z->size_end;
next_z->size_end = z->size_end + next_z_size;
if (!gpr_atm_rel_cas(&z->next_atm, (gpr_atm)NULL, (gpr_atm)next_z)) {
diff --git a/src/core/lib/support/atm.c b/src/core/lib/support/atm.c
index caa0bafe33..2f37d62f76 100644
--- a/src/core/lib/support/atm.c
+++ b/src/core/lib/support/atm.c
@@ -21,12 +21,12 @@
gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta,
gpr_atm min, gpr_atm max) {
- gpr_atm current;
- gpr_atm new;
+ gpr_atm current_value;
+ gpr_atm new_value;
do {
- current = gpr_atm_no_barrier_load(value);
- new = GPR_CLAMP(current + delta, min, max);
- if (new == current) break;
- } while (!gpr_atm_no_barrier_cas(value, current, new));
- return new;
+ current_value = gpr_atm_no_barrier_load(value);
+ new_value = GPR_CLAMP(current_value + delta, min, max);
+ if (new_value == current_value) break;
+ } while (!gpr_atm_no_barrier_cas(value, current_value, new_value));
+ return new_value;
}
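
The rename in atm.c is for C++ compatibility: `new` is a reserved word in C++, so the file could not previously be compiled as C++. The function's behavior is unchanged; a small usage sketch of the clamped add:

    #include <grpc/support/atm.h>

    /* Keep an atomic counter within [0, 100] no matter how many concurrent
       increments race; the add saturates at the bounds instead of wrapping. */
    static gpr_atm g_bounded_refs = 0;

    static void bounded_ref(void) {
      gpr_atm_no_barrier_clamped_add(&g_bounded_refs, 1, 0, 100);
    }
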
diff --git a/src/core/lib/support/avl.c b/src/core/lib/support/avl.c
index aa0f665272..a6178fdbce 100644
--- a/src/core/lib/support/avl.c
+++ b/src/core/lib/support/avl.c
@@ -76,7 +76,7 @@ static gpr_avl_node *assert_invariants(gpr_avl_node *n) { return n; }
gpr_avl_node *new_node(void *key, void *value, gpr_avl_node *left,
gpr_avl_node *right) {
- gpr_avl_node *node = gpr_malloc(sizeof(*node));
+ gpr_avl_node *node = (gpr_avl_node *)gpr_malloc(sizeof(*node));
gpr_ref_init(&node->refs, 1);
node->key = key;
node->value = value;
diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c
index 58c4c435d3..e9f893988d 100644
--- a/src/core/lib/support/mpscq.c
+++ b/src/core/lib/support/mpscq.c
@@ -31,12 +31,11 @@ void gpr_mpscq_destroy(gpr_mpscq *q) {
GPR_ASSERT(q->tail == &q->stub);
}
-bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
+void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
gpr_mpscq_node *prev =
(gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
gpr_atm_rel_store(&prev->next, (gpr_atm)n);
- return prev == &q->stub;
}
gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
@@ -78,25 +77,3 @@ gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty) {
*empty = false;
return NULL;
}
-
-void gpr_locked_mpscq_init(gpr_locked_mpscq *q) {
- gpr_mpscq_init(&q->queue);
- q->read_lock = GPR_SPINLOCK_INITIALIZER;
-}
-
-void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q) {
- gpr_mpscq_destroy(&q->queue);
-}
-
-bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n) {
- return gpr_mpscq_push(&q->queue, n);
-}
-
-gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q) {
- if (gpr_spinlock_trylock(&q->read_lock)) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&q->queue);
- gpr_spinlock_unlock(&q->read_lock);
- return n;
- }
- return NULL;
-}
diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h
index 2f4739d7f8..daa51768f7 100644
--- a/src/core/lib/support/mpscq.h
+++ b/src/core/lib/support/mpscq.h
@@ -22,7 +22,6 @@
#include <grpc/support/atm.h>
#include <stdbool.h>
#include <stddef.h>
-#include "src/core/lib/support/spinlock.h"
// Multiple-producer single-consumer lock free queue, based upon the
// implementation from Dmitry Vyukov here:
@@ -44,34 +43,11 @@ typedef struct gpr_mpscq {
void gpr_mpscq_init(gpr_mpscq *q);
void gpr_mpscq_destroy(gpr_mpscq *q);
// Push a node
-// Thread safe - can be called from multiple threads concurrently
-// Returns true if this was possibly the first node (may return true
-// sporadically, will not return false sporadically)
-bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
+void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
// Pop a node (returns NULL if no node is ready - which doesn't indicate that
// the queue is empty!!)
-// Thread compatible - can only be called from one thread at a time
gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q);
// Pop a node; sets *empty to true if the queue is empty, or false if it is not
gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty);
-// An mpscq with a spinlock: it's safe to pop from multiple threads, but doing
-// only one thread will succeed concurrently
-typedef struct gpr_locked_mpscq {
- gpr_mpscq queue;
- gpr_spinlock read_lock;
-} gpr_locked_mpscq;
-
-void gpr_locked_mpscq_init(gpr_locked_mpscq *q);
-void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q);
-// Push a node
-// Thread safe - can be called from multiple threads concurrently
-// Returns true if this was possibly the first node (may return true
-// sporadically, will not return false sporadically)
-bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n);
-// Pop a node (returns NULL if no node is ready - which doesn't indicate that
-// the queue is empty!!)
-// Thread safe - can be called from multiple threads concurrently
-gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q);
-
#endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */
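
After this change gpr_mpscq_push no longer reports whether the queue was empty, and the spinlocked variant that relied on that signal is deleted along with its one consumer in server.c (which switches to the lock-free stack below). Typical intrusive usage of the remaining queue API, sketched:

    #include "src/core/lib/support/mpscq.h"

    typedef struct {
      gpr_mpscq_node node; /* must be first so the pop cast below is valid */
      int payload;
    } work_item;

    /* producers: any number of threads */
    static void enqueue(gpr_mpscq *q, work_item *w) {
      gpr_mpscq_push(q, &w->node);
    }

    /* consumer: one thread at a time */
    static work_item *dequeue(gpr_mpscq *q) {
      /* NULL can mean "not ready yet", not necessarily "empty" */
      return (work_item *)gpr_mpscq_pop(q);
    }
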
diff --git a/src/core/lib/support/stack_lockfree.c b/src/core/lib/support/stack_lockfree.c
new file mode 100644
index 0000000000..0fb64ed001
--- /dev/null
+++ b/src/core/lib/support/stack_lockfree.c
@@ -0,0 +1,137 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/support/stack_lockfree.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/atm.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+
+/* The lockfree node structure is a single architecture-level
+ word that allows for an atomic CAS to set it up. */
+struct lockfree_node_contents {
+ /* next thing to look at. Actual index for head, next index otherwise */
+ uint16_t index;
+#ifdef GPR_ARCH_64
+ uint16_t pad;
+ uint32_t aba_ctr;
+#else
+#ifdef GPR_ARCH_32
+ uint16_t aba_ctr;
+#else
+#error Unsupported bit width architecture
+#endif
+#endif
+};
+
+/* Use a union to make sure that these are in the same bits as an atm word */
+typedef union lockfree_node {
+ gpr_atm atm;
+ struct lockfree_node_contents contents;
+} lockfree_node;
+
+/* make sure that entries are aligned to 8 bytes */
+#define ENTRY_ALIGNMENT_BITS 3
+/* reserve this entry as invalid */
+#define INVALID_ENTRY_INDEX ((1 << 16) - 1)
+
+struct gpr_stack_lockfree {
+ lockfree_node *entries;
+ lockfree_node head; /* An atomic entry describing curr head */
+};
+
+gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) {
+ gpr_stack_lockfree *stack;
+ stack = (gpr_stack_lockfree *)gpr_malloc(sizeof(*stack));
+ /* Since we only allocate 16 bits to represent an entry number,
+ * make sure that we are within the desired range */
+ /* Reserve the highest entry number as a dummy */
+ GPR_ASSERT(entries < INVALID_ENTRY_INDEX);
+ stack->entries = (lockfree_node *)gpr_malloc_aligned(
+ entries * sizeof(stack->entries[0]), ENTRY_ALIGNMENT_BITS);
+ /* Clear out all entries */
+ memset(stack->entries, 0, entries * sizeof(stack->entries[0]));
+ memset(&stack->head, 0, sizeof(stack->head));
+
+ GPR_ASSERT(sizeof(stack->entries->atm) == sizeof(stack->entries->contents));
+
+ /* Point the head at reserved dummy entry */
+ stack->head.contents.index = INVALID_ENTRY_INDEX;
+/* Fill in the pad and aba_ctr to avoid confusing memcheck tools */
+#ifdef GPR_ARCH_64
+ stack->head.contents.pad = 0;
+#endif
+ stack->head.contents.aba_ctr = 0;
+ return stack;
+}
+
+void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack) {
+ gpr_free_aligned(stack->entries);
+ gpr_free(stack);
+}
+
+int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) {
+ lockfree_node head;
+ lockfree_node newhead;
+  lockfree_node current;
+ lockfree_node newent;
+
+ /* First fill in the entry's index and aba ctr for new head */
+ newhead.contents.index = (uint16_t)entry;
+#ifdef GPR_ARCH_64
+ /* Fill in the pad to avoid confusing memcheck tools */
+ newhead.contents.pad = 0;
+#endif
+
+ /* Also post-increment the aba_ctr */
+  current.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm);
+  newhead.contents.aba_ctr = ++current.contents.aba_ctr;
+  gpr_atm_no_barrier_store(&stack->entries[entry].atm, current.atm);
+
+ do {
+ /* Atomically get the existing head value for use */
+ head.atm = gpr_atm_no_barrier_load(&(stack->head.atm));
+ /* Point to it */
+ newent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm);
+ newent.contents.index = head.contents.index;
+ gpr_atm_no_barrier_store(&stack->entries[entry].atm, newent.atm);
+ } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm));
+ /* Use rel_cas above to make sure that entry index is set properly */
+ return head.contents.index == INVALID_ENTRY_INDEX;
+}
+
+int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) {
+ lockfree_node head;
+ lockfree_node newhead;
+
+ do {
+ head.atm = gpr_atm_acq_load(&(stack->head.atm));
+ if (head.contents.index == INVALID_ENTRY_INDEX) {
+ return -1;
+ }
+ newhead.atm =
+ gpr_atm_no_barrier_load(&(stack->entries[head.contents.index].atm));
+
+ } while (!gpr_atm_no_barrier_cas(&(stack->head.atm), head.atm, newhead.atm));
+
+ return head.contents.index;
+}
diff --git a/src/core/lib/support/stack_lockfree.h b/src/core/lib/support/stack_lockfree.h
new file mode 100644
index 0000000000..6324211b72
--- /dev/null
+++ b/src/core/lib/support/stack_lockfree.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H
+#define GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H
+
+#include <stddef.h>
+
+typedef struct gpr_stack_lockfree gpr_stack_lockfree;
+
+/* This stack must specify the maximum number of entries to track.
+ The current implementation only allows up to 65534 entries */
+gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries);
+void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack);
+
+/* Pass in a valid entry number for the next stack entry */
+/* Returns 1 if this is the first element on the stack, 0 otherwise */
+int gpr_stack_lockfree_push(gpr_stack_lockfree *, int entry);
+
+/* Returns -1 on empty or the actual entry number */
+int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack);
+
+#endif /* GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H */
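
The new stack stores 16-bit entry indices rather than pointers, which is what makes the single-word CAS (index plus ABA counter packed into one gpr_atm) possible. A usage sketch:

    #include "src/core/lib/support/stack_lockfree.h"

    static void stack_sketch(void) {
      gpr_stack_lockfree *s = gpr_stack_lockfree_create(128);
      /* push returns 1 when the stack was previously empty */
      int was_empty = gpr_stack_lockfree_push(s, 7);
      (void)was_empty;
      int idx = gpr_stack_lockfree_pop(s); /* 7 here; -1 once empty */
      (void)idx;
      gpr_stack_lockfree_destroy(s);
    }
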
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 84ddf74ab9..0cd436883a 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -32,8 +32,7 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/support/mpscq.h"
-#include "src/core/lib/support/spinlock.h"
+#include "src/core/lib/support/stack_lockfree.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
@@ -62,7 +61,6 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false);
typedef struct requested_call {
- gpr_mpscq_node request_link; /* must be first */
requested_call_type type;
size_t cq_idx;
void *tag;
@@ -162,7 +160,7 @@ struct request_matcher {
grpc_server *server;
call_data *pending_head;
call_data *pending_tail;
- gpr_locked_mpscq *requests_per_cq;
+ gpr_stack_lockfree **requests_per_cq;
};
struct registered_method {
@@ -207,6 +205,11 @@ struct grpc_server {
registered_method *registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
+ /** free list of available requested_calls_per_cq indices */
+ gpr_stack_lockfree **request_freelist_per_cq;
+ /** requested call backing data */
+ requested_call **requested_calls_per_cq;
+ int max_requested_calls_per_cq;
gpr_atm shutdown_flag;
uint8_t shutdown_published;
@@ -306,20 +309,21 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
* request_matcher
*/
-static void request_matcher_init(request_matcher *rm, grpc_server *server) {
+static void request_matcher_init(request_matcher *rm, size_t entries,
+ grpc_server *server) {
memset(rm, 0, sizeof(*rm));
rm->server = server;
rm->requests_per_cq =
gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count);
for (size_t i = 0; i < server->cq_count; i++) {
- gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
+ rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries);
}
}
static void request_matcher_destroy(request_matcher *rm) {
for (size_t i = 0; i < rm->server->cq_count; i++) {
- GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL);
- gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
+ GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1);
+ gpr_stack_lockfree_destroy(rm->requests_per_cq[i]);
}
gpr_free(rm->requests_per_cq);
}
@@ -349,17 +353,13 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
grpc_server *server,
request_matcher *rm,
grpc_error *error) {
- requested_call *rc;
+ int request_id;
for (size_t i = 0; i < server->cq_count; i++) {
- /* Here we know:
- 1. no requests are being added (since the server is shut down)
- 2. no other threads are pulling (since the shut down process is single
- threaded)
- So, we can ignore the queue lock and just pop, with the guarantee that a
- NULL returned here truly means that the queue is empty */
- while ((rc = (requested_call *)gpr_mpscq_pop(
- &rm->requests_per_cq[i].queue)) != NULL) {
- fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error));
+ while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
+ -1) {
+ fail_call(exec_ctx, server, i,
+ &server->requested_calls_per_cq[i][request_id],
+ GRPC_ERROR_REF(error));
}
}
GRPC_ERROR_UNREF(error);
@@ -394,7 +394,13 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
}
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server");
+ if (server->started) {
+ gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
+ gpr_free(server->requested_calls_per_cq[i]);
+ }
}
+ gpr_free(server->request_freelist_per_cq);
+ gpr_free(server->requested_calls_per_cq);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
@@ -452,7 +458,21 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
grpc_cq_completion *c) {
- gpr_free(req);
+ requested_call *rc = req;
+ grpc_server *server = rc->server;
+
+ if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
+ rc < server->requested_calls_per_cq[rc->cq_idx] +
+ server->max_requested_calls_per_cq) {
+ GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
+ gpr_stack_lockfree_push(
+ server->request_freelist_per_cq[rc->cq_idx],
+ (int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
+ } else {
+ gpr_free(req);
+ }
+
+ server_unref(exec_ctx, server);
}
static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
@@ -482,6 +502,10 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
GPR_UNREACHABLE_CODE(return );
}
+ grpc_call_element *elem =
+ grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ channel_data *chand = elem->channel_data;
+ server_ref(chand->server);
grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE,
done_request_event, rc, &rc->completion);
}
@@ -509,15 +533,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
for (size_t i = 0; i < server->cq_count; i++) {
size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
- requested_call *rc =
- (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
- if (rc == NULL) {
+ int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
+ if (request_id == -1) {
continue;
} else {
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- publish_call(exec_ctx, server, calld, cq_idx, rc);
+ publish_call(exec_ctx, server, calld, cq_idx,
+ &server->requested_calls_per_cq[cq_idx][request_id]);
return; /* early out */
}
}
@@ -992,6 +1016,8 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;
+ /* TODO(ctiller): expose a channel_arg for this */
+ server->max_requested_calls_per_cq = 32768;
server->channel_args = grpc_channel_args_copy(args);
return server;
@@ -1064,15 +1090,29 @@ void grpc_server_start(grpc_server *server) {
server->started = true;
server->pollset_count = 0;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
+ server->request_freelist_per_cq =
+ gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
+ server->requested_calls_per_cq =
+ gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (grpc_cq_can_listen(server->cqs[i])) {
server->pollsets[server->pollset_count++] =
grpc_cq_pollset(server->cqs[i]);
}
+ server->request_freelist_per_cq[i] =
+ gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
+ for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
+ gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
+ }
+ server->requested_calls_per_cq[i] =
+ gpr_malloc((size_t)server->max_requested_calls_per_cq *
+ sizeof(*server->requested_calls_per_cq[i]));
}
- request_matcher_init(&server->unregistered_request_matcher, server);
+ request_matcher_init(&server->unregistered_request_matcher,
+ (size_t)server->max_requested_calls_per_cq, server);
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_init(&rm->request_matcher, server);
+ request_matcher_init(&rm->request_matcher,
+ (size_t)server->max_requested_calls_per_cq, server);
}
server_ref(server);
@@ -1326,11 +1366,21 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
requested_call *rc) {
call_data *calld = NULL;
request_matcher *rm = NULL;
+ int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
fail_call(exec_ctx, server, cq_idx, rc,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
return GRPC_CALL_OK;
}
+ request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
+ if (request_id == -1) {
+ /* out of request ids: just fail this one */
+ fail_call(exec_ctx, server, cq_idx, rc,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"),
+ GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq));
+ return GRPC_CALL_OK;
+ }
switch (rc->type) {
case BATCH_CALL:
rm = &server->unregistered_request_matcher;
@@ -1339,13 +1389,15 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &rc->data.registered.registered_method->request_matcher;
break;
}
- if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
+ server->requested_calls_per_cq[cq_idx][request_id] = *rc;
+ gpr_free(rc);
+ if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock(&server->mu_call);
while ((calld = rm->pending_head) != NULL) {
- rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
- if (rc == NULL) break;
+ request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
+ if (request_id == -1) break;
rm->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
@@ -1361,7 +1413,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- publish_call(exec_ctx, server, calld, cq_idx, rc);
+ publish_call(exec_ctx, server, calld, cq_idx,
+ &server->requested_calls_per_cq[cq_idx][request_id]);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1468,6 +1521,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
rc->initial_metadata->count = 0;
GPR_ASSERT(error != GRPC_ERROR_NONE);
+ server_ref(server);
grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error,
done_request_event, rc, &rc->completion);
}
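
server.c now preallocates max_requested_calls_per_cq requested_call slots per completion queue and threads the free indices through a lock-free stack, so queue_call_request and done_request_event trade indices instead of doing a malloc/free per call. The shape of that freelist pattern, reduced to a single queue:

    /* Sketch (single-cq version of the pattern above): a fixed slot array
       plus a lock-free stack of free indices, seeded with 0..MAX_SLOTS-1. */
    #define MAX_SLOTS 32768
    static requested_call g_slots[MAX_SLOTS];
    static gpr_stack_lockfree *g_freelist;

    static requested_call *slot_alloc(void) {
      int idx = gpr_stack_lockfree_pop(g_freelist);
      return idx == -1 ? NULL : &g_slots[idx]; /* NULL: out of request ids */
    }

    static void slot_free(requested_call *rc) {
      gpr_stack_lockfree_push(g_freelist, (int)(rc - g_slots));
    }
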
diff --git a/src/core/lib/transport/static_metadata.c b/src/core/lib/transport/static_metadata.c
index 404c240589..2388f19f81 100644
--- a/src/core/lib/transport/static_metadata.c
+++ b/src/core/lib/transport/static_metadata.c
@@ -464,7 +464,8 @@ grpc_mdelem grpc_static_mdelem_for_static_strings(int a, int b) {
if (a == -1 || b == -1) return GRPC_MDNULL;
uint32_t k = (uint32_t)(a * 99 + b);
uint32_t h = elems_phash(k);
- return h < GPR_ARRAY_SIZE(elem_keys) && elem_keys[h] == k
+ return h < GPR_ARRAY_SIZE(elem_keys) && elem_keys[h] == k &&
+ elem_idxs[h] != 255
? GRPC_MAKE_MDELEM(&grpc_static_mdelem_table[elem_idxs[h]],
GRPC_MDELEM_STORAGE_STATIC)
: GRPC_MDNULL;
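
The static-metadata fix closes a false positive in the perfect-hash probe: a slot can match on the recomputed key while holding no element, and 255 is the sentinel the generated elem_idxs table uses for such empty slots. The guarded lookup, restated as a sketch over the same generated tables:

    /* Sketch using the generated tables referenced above. */
    static int static_mdelem_index(int a, int b) {
      if (a == -1 || b == -1) return -1;
      uint32_t k = (uint32_t)(a * 99 + b);
      uint32_t h = elems_phash(k);
      if (h < GPR_ARRAY_SIZE(elem_keys) && elem_keys[h] == k &&
          elem_idxs[h] != 255 /* 255 marks an empty slot */) {
        return elem_idxs[h];
      }
      return -1; /* not a static metadata element */
    }
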
diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c
index 37d4f038b7..1fd65928f9 100644
--- a/src/core/tsi/ssl_transport_security.c
+++ b/src/core/tsi/ssl_transport_security.c
@@ -411,15 +411,11 @@ static tsi_result do_ssl_read(SSL *ssl, unsigned char *unprotected_bytes,
GPR_ASSERT(*unprotected_bytes_size <= INT_MAX);
read_from_ssl =
SSL_read(ssl, unprotected_bytes, (int)*unprotected_bytes_size);
- if (read_from_ssl == 0) {
- gpr_log(GPR_ERROR, "SSL_read returned 0 unexpectedly.");
- return TSI_INTERNAL_ERROR;
- }
- if (read_from_ssl < 0) {
+ if (read_from_ssl <= 0) {
read_from_ssl = SSL_get_error(ssl, read_from_ssl);
switch (read_from_ssl) {
- case SSL_ERROR_WANT_READ:
- /* We need more data to finish the frame. */
+ case SSL_ERROR_ZERO_RETURN: /* Received a close_notify alert. */
+ case SSL_ERROR_WANT_READ: /* We need more data to finish the frame. */
*unprotected_bytes_size = 0;
return TSI_OK;
case SSL_ERROR_WANT_WRITE: