aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2017-09-01 09:00:06 -0700
committerGravatar Mark D. Roth <roth@google.com>2017-09-01 09:00:06 -0700
commit764cf04a13958d72db5a22eb4bbb9370e00777f5 (patch)
tree28f265c5458bb537688a1e81535877e2926deb52 /src/core/ext/filters
parentc928cfee2b94a99747f97ff8c5fb09277a1352b7 (diff)
Revert "Revert "Implement call combiner""
Diffstat (limited to 'src/core/ext/filters')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c556
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c1
-rw-r--r--src/core/ext/filters/client_channel/subchannel.c23
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h5
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.c112
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.h8
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.c9
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.c276
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.c58
-rw-r--r--src/core/ext/filters/load_reporting/load_reporting_filter.c1
-rw-r--r--src/core/ext/filters/max_age/max_age_filter.c1
-rw-r--r--src/core/ext/filters/message_size/message_size_filter.c6
-rw-r--r--src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c1
13 files changed, 534 insertions, 523 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index 58e31d7b45..e6822ce801 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -796,7 +796,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
// send_message
// recv_trailing_metadata
// send_trailing_metadata
-#define MAX_WAITING_BATCHES 6
+// We also add room for a single cancel_stream batch.
+#define MAX_WAITING_BATCHES 7
/** Call data. Holds a pointer to grpc_subchannel_call and the
associated machinery to create such a pointer.
@@ -808,23 +809,25 @@ typedef struct client_channel_call_data {
// The code in deadline_filter.c requires this to be the first field.
// TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
// and this struct both independently store a pointer to the call
- // stack and each has its own mutex. If/when we have time, find a way
- // to avoid this without breaking the grpc_deadline_state abstraction.
+ // combiner. If/when we have time, find a way to avoid this without
+ // breaking the grpc_deadline_state abstraction.
grpc_deadline_state deadline_state;
grpc_slice path; // Request path.
gpr_timespec call_start_time;
gpr_timespec deadline;
+ gpr_arena *arena;
+ grpc_call_combiner *call_combiner;
+
grpc_server_retry_throttle_data *retry_throttle_data;
method_parameters *method_params;
- /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest
- bit is 0), or a pointer to an error (if the lowest bit is 1) */
- gpr_atm subchannel_call_or_error;
- gpr_arena *arena;
+ grpc_subchannel_call *subchannel_call;
+ grpc_error *error;
grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending.
grpc_closure lb_pick_closure;
+ grpc_closure cancel_closure;
grpc_connected_subchannel *connected_subchannel;
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
@@ -832,10 +835,9 @@ typedef struct client_channel_call_data {
grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES];
size_t waiting_for_pick_batches_count;
+ grpc_closure handle_pending_batch_in_call_combiner[MAX_WAITING_BATCHES];
- grpc_transport_stream_op_batch_payload *initial_metadata_payload;
-
- grpc_call_stack *owning_call;
+ grpc_transport_stream_op_batch *initial_metadata_batch;
grpc_linked_mdelem lb_token_mdelem;
@@ -843,55 +845,42 @@ typedef struct client_channel_call_data {
grpc_closure *original_on_complete;
} call_data;
-typedef struct {
- grpc_subchannel_call *subchannel_call;
- grpc_error *error;
-} call_or_error;
-
-static call_or_error get_call_or_error(call_data *p) {
- gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error);
- if (c == 0)
- return (call_or_error){NULL, NULL};
- else if (c & 1)
- return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)};
- else
- return (call_or_error){(grpc_subchannel_call *)c, NULL};
+grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ return calld->subchannel_call;
}
-static bool set_call_or_error(call_data *p, call_or_error coe) {
- // this should always be under a lock
- call_or_error existing = get_call_or_error(p);
- if (existing.error != GRPC_ERROR_NONE) {
- GRPC_ERROR_UNREF(coe.error);
- return false;
- }
- GPR_ASSERT(existing.subchannel_call == NULL);
- if (coe.error != GRPC_ERROR_NONE) {
- GPR_ASSERT(coe.subchannel_call == NULL);
- gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error);
+// This is called via the call combiner, so access to calld is synchronized.
+static void waiting_for_pick_batches_add(
+ call_data *calld, grpc_transport_stream_op_batch *batch) {
+ if (batch->send_initial_metadata) {
+ GPR_ASSERT(calld->initial_metadata_batch == NULL);
+ calld->initial_metadata_batch = batch;
} else {
- GPR_ASSERT(coe.subchannel_call != NULL);
- gpr_atm_rel_store(&p->subchannel_call_or_error,
- (gpr_atm)coe.subchannel_call);
+ GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
+ calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
+ batch;
}
- return true;
}
-grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
- grpc_call_element *call_elem) {
- return get_call_or_error(call_elem->call_data).subchannel_call;
-}
-
-static void waiting_for_pick_batches_add_locked(
- call_data *calld, grpc_transport_stream_op_batch *batch) {
- GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
- calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
- batch;
+// This is called via the call combiner, so access to calld is synchronized.
+static void fail_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
+ void *arg, grpc_error *error) {
+ call_data *calld = arg;
+ if (calld->waiting_for_pick_batches_count > 0) {
+ --calld->waiting_for_pick_batches_count;
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx,
+ calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count],
+ GRPC_ERROR_REF(error), calld->call_combiner);
+ }
}
-static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_error *error) {
+// This is called via the call combiner, so access to calld is synchronized.
+static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_error *error) {
call_data *calld = elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG,
@@ -900,34 +889,60 @@ static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
grpc_error_string(error));
}
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
+ GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i],
+ fail_pending_batch_in_call_combiner, calld,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
+ &calld->handle_pending_batch_in_call_combiner[i],
+ GRPC_ERROR_REF(error),
+ "waiting_for_pick_batches_fail");
+ }
+ if (calld->initial_metadata_batch != NULL) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error));
+ exec_ctx, calld->initial_metadata_batch, GRPC_ERROR_REF(error),
+ calld->call_combiner);
+ } else {
+ GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
+ "waiting_for_pick_batches_fail");
}
- calld->waiting_for_pick_batches_count = 0;
GRPC_ERROR_UNREF(error);
}
-static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- if (calld->waiting_for_pick_batches_count == 0) return;
- call_or_error coe = get_call_or_error(calld);
- if (coe.error != GRPC_ERROR_NONE) {
- waiting_for_pick_batches_fail_locked(exec_ctx, elem,
- GRPC_ERROR_REF(coe.error));
- return;
+// This is called via the call combiner, so access to calld is synchronized.
+static void run_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
+ void *arg, grpc_error *ignored) {
+ call_data *calld = arg;
+ if (calld->waiting_for_pick_batches_count > 0) {
+ --calld->waiting_for_pick_batches_count;
+ grpc_subchannel_call_process_op(
+ exec_ctx, calld->subchannel_call,
+ calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count]);
}
+}
+
+// This is called via the call combiner, so access to calld is synchronized.
+static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR
" pending batches to subchannel_call=%p",
- elem->channel_data, calld, calld->waiting_for_pick_batches_count,
- coe.subchannel_call);
+ chand, calld, calld->waiting_for_pick_batches_count,
+ calld->subchannel_call);
}
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
- grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call,
- calld->waiting_for_pick_batches[i]);
- }
- calld->waiting_for_pick_batches_count = 0;
+ GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i],
+ run_pending_batch_in_call_combiner, calld,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
+ &calld->handle_pending_batch_in_call_combiner[i],
+ GRPC_ERROR_NONE,
+ "waiting_for_pick_batches_resume");
+ }
+ GPR_ASSERT(calld->initial_metadata_batch != NULL);
+ grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
+ calld->initial_metadata_batch);
}
// Applies service config to the call. Must be invoked once we know
@@ -968,29 +983,28 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_error *error) {
+ channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- grpc_subchannel_call *subchannel_call = NULL;
const grpc_connected_subchannel_call_args call_args = {
.pollent = calld->pollent,
.path = calld->path,
.start_time = calld->call_start_time,
.deadline = calld->deadline,
.arena = calld->arena,
- .context = calld->subchannel_call_context};
+ .context = calld->subchannel_call_context,
+ .call_combiner = calld->call_combiner};
grpc_error *new_error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
+ exec_ctx, calld->connected_subchannel, &call_args,
+ &calld->subchannel_call);
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
- elem->channel_data, calld, subchannel_call,
- grpc_error_string(new_error));
+ chand, calld, calld->subchannel_call, grpc_error_string(new_error));
}
- GPR_ASSERT(set_call_or_error(
- calld, (call_or_error){.subchannel_call = subchannel_call}));
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
- waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error);
+ waiting_for_pick_batches_fail(exec_ctx, elem, new_error);
} else {
- waiting_for_pick_batches_resume_locked(exec_ctx, elem);
+ waiting_for_pick_batches_resume(exec_ctx, elem);
}
GRPC_ERROR_UNREF(error);
}
@@ -1002,60 +1016,27 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
channel_data *chand = elem->channel_data;
grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
- call_or_error coe = get_call_or_error(calld);
if (calld->connected_subchannel == NULL) {
// Failed to create subchannel.
- grpc_error *failure =
- error == GRPC_ERROR_NONE
- ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Call dropped by load balancing policy")
- : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Failed to create subchannel", &error, 1);
+ GRPC_ERROR_UNREF(calld->error);
+ calld->error = error == GRPC_ERROR_NONE
+ ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Call dropped by load balancing policy")
+ : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Failed to create subchannel", &error, 1);
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG,
"chand=%p calld=%p: failed to create subchannel: error=%s", chand,
- calld, grpc_error_string(failure));
- }
- set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
- waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure);
- } else if (coe.error != GRPC_ERROR_NONE) {
- /* already cancelled before subchannel became ready */
- grpc_error *child_errors[] = {error, coe.error};
- grpc_error *cancellation_error =
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Cancelled before creating subchannel", child_errors,
- GPR_ARRAY_SIZE(child_errors));
- /* if due to deadline, attach the deadline exceeded status to the error */
- if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
- cancellation_error =
- grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_DEADLINE_EXCEEDED);
+ calld, grpc_error_string(calld->error));
}
- if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: cancelled before subchannel became ready: %s",
- chand, calld, grpc_error_string(cancellation_error));
- }
- waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error);
+ waiting_for_pick_batches_fail(exec_ctx, elem, GRPC_ERROR_REF(calld->error));
} else {
/* Create call on subchannel. */
create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
}
- GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
GRPC_ERROR_UNREF(error);
}
-static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- grpc_subchannel_call *subchannel_call =
- get_call_or_error(calld).subchannel_call;
- if (subchannel_call == NULL) {
- return NULL;
- } else {
- return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
- }
-}
-
/** Return true if subchannel is available immediately (in which case
subchannel_ready_locked() should not be called), or false otherwise (in
which case subchannel_ready_locked() should be called when the subchannel
@@ -1069,6 +1050,44 @@ typedef struct {
grpc_closure closure;
} pick_after_resolver_result_args;
+// Note: This runs under the client_channel combiner, but will NOT be
+// holding the call combiner.
+static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = arg;
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ // If we don't yet have a resolver result, then a closure for
+ // pick_after_resolver_result_done_locked() will have been added to
+ // chand->waiting_for_resolver_result_closures, and it may not be invoked
+ // until after this call has been destroyed. We mark the operation as
+ // cancelled, so that when pick_after_resolver_result_done_locked()
+ // is called, it will be a no-op. We also immediately invoke
+ // subchannel_ready_locked() to propagate the error back to the caller.
+ for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
+ closure != NULL; closure = closure->next_data.next) {
+ pick_after_resolver_result_args *args = closure->cb_arg;
+ if (!args->cancelled && args->elem == elem) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: "
+ "cancelling pick waiting for resolver result",
+ chand, calld);
+ }
+ args->cancelled = true;
+ // Note: Although we are not in the call combiner here, we are
+ // basically stealing the call combiner from the pending pick, so
+ // it's safe to call subchannel_ready_locked() here -- we are
+ // essentially calling it here instead of calling it in
+ // pick_after_resolver_result_done_locked().
+ subchannel_ready_locked(exec_ctx, elem,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Pick cancelled", &error, 1));
+ }
+ }
+}
+
static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
void *arg,
grpc_error *error) {
@@ -1079,21 +1098,24 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_DEBUG, "call cancelled before resolver result");
}
} else {
- channel_data *chand = args->elem->channel_data;
- call_data *calld = args->elem->call_data;
+ grpc_call_element *elem = args->elem;
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
+ NULL);
if (error != GRPC_ERROR_NONE) {
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
chand, calld);
}
- subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
+ subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
} else {
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
chand, calld);
}
- if (pick_subchannel_locked(exec_ctx, args->elem)) {
- subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
+ if (pick_subchannel_locked(exec_ctx, elem)) {
+ subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_NONE);
}
}
}
@@ -1116,41 +1138,33 @@ static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx,
args, grpc_combiner_scheduler(chand->combiner));
grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
&args->closure, GRPC_ERROR_NONE);
+ grpc_call_combiner_set_notify_on_cancel(
+ exec_ctx, calld->call_combiner,
+ GRPC_CLOSURE_INIT(&calld->cancel_closure,
+ pick_after_resolver_result_cancel_locked, elem,
+ grpc_combiner_scheduler(chand->combiner)));
}
-static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_error *error) {
+// Note: This runs under the client_channel combiner, but will NOT be
+// holding the call combiner.
+static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = arg;
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- // If we don't yet have a resolver result, then a closure for
- // pick_after_resolver_result_done_locked() will have been added to
- // chand->waiting_for_resolver_result_closures, and it may not be invoked
- // until after this call has been destroyed. We mark the operation as
- // cancelled, so that when pick_after_resolver_result_done_locked()
- // is called, it will be a no-op. We also immediately invoke
- // subchannel_ready_locked() to propagate the error back to the caller.
- for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
- closure != NULL; closure = closure->next_data.next) {
- pick_after_resolver_result_args *args = closure->cb_arg;
- if (!args->cancelled && args->elem == elem) {
- if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: "
- "cancelling pick waiting for resolver result",
- chand, calld);
- }
- args->cancelled = true;
- subchannel_ready_locked(exec_ctx, elem,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Pick cancelled", &error, 1));
+ if (calld->lb_policy != NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
+ chand, calld, calld->lb_policy);
}
+ grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
+ &calld->connected_subchannel,
+ GRPC_ERROR_REF(error));
}
- GRPC_ERROR_UNREF(error);
}
// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
-// Unrefs the LB policy after invoking subchannel_ready_locked().
+// Unrefs the LB policy and invokes subchannel_ready_locked().
static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_call_element *elem = arg;
@@ -1160,6 +1174,7 @@ static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
chand, calld);
}
+ grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
GPR_ASSERT(calld->lb_policy != NULL);
GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
calld->lb_policy = NULL;
@@ -1194,24 +1209,15 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
}
GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
calld->lb_policy = NULL;
+ } else {
+ grpc_call_combiner_set_notify_on_cancel(
+ exec_ctx, calld->call_combiner,
+ GRPC_CLOSURE_INIT(&calld->cancel_closure, pick_callback_cancel_locked,
+ elem, grpc_combiner_scheduler(chand->combiner)));
}
return pick_done;
}
-static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_error *error) {
- channel_data *chand = elem->channel_data;
- call_data *calld = elem->call_data;
- GPR_ASSERT(calld->lb_policy != NULL);
- if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
- chand, calld, calld->lb_policy);
- }
- grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
- &calld->connected_subchannel, error);
-}
-
static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
GPR_TIMER_BEGIN("pick_subchannel", 0);
@@ -1224,7 +1230,7 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
// Otherwise, if the service config specified a value for this
// method, use that.
uint32_t initial_metadata_flags =
- calld->initial_metadata_payload->send_initial_metadata
+ calld->initial_metadata_batch->payload->send_initial_metadata
.send_initial_metadata_flags;
const bool wait_for_ready_set_from_api =
initial_metadata_flags &
@@ -1241,7 +1247,7 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
}
}
const grpc_lb_policy_pick_args inputs = {
- calld->initial_metadata_payload->send_initial_metadata
+ calld->initial_metadata_batch->payload->send_initial_metadata
.send_initial_metadata,
initial_metadata_flags, &calld->lb_token_mdelem};
pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs);
@@ -1258,91 +1264,33 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
return pick_done;
}
-static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
- void *arg,
- grpc_error *error_ignored) {
- GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
- grpc_transport_stream_op_batch *batch = arg;
- grpc_call_element *elem = batch->handler_private.extra_arg;
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- /* need to recheck that another thread hasn't set the call */
- call_or_error coe = get_call_or_error(calld);
- if (coe.error != GRPC_ERROR_NONE) {
- if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
- chand, calld, grpc_error_string(coe.error));
- }
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, batch, GRPC_ERROR_REF(coe.error));
- goto done;
- }
- if (coe.subchannel_call != NULL) {
- if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
- calld, coe.subchannel_call);
- }
- grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
- goto done;
- }
- // Add to waiting-for-pick list. If we succeed in getting a
- // subchannel call below, we'll handle this batch (along with any
- // other waiting batches) in waiting_for_pick_batches_resume_locked().
- waiting_for_pick_batches_add_locked(calld, batch);
- // If this is a cancellation, cancel the pending pick (if any) and
- // fail any pending batches.
- if (batch->cancel_stream) {
- grpc_error *error = batch->payload->cancel_stream.cancel_error;
- if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
- calld, grpc_error_string(error));
- }
- /* Stash a copy of cancel_error in our call data, so that we can use
- it for subsequent operations. This ensures that if the call is
- cancelled before any batches are passed down (e.g., if the deadline
- is in the past when the call starts), we can return the right
- error to the caller when the first batch does get passed down. */
- set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
- if (calld->lb_policy != NULL) {
- pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
+static void start_pick_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error_ignored) {
+ GPR_TIMER_BEGIN("start_pick_locked", 0);
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
+ channel_data *chand = (channel_data *)elem->channel_data;
+ GPR_ASSERT(calld->connected_subchannel == NULL);
+ if (pick_subchannel_locked(exec_ctx, elem)) {
+ // Pick was returned synchronously.
+ if (calld->connected_subchannel == NULL) {
+ GRPC_ERROR_UNREF(calld->error);
+ calld->error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Call dropped by load balancing policy");
+ waiting_for_pick_batches_fail(exec_ctx, elem,
+ GRPC_ERROR_REF(calld->error));
} else {
- pick_after_resolver_result_cancel_locked(exec_ctx, elem,
- GRPC_ERROR_REF(error));
- }
- waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
- goto done;
- }
- /* if we don't have a subchannel, try to get one */
- if (batch->send_initial_metadata) {
- GPR_ASSERT(calld->connected_subchannel == NULL);
- calld->initial_metadata_payload = batch->payload;
- GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
- /* If a subchannel is not available immediately, the polling entity from
- call_data should be provided to channel_data's interested_parties, so
- that IO of the lb_policy and resolver could be done under it. */
- if (pick_subchannel_locked(exec_ctx, elem)) {
- // Pick was returned synchronously.
- GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
- if (calld->connected_subchannel == NULL) {
- grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Call dropped by load balancing policy");
- set_call_or_error(calld,
- (call_or_error){.error = GRPC_ERROR_REF(error)});
- waiting_for_pick_batches_fail_locked(exec_ctx, elem, error);
- } else {
- // Create subchannel call.
- create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE);
- }
- } else {
- grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
- chand->interested_parties);
+ // Create subchannel call.
+ create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE);
}
+ } else {
+ // Pick will be done asynchronously. Add the call's polling entity to
+ // the channel's interested_parties, so that I/O for the resolver
+ // and LB policy can be done under it.
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
+ chand->interested_parties);
}
-done:
- GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
- "start_transport_stream_op_batch");
- GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
+ GPR_TIMER_END("start_pick_locked", 0);
}
static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
@@ -1365,27 +1313,49 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GRPC_ERROR_REF(error));
}
-/* The logic here is fairly complicated, due to (a) the fact that we
- need to handle the case where we receive the send op before the
- initial metadata op, and (b) the need for efficiency, especially in
- the streaming case.
-
- We use double-checked locking to initially see if initialization has been
- performed. If it has not, we acquire the combiner and perform initialization.
- If it has, we proceed on the fast path. */
static void cc_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (GRPC_TRACER_ON(grpc_client_channel_trace) ||
- GRPC_TRACER_ON(grpc_trace_channel)) {
- grpc_call_log_op(GPR_INFO, elem, batch);
- }
if (chand->deadline_checking_enabled) {
grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
batch);
}
+ GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
+ // If we've previously been cancelled, immediately fail any new batches.
+ if (calld->error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
+ chand, calld, grpc_error_string(calld->error));
+ }
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, batch, GRPC_ERROR_REF(calld->error), calld->call_combiner);
+ goto done;
+ }
+ if (batch->cancel_stream) {
+ // Stash a copy of cancel_error in our call data, so that we can use
+ // it for subsequent operations. This ensures that if the call is
+ // cancelled before any batches are passed down (e.g., if the deadline
+ // is in the past when the call starts), we can return the right
+ // error to the caller when the first batch does get passed down.
+ GRPC_ERROR_UNREF(calld->error);
+ calld->error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
+ calld, grpc_error_string(calld->error));
+ }
+ // If we have a subchannel call, send the cancellation batch down.
+ // Otherwise, fail all pending batches.
+ if (calld->subchannel_call != NULL) {
+ grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch);
+ } else {
+ waiting_for_pick_batches_add(calld, batch);
+ waiting_for_pick_batches_fail(exec_ctx, elem,
+ GRPC_ERROR_REF(calld->error));
+ }
+ goto done;
+ }
// Intercept on_complete for recv_trailing_metadata so that we can
// check retry throttle status.
if (batch->recv_trailing_metadata) {
@@ -1395,38 +1365,43 @@ static void cc_start_transport_stream_op_batch(
grpc_schedule_on_exec_ctx);
batch->on_complete = &calld->on_complete;
}
- /* try to (atomically) get the call */
- call_or_error coe = get_call_or_error(calld);
- GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
- if (coe.error != GRPC_ERROR_NONE) {
+ // Check if we've already gotten a subchannel call.
+ // Note that once we have completed the pick, we do not need to enter
+ // the channel combiner, which is more efficient (especially for
+ // streaming calls).
+ if (calld->subchannel_call != NULL) {
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
- chand, calld, grpc_error_string(coe.error));
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
+ calld, calld->subchannel_call);
}
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, batch, GRPC_ERROR_REF(coe.error));
+ grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch);
goto done;
}
- if (coe.subchannel_call != NULL) {
+ // We do not yet have a subchannel call.
+ // Add the batch to the waiting-for-pick list.
+ waiting_for_pick_batches_add(calld, batch);
+ // For batches containing a send_initial_metadata op, enter the channel
+ // combiner to start a pick.
+ if (batch->send_initial_metadata) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld);
+ }
+ GRPC_CLOSURE_SCHED(
+ exec_ctx,
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
+ elem, grpc_combiner_scheduler(chand->combiner)),
+ GRPC_ERROR_NONE);
+ } else {
+ // For all other batches, release the call combiner.
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
- calld, coe.subchannel_call);
+ "chand=%p calld=%p: saved batch, yeilding call combiner", chand,
+ calld);
}
- grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
- goto done;
- }
- /* we failed; lock and figure out what to do */
- if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld);
+ GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
+ "batch does not include send_initial_metadata");
}
- GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
- batch->handler_private.extra_arg = elem;
- GRPC_CLOSURE_SCHED(
- exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- start_transport_stream_op_batch_locked, batch,
- grpc_combiner_scheduler(chand->combiner)),
- GRPC_ERROR_NONE);
done:
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}
@@ -1441,10 +1416,11 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
calld->path = grpc_slice_ref_internal(args->path);
calld->call_start_time = args->start_time;
calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
- calld->owning_call = args->call_stack;
calld->arena = args->arena;
+ calld->call_combiner = args->call_combiner;
if (chand->deadline_checking_enabled) {
- grpc_deadline_state_init(exec_ctx, elem, args->call_stack, calld->deadline);
+ grpc_deadline_state_init(exec_ctx, elem, args->call_stack,
+ args->call_combiner, calld->deadline);
}
return GRPC_ERROR_NONE;
}
@@ -1463,13 +1439,12 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
if (calld->method_params != NULL) {
method_parameters_unref(calld->method_params);
}
- call_or_error coe = get_call_or_error(calld);
- GRPC_ERROR_UNREF(coe.error);
- if (coe.subchannel_call != NULL) {
- grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call,
+ GRPC_ERROR_UNREF(calld->error);
+ if (calld->subchannel_call != NULL) {
+ grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
then_schedule_closure);
then_schedule_closure = NULL;
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call,
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, calld->subchannel_call,
"client_channel_destroy_call");
}
GPR_ASSERT(calld->lb_policy == NULL);
@@ -1508,7 +1483,6 @@ const grpc_channel_filter grpc_client_channel_filter = {
sizeof(channel_data),
cc_init_channel_elem,
cc_destroy_channel_elem,
- cc_get_peer,
cc_get_channel_info,
"client-channel",
};
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
index 568bb2ba8d..299f26b4de 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
@@ -132,6 +132,5 @@ const grpc_channel_filter grpc_client_load_reporting_filter = {
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"client_load_reporting"};
diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c
index 5788819331..5cc8be7628 100644
--- a/src/core/ext/filters/client_channel/subchannel.c
+++ b/src/core/ext/filters/client_channel/subchannel.c
@@ -724,13 +724,6 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
}
-char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call *call) {
- grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
- grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
- return top_elem->filter->get_peer(exec_ctx, top_elem);
-}
-
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call,
grpc_transport_stream_op_batch *op) {
@@ -760,13 +753,15 @@ grpc_error *grpc_connected_subchannel_create_call(
args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
- const grpc_call_element_args call_args = {.call_stack = callstk,
- .server_transport_data = NULL,
- .context = args->context,
- .path = args->path,
- .start_time = args->start_time,
- .deadline = args->deadline,
- .arena = args->arena};
+ const grpc_call_element_args call_args = {
+ .call_stack = callstk,
+ .server_transport_data = NULL,
+ .context = args->context,
+ .path = args->path,
+ .start_time = args->start_time,
+ .deadline = args->deadline,
+ .arena = args->arena,
+ .call_combiner = args->call_combiner};
grpc_error *error = grpc_call_stack_init(
exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args);
if (error != GRPC_ERROR_NONE) {
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 6d2abb04df..51d712f6a7 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -106,6 +106,7 @@ typedef struct {
gpr_timespec deadline;
gpr_arena *arena;
grpc_call_context_element *context;
+ grpc_call_combiner *call_combiner;
} grpc_connected_subchannel_call_args;
grpc_error *grpc_connected_subchannel_create_call(
@@ -150,10 +151,6 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *subchannel_call,
grpc_transport_stream_op_batch *op);
-/** continue querying for peer */
-char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call *subchannel_call);
-
/** Must be called once per call. Sets the 'then_schedule_closure' argument for
call stack destruction. */
void grpc_subchannel_call_set_cleanup_closure(
diff --git a/src/core/ext/filters/deadline/deadline_filter.c b/src/core/ext/filters/deadline/deadline_filter.c
index 6789903c95..565b0679dc 100644
--- a/src/core/ext/filters/deadline/deadline_filter.c
+++ b/src/core/ext/filters/deadline/deadline_filter.c
@@ -34,22 +34,56 @@
// grpc_deadline_state
//
+// The on_complete callback used when sending a cancel_error batch down the
+// filter stack. Yields the call combiner when the batch returns.
+static void yield_call_combiner(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* ignored) {
+ grpc_deadline_state* deadline_state = arg;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, deadline_state->call_combiner,
+ "got on_complete from cancel_stream batch");
+ GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
+}
+
+// This is called via the call combiner, so access to deadline_state is
+// synchronized.
+static void send_cancel_op_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = arg;
+ grpc_deadline_state* deadline_state = elem->call_data;
+ grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
+ GRPC_CLOSURE_INIT(&deadline_state->timer_callback, yield_call_combiner,
+ deadline_state, grpc_schedule_on_exec_ctx));
+ batch->cancel_stream = true;
+ batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
+}
+
// Timer callback.
static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
grpc_call_element* elem = (grpc_call_element*)arg;
grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
if (error != GRPC_ERROR_CANCELLED) {
- grpc_call_element_signal_error(
- exec_ctx, elem,
- grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED));
+ error = grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
+ grpc_call_combiner_cancel(exec_ctx, deadline_state->call_combiner,
+ GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_INIT(&deadline_state->timer_callback,
+ send_cancel_op_in_call_combiner, elem,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(exec_ctx, deadline_state->call_combiner,
+ &deadline_state->timer_callback, error,
+ "deadline exceeded -- sending cancel_stream op");
+ } else {
+ GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack,
+ "deadline_timer");
}
- GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
}
// Starts the deadline timer.
+// This is called via the call combiner, so access to deadline_state is
+// synchronized.
static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
gpr_timespec deadline) {
@@ -58,51 +92,39 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
return;
}
grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
- grpc_deadline_timer_state cur_state;
grpc_closure* closure = NULL;
-retry:
- cur_state =
- (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state);
- switch (cur_state) {
+ switch (deadline_state->timer_state) {
case GRPC_DEADLINE_STATE_PENDING:
// Note: We do not start the timer if there is already a timer
return;
case GRPC_DEADLINE_STATE_FINISHED:
- if (gpr_atm_rel_cas(&deadline_state->timer_state,
- GRPC_DEADLINE_STATE_FINISHED,
- GRPC_DEADLINE_STATE_PENDING)) {
- // If we've already created and destroyed a timer, we always create a
- // new closure: we have no other guarantee that the inlined closure is
- // not in use (it may hold a pending call to timer_callback)
- closure = GRPC_CLOSURE_CREATE(timer_callback, elem,
- grpc_schedule_on_exec_ctx);
- } else {
- goto retry;
- }
+ deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
+ // If we've already created and destroyed a timer, we always create a
+ // new closure: we have no other guarantee that the inlined closure is
+ // not in use (it may hold a pending call to timer_callback)
+ closure =
+ GRPC_CLOSURE_CREATE(timer_callback, elem, grpc_schedule_on_exec_ctx);
break;
case GRPC_DEADLINE_STATE_INITIAL:
- if (gpr_atm_rel_cas(&deadline_state->timer_state,
- GRPC_DEADLINE_STATE_INITIAL,
- GRPC_DEADLINE_STATE_PENDING)) {
- closure =
- GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback,
- elem, grpc_schedule_on_exec_ctx);
- } else {
- goto retry;
- }
+ deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
+ closure =
+ GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback,
+ elem, grpc_schedule_on_exec_ctx);
break;
}
- GPR_ASSERT(closure);
+ GPR_ASSERT(closure != NULL);
GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure,
gpr_now(GPR_CLOCK_MONOTONIC));
}
// Cancels the deadline timer.
+// This is called via the call combiner, so access to deadline_state is
+// synchronized.
static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_deadline_state* deadline_state) {
- if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING,
- GRPC_DEADLINE_STATE_FINISHED)) {
+ if (deadline_state->timer_state == GRPC_DEADLINE_STATE_PENDING) {
+ deadline_state->timer_state = GRPC_DEADLINE_STATE_FINISHED;
grpc_timer_cancel(exec_ctx, &deadline_state->timer);
} else {
// timer was either in STATE_INITAL (nothing to cancel)
@@ -131,6 +153,7 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
+ bool in_call_combiner;
grpc_call_element* elem;
gpr_timespec deadline;
grpc_closure closure;
@@ -138,15 +161,29 @@ struct start_timer_after_init_state {
static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
struct start_timer_after_init_state* state = arg;
+ grpc_deadline_state* deadline_state = state->elem->call_data;
+ if (!state->in_call_combiner) {
+ // We are initially called without holding the call combiner, so we
+ // need to bounce ourselves into it.
+ state->in_call_combiner = true;
+ GRPC_CALL_COMBINER_START(exec_ctx, deadline_state->call_combiner,
+ &state->closure, GRPC_ERROR_REF(error),
+ "scheduling deadline timer");
+ return;
+ }
start_timer_if_needed(exec_ctx, state->elem, state->deadline);
gpr_free(state);
+ GRPC_CALL_COMBINER_STOP(exec_ctx, deadline_state->call_combiner,
+ "done scheduling deadline timer");
}
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_stack* call_stack,
+ grpc_call_combiner* call_combiner,
gpr_timespec deadline) {
grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
deadline_state->call_stack = call_stack;
+ deadline_state->call_combiner = call_combiner;
// Deadline will always be infinite on servers, so the timer will only be
// set on clients with a finite deadline.
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -158,7 +195,7 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
// call stack initialization is finished. To avoid that problem, we
// create a closure to start the timer, and we schedule that closure
// to be run after call stack initialization is done.
- struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
+ struct start_timer_after_init_state* state = gpr_zalloc(sizeof(*state));
state->elem = elem;
state->deadline = deadline;
GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
@@ -232,7 +269,8 @@ typedef struct server_call_data {
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
const grpc_call_element_args* args) {
- grpc_deadline_state_init(exec_ctx, elem, args->call_stack, args->deadline);
+ grpc_deadline_state_init(exec_ctx, elem, args->call_stack,
+ args->call_combiner, args->deadline);
return GRPC_ERROR_NONE;
}
@@ -310,7 +348,6 @@ const grpc_channel_filter grpc_client_deadline_filter = {
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"deadline",
};
@@ -325,7 +362,6 @@ const grpc_channel_filter grpc_server_deadline_filter = {
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"deadline",
};
diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h
index 420bf7065a..3eb102ad28 100644
--- a/src/core/ext/filters/deadline/deadline_filter.h
+++ b/src/core/ext/filters/deadline/deadline_filter.h
@@ -31,7 +31,8 @@ typedef enum grpc_deadline_timer_state {
typedef struct grpc_deadline_state {
// We take a reference to the call stack for the timer callback.
grpc_call_stack* call_stack;
- gpr_atm timer_state;
+ grpc_call_combiner* call_combiner;
+ grpc_deadline_timer_state timer_state;
grpc_timer timer;
grpc_closure timer_callback;
// Closure to invoke when the call is complete.
@@ -50,6 +51,7 @@ typedef struct grpc_deadline_state {
// assumes elem->call_data is zero'd
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_stack* call_stack,
+ grpc_call_combiner* call_combiner,
gpr_timespec deadline);
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem);
@@ -61,6 +63,8 @@ void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
// to ensure that the timer callback is not invoked while it is in the
// process of being reset, which means that attempting to increase the
// deadline may result in the timer being called twice.
+//
+// Note: Must be called while holding the call combiner.
void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
gpr_timespec new_deadline);
@@ -70,6 +74,8 @@ void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
//
// Note: It is the caller's responsibility to chain to the next filter if
// necessary after this function returns.
+//
+// Note: Must be called while holding the call combiner.
void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op_batch* op);
diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c
index 3ca01a41b5..99ddd08e6a 100644
--- a/src/core/ext/filters/http/client/http_client_filter.c
+++ b/src/core/ext/filters/http/client/http_client_filter.c
@@ -36,6 +36,7 @@
static const size_t kMaxPayloadSizeForGet = 2048;
typedef struct call_data {
+ grpc_call_combiner *call_combiner;
// State for handling send_initial_metadata ops.
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;
@@ -215,13 +216,13 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
call_data *calld = (call_data *)elem->call_data;
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error);
+ exec_ctx, calld->send_message_batch, error, calld->call_combiner);
return;
}
error = pull_slice_from_send_message(exec_ctx, calld);
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error);
+ exec_ctx, calld->send_message_batch, error, calld->call_combiner);
return;
}
// There may or may not be more to read, but we don't care. If we got
@@ -414,7 +415,7 @@ static void hc_start_transport_stream_op_batch(
done:
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error);
+ exec_ctx, calld->send_message_batch, error, calld->call_combiner);
} else if (!batch_will_be_handled_asynchronously) {
grpc_call_next_op(exec_ctx, elem, batch);
}
@@ -426,6 +427,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
call_data *calld = (call_data *)elem->call_data;
+ calld->call_combiner = args->call_combiner;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -565,6 +567,5 @@ const grpc_channel_filter grpc_http_client_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"http-client"};
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
index a32819bfe4..98a503cafc 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.c
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -35,35 +35,29 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/transport/static_metadata.h"
-#define INITIAL_METADATA_UNSEEN 0
-#define HAS_COMPRESSION_ALGORITHM 2
-#define NO_COMPRESSION_ALGORITHM 4
-
-#define CANCELLED_BIT ((gpr_atm)1)
+typedef enum {
+ // Initial metadata not yet seen.
+ INITIAL_METADATA_UNSEEN = 0,
+ // Initial metadata seen; compression algorithm set.
+ HAS_COMPRESSION_ALGORITHM,
+ // Initial metadata seen; no compression algorithm set.
+ NO_COMPRESSION_ALGORITHM,
+} initial_metadata_state;
typedef struct call_data {
- grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
+ grpc_call_combiner *call_combiner;
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem stream_compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage;
grpc_linked_mdelem accept_stream_encoding_storage;
- uint32_t remaining_slice_bytes;
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
grpc_compression_algorithm compression_algorithm;
-
- /* Atomic recording the state of initial metadata; allowed values:
- INITIAL_METADATA_UNSEEN - initial metadata op not seen
- HAS_COMPRESSION_ALGORITHM - initial metadata seen; compression algorithm
- set
- NO_COMPRESSION_ALGORITHM - initial metadata seen; no compression algorithm
- set
- pointer - a stalled op containing a send_message that's waiting on initial
- metadata
- pointer | CANCELLED_BIT - request was cancelled with error pointed to */
- gpr_atm send_initial_metadata_state;
-
+ initial_metadata_state send_initial_metadata_state;
+ grpc_error *cancel_error;
+ grpc_closure start_send_message_batch_in_call_combiner;
grpc_transport_stream_op_batch *send_message_batch;
+ grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_slice_buffer_stream replacement_stream;
grpc_closure *original_send_message_on_complete;
grpc_closure send_message_on_complete;
@@ -92,13 +86,13 @@ static bool skip_compression(grpc_call_element *elem, uint32_t flags,
channel_data *channeld = elem->channel_data;
if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) {
- return 1;
+ return true;
}
if (has_compression_algorithm) {
if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
- return 1;
+ return true;
}
- return 0; /* we have an actual call-specific algorithm */
+ return false; /* we have an actual call-specific algorithm */
}
/* no per-call compression override */
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
@@ -226,6 +220,18 @@ static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_REF(error));
}
+static void send_message_batch_continue(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = (call_data *)elem->call_data;
+ // Note: The call to grpc_call_next_op() results in yielding the
+ // call combiner, so we need to clear calld->send_message_batch
+ // before we do that.
+ grpc_transport_stream_op_batch *send_message_batch =
+ calld->send_message_batch;
+ calld->send_message_batch = NULL;
+ grpc_call_next_op(exec_ctx, elem, send_message_batch);
+}
+
static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = (call_data *)elem->call_data;
@@ -234,8 +240,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_init(&tmp);
uint32_t send_flags =
calld->send_message_batch->payload->send_message.send_message->flags;
- const bool did_compress = grpc_msg_compress(
- exec_ctx, calld->compression_algorithm, &calld->slices, &tmp);
+ bool did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm,
+ &calld->slices, &tmp);
if (did_compress) {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
char *algo_name;
@@ -273,7 +279,19 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
calld->original_send_message_on_complete =
calld->send_message_batch->on_complete;
calld->send_message_batch->on_complete = &calld->send_message_on_complete;
- grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
+ send_message_batch_continue(exec_ctx, elem);
+}
+
+static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error) {
+ call_data *calld = arg;
+ if (calld->send_message_batch != NULL) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, GRPC_ERROR_REF(error),
+ calld->call_combiner);
+ calld->send_message_batch = NULL;
+ }
}
// Pulls a slice from the send_message byte stream and adds it to calld->slices.
@@ -293,21 +311,25 @@ static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
// If all data has been read, invokes finish_send_message(). Otherwise,
// an async call to grpc_byte_stream_next() has been started, which will
// eventually result in calling on_send_message_next_done().
-static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static void continue_reading_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
call_data *calld = (call_data *)elem->call_data;
while (grpc_byte_stream_next(
exec_ctx, calld->send_message_batch->payload->send_message.send_message,
~(size_t)0, &calld->on_send_message_next_done)) {
grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) {
+ // Closure callback; does not take ownership of error.
+ fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
if (calld->slices.length ==
calld->send_message_batch->payload->send_message.send_message->length) {
finish_send_message(exec_ctx, elem);
break;
}
}
- return GRPC_ERROR_NONE;
}
// Async callback for grpc_byte_stream_next().
@@ -315,46 +337,37 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_call_element *elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)elem->call_data;
- if (error != GRPC_ERROR_NONE) goto fail;
+ if (error != GRPC_ERROR_NONE) {
+ // Closure callback; does not take ownership of error.
+ fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
+ return;
+ }
error = pull_slice_from_send_message(exec_ctx, calld);
- if (error != GRPC_ERROR_NONE) goto fail;
+ if (error != GRPC_ERROR_NONE) {
+ // Closure callback; does not take ownership of error.
+ fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
if (calld->slices.length ==
calld->send_message_batch->payload->send_message.send_message->length) {
finish_send_message(exec_ctx, elem);
} else {
- // This will either finish reading all of the data and invoke
- // finish_send_message(), or else it will make an async call to
- // grpc_byte_stream_next(), which will eventually result in calling
- // this function again.
- error = continue_reading_send_message(exec_ctx, elem);
- if (error != GRPC_ERROR_NONE) goto fail;
+ continue_reading_send_message(exec_ctx, elem);
}
- return;
-fail:
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error);
}
-static void start_send_message_batch(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *batch,
- bool has_compression_algorithm) {
+static void start_send_message_batch(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *unused) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)elem->call_data;
- if (!skip_compression(elem, batch->payload->send_message.send_message->flags,
- has_compression_algorithm)) {
- calld->send_message_batch = batch;
- // This will either finish reading all of the data and invoke
- // finish_send_message(), or else it will make an async call to
- // grpc_byte_stream_next(), which will eventually result in calling
- // on_send_message_next_done().
- grpc_error *error = continue_reading_send_message(exec_ctx, elem);
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error);
- }
+ if (skip_compression(
+ elem,
+ calld->send_message_batch->payload->send_message.send_message->flags,
+ calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) {
+ send_message_batch_continue(exec_ctx, elem);
} else {
- /* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, batch);
+ continue_reading_send_message(exec_ctx, elem);
}
}
@@ -362,95 +375,80 @@ static void compress_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
-
GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
-
+ // Handle cancel_stream.
if (batch->cancel_stream) {
- // TODO(roth): As part of the upcoming call combiner work, change
- // this to call grpc_byte_stream_shutdown() on the incoming byte
- // stream, to cancel any in-flight calls to grpc_byte_stream_next().
- GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
- gpr_atm cur = gpr_atm_full_xchg(
- &calld->send_initial_metadata_state,
- CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error);
- switch (cur) {
- case HAS_COMPRESSION_ALGORITHM:
- case NO_COMPRESSION_ALGORITHM:
- case INITIAL_METADATA_UNSEEN:
- break;
- default:
- if ((cur & CANCELLED_BIT) == 0) {
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, (grpc_transport_stream_op_batch *)cur,
- GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error));
- } else {
- GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT));
- }
- break;
+ GRPC_ERROR_UNREF(calld->cancel_error);
+ calld->cancel_error =
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
+ if (calld->send_message_batch != NULL) {
+ if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) {
+ GRPC_CALL_COMBINER_START(
+ exec_ctx, calld->call_combiner,
+ GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_REF(calld->cancel_error), "failing send_message op");
+ } else {
+ grpc_byte_stream_shutdown(
+ exec_ctx,
+ calld->send_message_batch->payload->send_message.send_message,
+ GRPC_ERROR_REF(calld->cancel_error));
+ }
}
+ } else if (calld->cancel_error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, batch, GRPC_ERROR_REF(calld->cancel_error),
+ calld->call_combiner);
+ goto done;
}
-
+ // Handle send_initial_metadata.
if (batch->send_initial_metadata) {
+ GPR_ASSERT(calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN);
bool has_compression_algorithm;
grpc_error *error = process_send_initial_metadata(
exec_ctx, elem,
batch->payload->send_initial_metadata.send_initial_metadata,
&has_compression_algorithm);
if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
- error);
- return;
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error,
+ calld->call_combiner);
+ goto done;
}
- gpr_atm cur;
- retry_send_im:
- cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
- GPR_ASSERT(cur != HAS_COMPRESSION_ALGORITHM &&
- cur != NO_COMPRESSION_ALGORITHM);
- if ((cur & CANCELLED_BIT) == 0) {
- if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
- has_compression_algorithm
- ? HAS_COMPRESSION_ALGORITHM
- : NO_COMPRESSION_ALGORITHM)) {
- goto retry_send_im;
- }
- if (cur != INITIAL_METADATA_UNSEEN) {
- start_send_message_batch(exec_ctx, elem,
- (grpc_transport_stream_op_batch *)cur,
- has_compression_algorithm);
- }
+ calld->send_initial_metadata_state = has_compression_algorithm
+ ? HAS_COMPRESSION_ALGORITHM
+ : NO_COMPRESSION_ALGORITHM;
+ // If we had previously received a batch containing a send_message op,
+ // handle it now. Note that we need to re-enter the call combiner
+ // for this, since we can't send two batches down while holding the
+ // call combiner, since the connected_channel filter (at the bottom of
+ // the call stack) will release the call combiner for each batch it sees.
+ if (calld->send_message_batch != NULL) {
+ GRPC_CALL_COMBINER_START(
+ exec_ctx, calld->call_combiner,
+ &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE,
+ "starting send_message after send_initial_metadata");
}
}
+ // Handle send_message.
if (batch->send_message) {
- gpr_atm cur;
- retry_send:
- cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
- switch (cur) {
- case INITIAL_METADATA_UNSEEN:
- if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
- (gpr_atm)batch)) {
- goto retry_send;
- }
- break;
- case HAS_COMPRESSION_ALGORITHM:
- case NO_COMPRESSION_ALGORITHM:
- start_send_message_batch(exec_ctx, elem, batch,
- cur == HAS_COMPRESSION_ALGORITHM);
- break;
- default:
- if (cur & CANCELLED_BIT) {
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, batch,
- GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT)));
- } else {
- /* >1 send_message concurrently */
- GPR_UNREACHABLE_CODE(break);
- }
+ GPR_ASSERT(calld->send_message_batch == NULL);
+ calld->send_message_batch = batch;
+ // If we have not yet seen send_initial_metadata, then we have to
+ // wait. We save the batch in calld and then drop the call
+ // combiner, which we'll have to pick up again later when we get
+ // send_initial_metadata.
+ if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) {
+ GRPC_CALL_COMBINER_STOP(
+ exec_ctx, calld->call_combiner,
+ "send_message batch pending send_initial_metadata");
+ goto done;
}
+ start_send_message_batch(exec_ctx, elem, GRPC_ERROR_NONE);
} else {
- /* pass control down the stack */
+ // Pass control down the stack.
grpc_call_next_op(exec_ctx, elem, batch);
}
-
+done:
GPR_TIMER_END("compress_start_transport_stream_op_batch", 0);
}
@@ -458,16 +456,16 @@ static void compress_start_transport_stream_op_batch(
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
-
- /* initialize members */
+ call_data *calld = (call_data *)elem->call_data;
+ calld->call_combiner = args->call_combiner;
+ calld->cancel_error = GRPC_ERROR_NONE;
grpc_slice_buffer_init(&calld->slices);
+ GRPC_CLOSURE_INIT(&calld->start_send_message_batch_in_call_combiner,
+ start_send_message_batch, elem, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
elem, grpc_schedule_on_exec_ctx);
-
return GRPC_ERROR_NONE;
}
@@ -475,14 +473,9 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
grpc_closure *ignored) {
- /* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
- gpr_atm imstate =
- gpr_atm_no_barrier_load(&calld->send_initial_metadata_state);
- if (imstate & CANCELLED_BIT) {
- GRPC_ERROR_UNREF((grpc_error *)(imstate & ~CANCELLED_BIT));
- }
+ GRPC_ERROR_UNREF(calld->cancel_error);
}
/* Constructor for channel_data */
@@ -550,6 +543,5 @@ const grpc_channel_filter grpc_message_compress_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
- "compress"};
+ "message_compress"};
diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c
index b145f12aff..a10e69ba59 100644
--- a/src/core/ext/filters/http/server/http_server_filter.c
+++ b/src/core/ext/filters/http/server/http_server_filter.c
@@ -32,6 +32,8 @@
#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
typedef struct call_data {
+ grpc_call_combiner *call_combiner;
+
grpc_linked_mdelem status;
grpc_linked_mdelem content_type;
@@ -281,7 +283,11 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
*calld->pp_recv_message = calld->payload_bin_delivered
? NULL
: (grpc_byte_stream *)&calld->read_stream;
- GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
+ // Re-enter call combiner for recv_message_ready, since the surface
+ // code will release the call combiner for each callback it receives.
+ GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
+ calld->recv_message_ready, GRPC_ERROR_REF(err),
+ "resuming recv_message_ready from on_complete");
calld->recv_message_ready = NULL;
calld->payload_bin_delivered = true;
}
@@ -293,15 +299,20 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (calld->seen_path_with_query) {
- /* do nothing. This is probably a GET request, and payload will be returned
- in hs_on_complete callback. */
+ // Do nothing. This is probably a GET request, and payload will be
+ // returned in hs_on_complete callback.
+ // Note that we release the call combiner here, so that other
+ // callbacks can run.
+ GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
+ "pausing recv_message_ready until on_complete");
} else {
GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
}
}
-static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
@@ -323,10 +334,7 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
server_filter_outgoing_metadata(
exec_ctx, elem,
op->payload->send_initial_metadata.send_initial_metadata));
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- return;
- }
+ if (error != GRPC_ERROR_NONE) return error;
}
if (op->recv_initial_metadata) {
@@ -359,21 +367,25 @@ static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_error *error = server_filter_outgoing_metadata(
exec_ctx, elem,
op->payload->send_trailing_metadata.send_trailing_metadata);
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- return;
- }
+ if (error != GRPC_ERROR_NONE) return error;
}
+
+ return GRPC_ERROR_NONE;
}
-static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- GPR_TIMER_BEGIN("hs_start_transport_op", 0);
- hs_mutate_op(exec_ctx, elem, op);
- grpc_call_next_op(exec_ctx, elem, op);
- GPR_TIMER_END("hs_start_transport_op", 0);
+static void hs_start_transport_stream_op_batch(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
+ call_data *calld = elem->call_data;
+ GPR_TIMER_BEGIN("hs_start_transport_stream_op_batch", 0);
+ grpc_error *error = hs_mutate_op(exec_ctx, elem, op);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error,
+ calld->call_combiner);
+ } else {
+ grpc_call_next_op(exec_ctx, elem, op);
+ }
+ GPR_TIMER_END("hs_start_transport_stream_op_batch", 0);
}
/* Constructor for call_data */
@@ -383,6 +395,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
+ calld->call_combiner = args->call_combiner;
GRPC_CLOSURE_INIT(&calld->hs_on_recv, hs_on_recv, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->hs_on_complete, hs_on_complete, elem,
@@ -414,7 +427,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
const grpc_channel_filter grpc_http_server_filter = {
- hs_start_transport_op,
+ hs_start_transport_stream_op_batch,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
@@ -423,6 +436,5 @@ const grpc_channel_filter grpc_http_server_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"http-server"};
diff --git a/src/core/ext/filters/load_reporting/load_reporting_filter.c b/src/core/ext/filters/load_reporting/load_reporting_filter.c
index 08474efb2e..17e946937f 100644
--- a/src/core/ext/filters/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/filters/load_reporting/load_reporting_filter.c
@@ -223,6 +223,5 @@ const grpc_channel_filter grpc_load_reporting_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"load_reporting"};
diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c
index 7d748b9c32..16c85a70d0 100644
--- a/src/core/ext/filters/max_age/max_age_filter.c
+++ b/src/core/ext/filters/max_age/max_age_filter.c
@@ -391,7 +391,6 @@ const grpc_channel_filter grpc_max_age_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"max_age"};
diff --git a/src/core/ext/filters/message_size/message_size_filter.c b/src/core/ext/filters/message_size/message_size_filter.c
index 846c7df69a..47763b1deb 100644
--- a/src/core/ext/filters/message_size/message_size_filter.c
+++ b/src/core/ext/filters/message_size/message_size_filter.c
@@ -68,6 +68,7 @@ static void* message_size_limits_create_from_json(const grpc_json* json) {
}
typedef struct call_data {
+ grpc_call_combiner* call_combiner;
message_size_limits limits;
// Receive closures are chained: we inject this closure as the
// recv_message_ready up-call on transport_stream_op, and remember to
@@ -131,7 +132,8 @@ static void start_transport_stream_op_batch(
exec_ctx, op,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_RESOURCE_EXHAUSTED));
+ GRPC_STATUS_RESOURCE_EXHAUSTED),
+ calld->call_combiner);
gpr_free(message_string);
return;
}
@@ -152,6 +154,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
const grpc_call_element_args* args) {
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
+ calld->call_combiner = args->call_combiner;
calld->next_recv_message_ready = NULL;
GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -259,7 +262,6 @@ const grpc_channel_filter grpc_message_size_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"message_size"};
diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c
index b4d2cb4b8c..c8b2fe5f99 100644
--- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c
+++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c
@@ -177,7 +177,6 @@ const grpc_channel_filter grpc_workaround_cronet_compression_filter = {
0,
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"workaround_cronet_compression"};