aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-08-31 12:24:15 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-08-31 12:24:15 -0700
commit890f542498a8af4c05f22e42d818f3b0eeafaea8 (patch)
tree4ccf813375c1744629ed43c5c5eaa62fbdffe7ac /src/core
parent5489d41c15926abbf12a5b8d27b24d1d605d7f0f (diff)
parentccad38227f63797318d7cffcba8a2df783394ccd (diff)
Merge branch 'stats' into stats_histo
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/census/grpc_filter.c2
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c556
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c1
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c16
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c16
-rw-r--r--src/core/ext/filters/client_channel/subchannel.c23
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h5
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.c112
-rw-r--r--src/core/ext/filters/deadline/deadline_filter.h8
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.c9
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.c276
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.c58
-rw-r--r--src/core/ext/filters/load_reporting/load_reporting_filter.c1
-rw-r--r--src/core/ext/filters/max_age/max_age_filter.c1
-rw-r--r--src/core/ext/filters/message_size/message_size_filter.c6
-rw-r--r--src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c1
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c33
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c2
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c5
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.c12
-rw-r--r--src/core/lib/channel/channel_stack.c16
-rw-r--r--src/core/lib/channel/channel_stack.h11
-rw-r--r--src/core/lib/channel/connected_channel.c92
-rw-r--r--src/core/lib/iomgr/call_combiner.c180
-rw-r--r--src/core/lib/iomgr/call_combiner.h104
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c183
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c89
-rw-r--r--src/core/lib/surface/call.c148
-rw-r--r--src/core/lib/surface/init.c2
-rw-r--r--src/core/lib/surface/lame_client.cc19
-rw-r--r--src/core/lib/surface/server.c2
-rw-r--r--src/core/lib/transport/byte_stream.c1
-rw-r--r--src/core/lib/transport/byte_stream.h4
-rw-r--r--src/core/lib/transport/transport.c21
-rw-r--r--src/core/lib/transport/transport.h13
-rw-r--r--src/core/lib/transport/transport_impl.h3
-rw-r--r--src/core/lib/transport/transport_op_string.c7
38 files changed, 829 insertions, 1211 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 3e10f61154..13fe2e6b1c 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -179,6 +179,7 @@ const grpc_channel_filter grpc_client_census_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"census-client"};
@@ -192,5 +193,6 @@ const grpc_channel_filter grpc_server_census_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"census-server"};
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index e6822ce801..58e31d7b45 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -796,8 +796,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
// send_message
// recv_trailing_metadata
// send_trailing_metadata
-// We also add room for a single cancel_stream batch.
-#define MAX_WAITING_BATCHES 7
+#define MAX_WAITING_BATCHES 6
/** Call data. Holds a pointer to grpc_subchannel_call and the
associated machinery to create such a pointer.
@@ -809,25 +808,23 @@ typedef struct client_channel_call_data {
// The code in deadline_filter.c requires this to be the first field.
// TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
// and this struct both independently store a pointer to the call
- // combiner. If/when we have time, find a way to avoid this without
- // breaking the grpc_deadline_state abstraction.
+ // stack and each has its own mutex. If/when we have time, find a way
+ // to avoid this without breaking the grpc_deadline_state abstraction.
grpc_deadline_state deadline_state;
grpc_slice path; // Request path.
gpr_timespec call_start_time;
gpr_timespec deadline;
- gpr_arena *arena;
- grpc_call_combiner *call_combiner;
-
grpc_server_retry_throttle_data *retry_throttle_data;
method_parameters *method_params;
- grpc_subchannel_call *subchannel_call;
- grpc_error *error;
+ /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest
+ bit is 0), or a pointer to an error (if the lowest bit is 1) */
+ gpr_atm subchannel_call_or_error;
+ gpr_arena *arena;
grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending.
grpc_closure lb_pick_closure;
- grpc_closure cancel_closure;
grpc_connected_subchannel *connected_subchannel;
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
@@ -835,9 +832,10 @@ typedef struct client_channel_call_data {
grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES];
size_t waiting_for_pick_batches_count;
- grpc_closure handle_pending_batch_in_call_combiner[MAX_WAITING_BATCHES];
- grpc_transport_stream_op_batch *initial_metadata_batch;
+ grpc_transport_stream_op_batch_payload *initial_metadata_payload;
+
+ grpc_call_stack *owning_call;
grpc_linked_mdelem lb_token_mdelem;
@@ -845,42 +843,55 @@ typedef struct client_channel_call_data {
grpc_closure *original_on_complete;
} call_data;
-grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
- grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- return calld->subchannel_call;
+typedef struct {
+ grpc_subchannel_call *subchannel_call;
+ grpc_error *error;
+} call_or_error;
+
+static call_or_error get_call_or_error(call_data *p) {
+ gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error);
+ if (c == 0)
+ return (call_or_error){NULL, NULL};
+ else if (c & 1)
+ return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)};
+ else
+ return (call_or_error){(grpc_subchannel_call *)c, NULL};
}
-// This is called via the call combiner, so access to calld is synchronized.
-static void waiting_for_pick_batches_add(
- call_data *calld, grpc_transport_stream_op_batch *batch) {
- if (batch->send_initial_metadata) {
- GPR_ASSERT(calld->initial_metadata_batch == NULL);
- calld->initial_metadata_batch = batch;
+static bool set_call_or_error(call_data *p, call_or_error coe) {
+ // this should always be under a lock
+ call_or_error existing = get_call_or_error(p);
+ if (existing.error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_UNREF(coe.error);
+ return false;
+ }
+ GPR_ASSERT(existing.subchannel_call == NULL);
+ if (coe.error != GRPC_ERROR_NONE) {
+ GPR_ASSERT(coe.subchannel_call == NULL);
+ gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error);
} else {
- GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
- calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
- batch;
+ GPR_ASSERT(coe.subchannel_call != NULL);
+ gpr_atm_rel_store(&p->subchannel_call_or_error,
+ (gpr_atm)coe.subchannel_call);
}
+ return true;
}
-// This is called via the call combiner, so access to calld is synchronized.
-static void fail_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
- void *arg, grpc_error *error) {
- call_data *calld = arg;
- if (calld->waiting_for_pick_batches_count > 0) {
- --calld->waiting_for_pick_batches_count;
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx,
- calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count],
- GRPC_ERROR_REF(error), calld->call_combiner);
- }
+grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
+ grpc_call_element *call_elem) {
+ return get_call_or_error(call_elem->call_data).subchannel_call;
}
-// This is called via the call combiner, so access to calld is synchronized.
-static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_error *error) {
+static void waiting_for_pick_batches_add_locked(
+ call_data *calld, grpc_transport_stream_op_batch *batch) {
+ GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES);
+ calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] =
+ batch;
+}
+
+static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_error *error) {
call_data *calld = elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG,
@@ -889,60 +900,34 @@ static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx,
grpc_error_string(error));
}
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
- GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i],
- fail_pending_batch_in_call_combiner, calld,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
- &calld->handle_pending_batch_in_call_combiner[i],
- GRPC_ERROR_REF(error),
- "waiting_for_pick_batches_fail");
- }
- if (calld->initial_metadata_batch != NULL) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->initial_metadata_batch, GRPC_ERROR_REF(error),
- calld->call_combiner);
- } else {
- GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
- "waiting_for_pick_batches_fail");
+ exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error));
}
+ calld->waiting_for_pick_batches_count = 0;
GRPC_ERROR_UNREF(error);
}
-// This is called via the call combiner, so access to calld is synchronized.
-static void run_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
- void *arg, grpc_error *ignored) {
- call_data *calld = arg;
- if (calld->waiting_for_pick_batches_count > 0) {
- --calld->waiting_for_pick_batches_count;
- grpc_subchannel_call_process_op(
- exec_ctx, calld->subchannel_call,
- calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count]);
- }
-}
-
-// This is called via the call combiner, so access to calld is synchronized.
-static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
+static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
call_data *calld = elem->call_data;
+ if (calld->waiting_for_pick_batches_count == 0) return;
+ call_or_error coe = get_call_or_error(calld);
+ if (coe.error != GRPC_ERROR_NONE) {
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem,
+ GRPC_ERROR_REF(coe.error));
+ return;
+ }
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR
" pending batches to subchannel_call=%p",
- chand, calld, calld->waiting_for_pick_batches_count,
- calld->subchannel_call);
+ elem->channel_data, calld, calld->waiting_for_pick_batches_count,
+ coe.subchannel_call);
}
for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) {
- GRPC_CLOSURE_INIT(&calld->handle_pending_batch_in_call_combiner[i],
- run_pending_batch_in_call_combiner, calld,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
- &calld->handle_pending_batch_in_call_combiner[i],
- GRPC_ERROR_NONE,
- "waiting_for_pick_batches_resume");
- }
- GPR_ASSERT(calld->initial_metadata_batch != NULL);
- grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
- calld->initial_metadata_batch);
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call,
+ calld->waiting_for_pick_batches[i]);
+ }
+ calld->waiting_for_pick_batches_count = 0;
}
// Applies service config to the call. Must be invoked once we know
@@ -983,28 +968,29 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx,
static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_error *error) {
- channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
+ grpc_subchannel_call *subchannel_call = NULL;
const grpc_connected_subchannel_call_args call_args = {
.pollent = calld->pollent,
.path = calld->path,
.start_time = calld->call_start_time,
.deadline = calld->deadline,
.arena = calld->arena,
- .context = calld->subchannel_call_context,
- .call_combiner = calld->call_combiner};
+ .context = calld->subchannel_call_context};
grpc_error *new_error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, &call_args,
- &calld->subchannel_call);
+ exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
- chand, calld, calld->subchannel_call, grpc_error_string(new_error));
+ elem->channel_data, calld, subchannel_call,
+ grpc_error_string(new_error));
}
+ GPR_ASSERT(set_call_or_error(
+ calld, (call_or_error){.subchannel_call = subchannel_call}));
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
- waiting_for_pick_batches_fail(exec_ctx, elem, new_error);
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error);
} else {
- waiting_for_pick_batches_resume(exec_ctx, elem);
+ waiting_for_pick_batches_resume_locked(exec_ctx, elem);
}
GRPC_ERROR_UNREF(error);
}
@@ -1016,27 +1002,60 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx,
channel_data *chand = elem->channel_data;
grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
+ call_or_error coe = get_call_or_error(calld);
if (calld->connected_subchannel == NULL) {
// Failed to create subchannel.
- GRPC_ERROR_UNREF(calld->error);
- calld->error = error == GRPC_ERROR_NONE
- ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Call dropped by load balancing policy")
- : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Failed to create subchannel", &error, 1);
+ grpc_error *failure =
+ error == GRPC_ERROR_NONE
+ ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Call dropped by load balancing policy")
+ : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Failed to create subchannel", &error, 1);
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG,
"chand=%p calld=%p: failed to create subchannel: error=%s", chand,
- calld, grpc_error_string(calld->error));
+ calld, grpc_error_string(failure));
+ }
+ set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure);
+ } else if (coe.error != GRPC_ERROR_NONE) {
+ /* already cancelled before subchannel became ready */
+ grpc_error *child_errors[] = {error, coe.error};
+ grpc_error *cancellation_error =
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Cancelled before creating subchannel", child_errors,
+ GPR_ARRAY_SIZE(child_errors));
+ /* if due to deadline, attach the deadline exceeded status to the error */
+ if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
+ cancellation_error =
+ grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_DEADLINE_EXCEEDED);
}
- waiting_for_pick_batches_fail(exec_ctx, elem, GRPC_ERROR_REF(calld->error));
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: cancelled before subchannel became ready: %s",
+ chand, calld, grpc_error_string(cancellation_error));
+ }
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error);
} else {
/* Create call on subchannel. */
create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
}
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
GRPC_ERROR_UNREF(error);
}
+static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ grpc_subchannel_call *subchannel_call =
+ get_call_or_error(calld).subchannel_call;
+ if (subchannel_call == NULL) {
+ return NULL;
+ } else {
+ return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
+ }
+}
+
/** Return true if subchannel is available immediately (in which case
subchannel_ready_locked() should not be called), or false otherwise (in
which case subchannel_ready_locked() should be called when the subchannel
@@ -1050,44 +1069,6 @@ typedef struct {
grpc_closure closure;
} pick_after_resolver_result_args;
-// Note: This runs under the client_channel combiner, but will NOT be
-// holding the call combiner.
-static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
- void *arg,
- grpc_error *error) {
- grpc_call_element *elem = arg;
- channel_data *chand = elem->channel_data;
- call_data *calld = elem->call_data;
- // If we don't yet have a resolver result, then a closure for
- // pick_after_resolver_result_done_locked() will have been added to
- // chand->waiting_for_resolver_result_closures, and it may not be invoked
- // until after this call has been destroyed. We mark the operation as
- // cancelled, so that when pick_after_resolver_result_done_locked()
- // is called, it will be a no-op. We also immediately invoke
- // subchannel_ready_locked() to propagate the error back to the caller.
- for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
- closure != NULL; closure = closure->next_data.next) {
- pick_after_resolver_result_args *args = closure->cb_arg;
- if (!args->cancelled && args->elem == elem) {
- if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: "
- "cancelling pick waiting for resolver result",
- chand, calld);
- }
- args->cancelled = true;
- // Note: Although we are not in the call combiner here, we are
- // basically stealing the call combiner from the pending pick, so
- // it's safe to call subchannel_ready_locked() here -- we are
- // essentially calling it here instead of calling it in
- // pick_after_resolver_result_done_locked().
- subchannel_ready_locked(exec_ctx, elem,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Pick cancelled", &error, 1));
- }
- }
-}
-
static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
void *arg,
grpc_error *error) {
@@ -1098,24 +1079,21 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_DEBUG, "call cancelled before resolver result");
}
} else {
- grpc_call_element *elem = args->elem;
- channel_data *chand = elem->channel_data;
- call_data *calld = elem->call_data;
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
- NULL);
+ channel_data *chand = args->elem->channel_data;
+ call_data *calld = args->elem->call_data;
if (error != GRPC_ERROR_NONE) {
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
chand, calld);
}
- subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
+ subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error));
} else {
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
chand, calld);
}
- if (pick_subchannel_locked(exec_ctx, elem)) {
- subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_NONE);
+ if (pick_subchannel_locked(exec_ctx, args->elem)) {
+ subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE);
}
}
}
@@ -1138,33 +1116,41 @@ static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx,
args, grpc_combiner_scheduler(chand->combiner));
grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
&args->closure, GRPC_ERROR_NONE);
- grpc_call_combiner_set_notify_on_cancel(
- exec_ctx, calld->call_combiner,
- GRPC_CLOSURE_INIT(&calld->cancel_closure,
- pick_after_resolver_result_cancel_locked, elem,
- grpc_combiner_scheduler(chand->combiner)));
}
-// Note: This runs under the client_channel combiner, but will NOT be
-// holding the call combiner.
-static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_call_element *elem = arg;
+static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_error *error) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- if (calld->lb_policy != NULL) {
- if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
- chand, calld, calld->lb_policy);
+ // If we don't yet have a resolver result, then a closure for
+ // pick_after_resolver_result_done_locked() will have been added to
+ // chand->waiting_for_resolver_result_closures, and it may not be invoked
+ // until after this call has been destroyed. We mark the operation as
+ // cancelled, so that when pick_after_resolver_result_done_locked()
+ // is called, it will be a no-op. We also immediately invoke
+ // subchannel_ready_locked() to propagate the error back to the caller.
+ for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head;
+ closure != NULL; closure = closure->next_data.next) {
+ pick_after_resolver_result_args *args = closure->cb_arg;
+ if (!args->cancelled && args->elem == elem) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: "
+ "cancelling pick waiting for resolver result",
+ chand, calld);
+ }
+ args->cancelled = true;
+ subchannel_ready_locked(exec_ctx, elem,
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Pick cancelled", &error, 1));
}
- grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
- &calld->connected_subchannel,
- GRPC_ERROR_REF(error));
}
+ GRPC_ERROR_UNREF(error);
}
// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
-// Unrefs the LB policy and invokes subchannel_ready_locked().
+// Unrefs the LB policy after invoking subchannel_ready_locked().
static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_call_element *elem = arg;
@@ -1174,7 +1160,6 @@ static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
chand, calld);
}
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
GPR_ASSERT(calld->lb_policy != NULL);
GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
calld->lb_policy = NULL;
@@ -1209,15 +1194,24 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx,
}
GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel");
calld->lb_policy = NULL;
- } else {
- grpc_call_combiner_set_notify_on_cancel(
- exec_ctx, calld->call_combiner,
- GRPC_CLOSURE_INIT(&calld->cancel_closure, pick_callback_cancel_locked,
- elem, grpc_combiner_scheduler(chand->combiner)));
}
return pick_done;
}
+static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_error *error) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ GPR_ASSERT(calld->lb_policy != NULL);
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
+ chand, calld, calld->lb_policy);
+ }
+ grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy,
+ &calld->connected_subchannel, error);
+}
+
static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
GPR_TIMER_BEGIN("pick_subchannel", 0);
@@ -1230,7 +1224,7 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
// Otherwise, if the service config specified a value for this
// method, use that.
uint32_t initial_metadata_flags =
- calld->initial_metadata_batch->payload->send_initial_metadata
+ calld->initial_metadata_payload->send_initial_metadata
.send_initial_metadata_flags;
const bool wait_for_ready_set_from_api =
initial_metadata_flags &
@@ -1247,7 +1241,7 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
}
}
const grpc_lb_policy_pick_args inputs = {
- calld->initial_metadata_batch->payload->send_initial_metadata
+ calld->initial_metadata_payload->send_initial_metadata
.send_initial_metadata,
initial_metadata_flags, &calld->lb_token_mdelem};
pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs);
@@ -1264,33 +1258,91 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx,
return pick_done;
}
-static void start_pick_locked(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error_ignored) {
- GPR_TIMER_BEGIN("start_pick_locked", 0);
- grpc_call_element *elem = (grpc_call_element *)arg;
- call_data *calld = (call_data *)elem->call_data;
- channel_data *chand = (channel_data *)elem->channel_data;
- GPR_ASSERT(calld->connected_subchannel == NULL);
- if (pick_subchannel_locked(exec_ctx, elem)) {
- // Pick was returned synchronously.
- if (calld->connected_subchannel == NULL) {
- GRPC_ERROR_UNREF(calld->error);
- calld->error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Call dropped by load balancing policy");
- waiting_for_pick_batches_fail(exec_ctx, elem,
- GRPC_ERROR_REF(calld->error));
+static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
+ void *arg,
+ grpc_error *error_ignored) {
+ GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0);
+ grpc_transport_stream_op_batch *batch = arg;
+ grpc_call_element *elem = batch->handler_private.extra_arg;
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ /* need to recheck that another thread hasn't set the call */
+ call_or_error coe = get_call_or_error(calld);
+ if (coe.error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
+ chand, calld, grpc_error_string(coe.error));
+ }
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, batch, GRPC_ERROR_REF(coe.error));
+ goto done;
+ }
+ if (coe.subchannel_call != NULL) {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
+ calld, coe.subchannel_call);
+ }
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
+ goto done;
+ }
+ // Add to waiting-for-pick list. If we succeed in getting a
+ // subchannel call below, we'll handle this batch (along with any
+ // other waiting batches) in waiting_for_pick_batches_resume_locked().
+ waiting_for_pick_batches_add_locked(calld, batch);
+ // If this is a cancellation, cancel the pending pick (if any) and
+ // fail any pending batches.
+ if (batch->cancel_stream) {
+ grpc_error *error = batch->payload->cancel_stream.cancel_error;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
+ calld, grpc_error_string(error));
+ }
+ /* Stash a copy of cancel_error in our call data, so that we can use
+ it for subsequent operations. This ensures that if the call is
+ cancelled before any batches are passed down (e.g., if the deadline
+ is in the past when the call starts), we can return the right
+ error to the caller when the first batch does get passed down. */
+ set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
+ if (calld->lb_policy != NULL) {
+ pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
} else {
- // Create subchannel call.
- create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE);
+ pick_after_resolver_result_cancel_locked(exec_ctx, elem,
+ GRPC_ERROR_REF(error));
}
- } else {
- // Pick will be done asynchronously. Add the call's polling entity to
- // the channel's interested_parties, so that I/O for the resolver
- // and LB policy can be done under it.
- grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
- chand->interested_parties);
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
+ goto done;
}
- GPR_TIMER_END("start_pick_locked", 0);
+ /* if we don't have a subchannel, try to get one */
+ if (batch->send_initial_metadata) {
+ GPR_ASSERT(calld->connected_subchannel == NULL);
+ calld->initial_metadata_payload = batch->payload;
+ GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
+ /* If a subchannel is not available immediately, the polling entity from
+ call_data should be provided to channel_data's interested_parties, so
+ that IO of the lb_policy and resolver could be done under it. */
+ if (pick_subchannel_locked(exec_ctx, elem)) {
+ // Pick was returned synchronously.
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
+ if (calld->connected_subchannel == NULL) {
+ grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Call dropped by load balancing policy");
+ set_call_or_error(calld,
+ (call_or_error){.error = GRPC_ERROR_REF(error)});
+ waiting_for_pick_batches_fail_locked(exec_ctx, elem, error);
+ } else {
+ // Create subchannel call.
+ create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE);
+ }
+ } else {
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
+ chand->interested_parties);
+ }
+ }
+done:
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
+ "start_transport_stream_op_batch");
+ GPR_TIMER_END("start_transport_stream_op_batch_locked", 0);
}
static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
@@ -1313,49 +1365,27 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GRPC_ERROR_REF(error));
}
+/* The logic here is fairly complicated, due to (a) the fact that we
+ need to handle the case where we receive the send op before the
+ initial metadata op, and (b) the need for efficiency, especially in
+ the streaming case.
+
+ We use double-checked locking to initially see if initialization has been
+ performed. If it has not, we acquire the combiner and perform initialization.
+ If it has, we proceed on the fast path. */
static void cc_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
+ if (GRPC_TRACER_ON(grpc_client_channel_trace) ||
+ GRPC_TRACER_ON(grpc_trace_channel)) {
+ grpc_call_log_op(GPR_INFO, elem, batch);
+ }
if (chand->deadline_checking_enabled) {
grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
batch);
}
- GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
- // If we've previously been cancelled, immediately fail any new batches.
- if (calld->error != GRPC_ERROR_NONE) {
- if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
- chand, calld, grpc_error_string(calld->error));
- }
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, batch, GRPC_ERROR_REF(calld->error), calld->call_combiner);
- goto done;
- }
- if (batch->cancel_stream) {
- // Stash a copy of cancel_error in our call data, so that we can use
- // it for subsequent operations. This ensures that if the call is
- // cancelled before any batches are passed down (e.g., if the deadline
- // is in the past when the call starts), we can return the right
- // error to the caller when the first batch does get passed down.
- GRPC_ERROR_UNREF(calld->error);
- calld->error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
- if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand,
- calld, grpc_error_string(calld->error));
- }
- // If we have a subchannel call, send the cancellation batch down.
- // Otherwise, fail all pending batches.
- if (calld->subchannel_call != NULL) {
- grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch);
- } else {
- waiting_for_pick_batches_add(calld, batch);
- waiting_for_pick_batches_fail(exec_ctx, elem,
- GRPC_ERROR_REF(calld->error));
- }
- goto done;
- }
// Intercept on_complete for recv_trailing_metadata so that we can
// check retry throttle status.
if (batch->recv_trailing_metadata) {
@@ -1365,43 +1395,38 @@ static void cc_start_transport_stream_op_batch(
grpc_schedule_on_exec_ctx);
batch->on_complete = &calld->on_complete;
}
- // Check if we've already gotten a subchannel call.
- // Note that once we have completed the pick, we do not need to enter
- // the channel combiner, which is more efficient (especially for
- // streaming calls).
- if (calld->subchannel_call != NULL) {
+ /* try to (atomically) get the call */
+ call_or_error coe = get_call_or_error(calld);
+ GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
+ if (coe.error != GRPC_ERROR_NONE) {
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
- calld, calld->subchannel_call);
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s",
+ chand, calld, grpc_error_string(coe.error));
}
- grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, batch);
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, batch, GRPC_ERROR_REF(coe.error));
goto done;
}
- // We do not yet have a subchannel call.
- // Add the batch to the waiting-for-pick list.
- waiting_for_pick_batches_add(calld, batch);
- // For batches containing a send_initial_metadata op, enter the channel
- // combiner to start a pick.
- if (batch->send_initial_metadata) {
- if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld);
- }
- GRPC_CLOSURE_SCHED(
- exec_ctx,
- GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
- elem, grpc_combiner_scheduler(chand->combiner)),
- GRPC_ERROR_NONE);
- } else {
- // For all other batches, release the call combiner.
+ if (coe.subchannel_call != NULL) {
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: saved batch, yeilding call combiner", chand,
- calld);
+ "chand=%p calld=%p: sending batch to subchannel_call=%p", chand,
+ calld, coe.subchannel_call);
}
- GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
- "batch does not include send_initial_metadata");
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch);
+ goto done;
+ }
+ /* we failed; lock and figure out what to do */
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld);
}
+ GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
+ batch->handler_private.extra_arg = elem;
+ GRPC_CLOSURE_SCHED(
+ exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ start_transport_stream_op_batch_locked, batch,
+ grpc_combiner_scheduler(chand->combiner)),
+ GRPC_ERROR_NONE);
done:
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}
@@ -1416,11 +1441,10 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
calld->path = grpc_slice_ref_internal(args->path);
calld->call_start_time = args->start_time;
calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
+ calld->owning_call = args->call_stack;
calld->arena = args->arena;
- calld->call_combiner = args->call_combiner;
if (chand->deadline_checking_enabled) {
- grpc_deadline_state_init(exec_ctx, elem, args->call_stack,
- args->call_combiner, calld->deadline);
+ grpc_deadline_state_init(exec_ctx, elem, args->call_stack, calld->deadline);
}
return GRPC_ERROR_NONE;
}
@@ -1439,12 +1463,13 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
if (calld->method_params != NULL) {
method_parameters_unref(calld->method_params);
}
- GRPC_ERROR_UNREF(calld->error);
- if (calld->subchannel_call != NULL) {
- grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
+ call_or_error coe = get_call_or_error(calld);
+ GRPC_ERROR_UNREF(coe.error);
+ if (coe.subchannel_call != NULL) {
+ grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call,
then_schedule_closure);
then_schedule_closure = NULL;
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, calld->subchannel_call,
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call,
"client_channel_destroy_call");
}
GPR_ASSERT(calld->lb_policy == NULL);
@@ -1483,6 +1508,7 @@ const grpc_channel_filter grpc_client_channel_filter = {
sizeof(channel_data),
cc_init_channel_elem,
cc_destroy_channel_elem,
+ cc_get_peer,
cc_get_channel_info,
"client-channel",
};
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
index 299f26b4de..568bb2ba8d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
@@ -132,5 +132,6 @@ const grpc_channel_filter grpc_client_load_reporting_filter = {
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"client_load_reporting"};
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index fd0fb41fb9..a50ba09bf5 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -296,8 +296,6 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_args *args) {
pick_first_lb_policy *p = (pick_first_lb_policy *)policy;
- /* Find the number of backend addresses. We ignore balancer
- * addresses, since we don't know how to handle them. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
@@ -317,11 +315,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
return;
}
const grpc_lb_addresses *addresses = arg->value.pointer.p;
- size_t num_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- if (!addresses->addresses[i].is_balancer) ++num_addrs;
- }
- if (num_addrs == 0) {
+ if (addresses->num_addresses == 0) {
// Empty update. Unsubscribe from all current subchannels and put the
// channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
@@ -333,9 +327,10 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
}
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
- (void *)p, (unsigned long)num_addrs);
+ (void *)p, (unsigned long)addresses->num_addresses);
}
- grpc_subchannel_args *sc_args = gpr_zalloc(sizeof(*sc_args) * num_addrs);
+ grpc_subchannel_args *sc_args =
+ gpr_zalloc(sizeof(*sc_args) * addresses->num_addresses);
/* We remove the following keys in order for subchannel keys belonging to
* subchannels point to the same address to match. */
static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
@@ -344,7 +339,8 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
/* Create list of subchannel args for new addresses in \a args. */
for (size_t i = 0; i < addresses->num_addresses; i++) {
- if (addresses->addresses[i].is_balancer) continue;
+ // If there were any balancer, we would have chosen grpclb policy instead.
+ GPR_ASSERT(!addresses->addresses[i].is_balancer);
if (addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index 110a9c8047..866fb9a1eb 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -737,8 +737,6 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_args *args) {
round_robin_lb_policy *p = (round_robin_lb_policy *)policy;
- /* Find the number of backend addresses. We ignore balancer addresses, since
- * we don't know how to handle them. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
@@ -757,12 +755,9 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
return;
}
grpc_lb_addresses *addresses = arg->value.pointer.p;
- size_t num_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- if (!addresses->addresses[i].is_balancer) ++num_addrs;
- }
- rr_subchannel_list *subchannel_list = rr_subchannel_list_create(p, num_addrs);
- if (num_addrs == 0) {
+ rr_subchannel_list *subchannel_list =
+ rr_subchannel_list_create(p, addresses->num_addresses);
+ if (addresses->num_addresses == 0) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
@@ -794,9 +789,8 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
GRPC_ARG_LB_ADDRESSES};
/* Create subchannels for addresses in the update. */
for (size_t i = 0; i < addresses->num_addresses; i++) {
- /* Skip balancer addresses, since we only know how to handle backends. */
- if (addresses->addresses[i].is_balancer) continue;
- GPR_ASSERT(i < num_addrs);
+ // If there were any balancer, we would have chosen grpclb policy instead.
+ GPR_ASSERT(!addresses->addresses[i].is_balancer);
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
grpc_arg addr_arg =
grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c
index 5cc8be7628..5788819331 100644
--- a/src/core/ext/filters/client_channel/subchannel.c
+++ b/src/core/ext/filters/client_channel/subchannel.c
@@ -724,6 +724,13 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
}
+char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call *call) {
+ grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
+ grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
+ return top_elem->filter->get_peer(exec_ctx, top_elem);
+}
+
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call,
grpc_transport_stream_op_batch *op) {
@@ -753,15 +760,13 @@ grpc_error *grpc_connected_subchannel_create_call(
args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
- const grpc_call_element_args call_args = {
- .call_stack = callstk,
- .server_transport_data = NULL,
- .context = args->context,
- .path = args->path,
- .start_time = args->start_time,
- .deadline = args->deadline,
- .arena = args->arena,
- .call_combiner = args->call_combiner};
+ const grpc_call_element_args call_args = {.call_stack = callstk,
+ .server_transport_data = NULL,
+ .context = args->context,
+ .path = args->path,
+ .start_time = args->start_time,
+ .deadline = args->deadline,
+ .arena = args->arena};
grpc_error *error = grpc_call_stack_init(
exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args);
if (error != GRPC_ERROR_NONE) {
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 51d712f6a7..6d2abb04df 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -106,7 +106,6 @@ typedef struct {
gpr_timespec deadline;
gpr_arena *arena;
grpc_call_context_element *context;
- grpc_call_combiner *call_combiner;
} grpc_connected_subchannel_call_args;
grpc_error *grpc_connected_subchannel_create_call(
@@ -151,6 +150,10 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *subchannel_call,
grpc_transport_stream_op_batch *op);
+/** continue querying for peer */
+char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel_call *subchannel_call);
+
/** Must be called once per call. Sets the 'then_schedule_closure' argument for
call stack destruction. */
void grpc_subchannel_call_set_cleanup_closure(
diff --git a/src/core/ext/filters/deadline/deadline_filter.c b/src/core/ext/filters/deadline/deadline_filter.c
index 565b0679dc..6789903c95 100644
--- a/src/core/ext/filters/deadline/deadline_filter.c
+++ b/src/core/ext/filters/deadline/deadline_filter.c
@@ -34,56 +34,22 @@
// grpc_deadline_state
//
-// The on_complete callback used when sending a cancel_error batch down the
-// filter stack. Yields the call combiner when the batch returns.
-static void yield_call_combiner(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* ignored) {
- grpc_deadline_state* deadline_state = arg;
- GRPC_CALL_COMBINER_STOP(exec_ctx, deadline_state->call_combiner,
- "got on_complete from cancel_stream batch");
- GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
-}
-
-// This is called via the call combiner, so access to deadline_state is
-// synchronized.
-static void send_cancel_op_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- grpc_call_element* elem = arg;
- grpc_deadline_state* deadline_state = elem->call_data;
- grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
- GRPC_CLOSURE_INIT(&deadline_state->timer_callback, yield_call_combiner,
- deadline_state, grpc_schedule_on_exec_ctx));
- batch->cancel_stream = true;
- batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
-}
-
// Timer callback.
static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
grpc_call_element* elem = (grpc_call_element*)arg;
grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
if (error != GRPC_ERROR_CANCELLED) {
- error = grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
- grpc_call_combiner_cancel(exec_ctx, deadline_state->call_combiner,
- GRPC_ERROR_REF(error));
- GRPC_CLOSURE_INIT(&deadline_state->timer_callback,
- send_cancel_op_in_call_combiner, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(exec_ctx, deadline_state->call_combiner,
- &deadline_state->timer_callback, error,
- "deadline exceeded -- sending cancel_stream op");
- } else {
- GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack,
- "deadline_timer");
+ grpc_call_element_signal_error(
+ exec_ctx, elem,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED));
}
+ GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
}
// Starts the deadline timer.
-// This is called via the call combiner, so access to deadline_state is
-// synchronized.
static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
gpr_timespec deadline) {
@@ -92,39 +58,51 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
return;
}
grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
+ grpc_deadline_timer_state cur_state;
grpc_closure* closure = NULL;
- switch (deadline_state->timer_state) {
+retry:
+ cur_state =
+ (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state);
+ switch (cur_state) {
case GRPC_DEADLINE_STATE_PENDING:
// Note: We do not start the timer if there is already a timer
return;
case GRPC_DEADLINE_STATE_FINISHED:
- deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
- // If we've already created and destroyed a timer, we always create a
- // new closure: we have no other guarantee that the inlined closure is
- // not in use (it may hold a pending call to timer_callback)
- closure =
- GRPC_CLOSURE_CREATE(timer_callback, elem, grpc_schedule_on_exec_ctx);
+ if (gpr_atm_rel_cas(&deadline_state->timer_state,
+ GRPC_DEADLINE_STATE_FINISHED,
+ GRPC_DEADLINE_STATE_PENDING)) {
+ // If we've already created and destroyed a timer, we always create a
+ // new closure: we have no other guarantee that the inlined closure is
+ // not in use (it may hold a pending call to timer_callback)
+ closure = GRPC_CLOSURE_CREATE(timer_callback, elem,
+ grpc_schedule_on_exec_ctx);
+ } else {
+ goto retry;
+ }
break;
case GRPC_DEADLINE_STATE_INITIAL:
- deadline_state->timer_state = GRPC_DEADLINE_STATE_PENDING;
- closure =
- GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback,
- elem, grpc_schedule_on_exec_ctx);
+ if (gpr_atm_rel_cas(&deadline_state->timer_state,
+ GRPC_DEADLINE_STATE_INITIAL,
+ GRPC_DEADLINE_STATE_PENDING)) {
+ closure =
+ GRPC_CLOSURE_INIT(&deadline_state->timer_callback, timer_callback,
+ elem, grpc_schedule_on_exec_ctx);
+ } else {
+ goto retry;
+ }
break;
}
- GPR_ASSERT(closure != NULL);
+ GPR_ASSERT(closure);
GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure,
gpr_now(GPR_CLOCK_MONOTONIC));
}
// Cancels the deadline timer.
-// This is called via the call combiner, so access to deadline_state is
-// synchronized.
static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_deadline_state* deadline_state) {
- if (deadline_state->timer_state == GRPC_DEADLINE_STATE_PENDING) {
- deadline_state->timer_state = GRPC_DEADLINE_STATE_FINISHED;
+ if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING,
+ GRPC_DEADLINE_STATE_FINISHED)) {
grpc_timer_cancel(exec_ctx, &deadline_state->timer);
} else {
// timer was either in STATE_INITAL (nothing to cancel)
@@ -153,7 +131,6 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
- bool in_call_combiner;
grpc_call_element* elem;
gpr_timespec deadline;
grpc_closure closure;
@@ -161,29 +138,15 @@ struct start_timer_after_init_state {
static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
struct start_timer_after_init_state* state = arg;
- grpc_deadline_state* deadline_state = state->elem->call_data;
- if (!state->in_call_combiner) {
- // We are initially called without holding the call combiner, so we
- // need to bounce ourselves into it.
- state->in_call_combiner = true;
- GRPC_CALL_COMBINER_START(exec_ctx, deadline_state->call_combiner,
- &state->closure, GRPC_ERROR_REF(error),
- "scheduling deadline timer");
- return;
- }
start_timer_if_needed(exec_ctx, state->elem, state->deadline);
gpr_free(state);
- GRPC_CALL_COMBINER_STOP(exec_ctx, deadline_state->call_combiner,
- "done scheduling deadline timer");
}
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_stack* call_stack,
- grpc_call_combiner* call_combiner,
gpr_timespec deadline) {
grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data;
deadline_state->call_stack = call_stack;
- deadline_state->call_combiner = call_combiner;
// Deadline will always be infinite on servers, so the timer will only be
// set on clients with a finite deadline.
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -195,7 +158,7 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
// call stack initialization is finished. To avoid that problem, we
// create a closure to start the timer, and we schedule that closure
// to be run after call stack initialization is done.
- struct start_timer_after_init_state* state = gpr_zalloc(sizeof(*state));
+ struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
state->elem = elem;
state->deadline = deadline;
GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
@@ -269,8 +232,7 @@ typedef struct server_call_data {
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
const grpc_call_element_args* args) {
- grpc_deadline_state_init(exec_ctx, elem, args->call_stack,
- args->call_combiner, args->deadline);
+ grpc_deadline_state_init(exec_ctx, elem, args->call_stack, args->deadline);
return GRPC_ERROR_NONE;
}
@@ -348,6 +310,7 @@ const grpc_channel_filter grpc_client_deadline_filter = {
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"deadline",
};
@@ -362,6 +325,7 @@ const grpc_channel_filter grpc_server_deadline_filter = {
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"deadline",
};
diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h
index 3eb102ad28..420bf7065a 100644
--- a/src/core/ext/filters/deadline/deadline_filter.h
+++ b/src/core/ext/filters/deadline/deadline_filter.h
@@ -31,8 +31,7 @@ typedef enum grpc_deadline_timer_state {
typedef struct grpc_deadline_state {
// We take a reference to the call stack for the timer callback.
grpc_call_stack* call_stack;
- grpc_call_combiner* call_combiner;
- grpc_deadline_timer_state timer_state;
+ gpr_atm timer_state;
grpc_timer timer;
grpc_closure timer_callback;
// Closure to invoke when the call is complete.
@@ -51,7 +50,6 @@ typedef struct grpc_deadline_state {
// assumes elem->call_data is zero'd
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_stack* call_stack,
- grpc_call_combiner* call_combiner,
gpr_timespec deadline);
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem);
@@ -63,8 +61,6 @@ void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
// to ensure that the timer callback is not invoked while it is in the
// process of being reset, which means that attempting to increase the
// deadline may result in the timer being called twice.
-//
-// Note: Must be called while holding the call combiner.
void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
gpr_timespec new_deadline);
@@ -74,8 +70,6 @@ void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
//
// Note: It is the caller's responsibility to chain to the next filter if
// necessary after this function returns.
-//
-// Note: Must be called while holding the call combiner.
void grpc_deadline_state_client_start_transport_stream_op_batch(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op_batch* op);
diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c
index 99ddd08e6a..3ca01a41b5 100644
--- a/src/core/ext/filters/http/client/http_client_filter.c
+++ b/src/core/ext/filters/http/client/http_client_filter.c
@@ -36,7 +36,6 @@
static const size_t kMaxPayloadSizeForGet = 2048;
typedef struct call_data {
- grpc_call_combiner *call_combiner;
// State for handling send_initial_metadata ops.
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;
@@ -216,13 +215,13 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
call_data *calld = (call_data *)elem->call_data;
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error, calld->call_combiner);
+ exec_ctx, calld->send_message_batch, error);
return;
}
error = pull_slice_from_send_message(exec_ctx, calld);
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error, calld->call_combiner);
+ exec_ctx, calld->send_message_batch, error);
return;
}
// There may or may not be more to read, but we don't care. If we got
@@ -415,7 +414,7 @@ static void hc_start_transport_stream_op_batch(
done:
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, error, calld->call_combiner);
+ exec_ctx, calld->send_message_batch, error);
} else if (!batch_will_be_handled_asynchronously) {
grpc_call_next_op(exec_ctx, elem, batch);
}
@@ -427,7 +426,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
call_data *calld = (call_data *)elem->call_data;
- calld->call_combiner = args->call_combiner;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -567,5 +565,6 @@ const grpc_channel_filter grpc_http_client_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"http-client"};
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
index 98a503cafc..a32819bfe4 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.c
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -35,29 +35,35 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/transport/static_metadata.h"
-typedef enum {
- // Initial metadata not yet seen.
- INITIAL_METADATA_UNSEEN = 0,
- // Initial metadata seen; compression algorithm set.
- HAS_COMPRESSION_ALGORITHM,
- // Initial metadata seen; no compression algorithm set.
- NO_COMPRESSION_ALGORITHM,
-} initial_metadata_state;
+#define INITIAL_METADATA_UNSEEN 0
+#define HAS_COMPRESSION_ALGORITHM 2
+#define NO_COMPRESSION_ALGORITHM 4
+
+#define CANCELLED_BIT ((gpr_atm)1)
typedef struct call_data {
- grpc_call_combiner *call_combiner;
+ grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem stream_compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage;
grpc_linked_mdelem accept_stream_encoding_storage;
+ uint32_t remaining_slice_bytes;
/** Compression algorithm we'll try to use. It may be given by incoming
* metadata, or by the channel's default compression settings. */
grpc_compression_algorithm compression_algorithm;
- initial_metadata_state send_initial_metadata_state;
- grpc_error *cancel_error;
- grpc_closure start_send_message_batch_in_call_combiner;
+
+ /* Atomic recording the state of initial metadata; allowed values:
+ INITIAL_METADATA_UNSEEN - initial metadata op not seen
+ HAS_COMPRESSION_ALGORITHM - initial metadata seen; compression algorithm
+ set
+ NO_COMPRESSION_ALGORITHM - initial metadata seen; no compression algorithm
+ set
+ pointer - a stalled op containing a send_message that's waiting on initial
+ metadata
+ pointer | CANCELLED_BIT - request was cancelled with error pointed to */
+ gpr_atm send_initial_metadata_state;
+
grpc_transport_stream_op_batch *send_message_batch;
- grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_slice_buffer_stream replacement_stream;
grpc_closure *original_send_message_on_complete;
grpc_closure send_message_on_complete;
@@ -86,13 +92,13 @@ static bool skip_compression(grpc_call_element *elem, uint32_t flags,
channel_data *channeld = elem->channel_data;
if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) {
- return true;
+ return 1;
}
if (has_compression_algorithm) {
if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
- return true;
+ return 1;
}
- return false; /* we have an actual call-specific algorithm */
+ return 0; /* we have an actual call-specific algorithm */
}
/* no per-call compression override */
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
@@ -220,18 +226,6 @@ static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_REF(error));
}
-static void send_message_batch_continue(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- call_data *calld = (call_data *)elem->call_data;
- // Note: The call to grpc_call_next_op() results in yielding the
- // call combiner, so we need to clear calld->send_message_batch
- // before we do that.
- grpc_transport_stream_op_batch *send_message_batch =
- calld->send_message_batch;
- calld->send_message_batch = NULL;
- grpc_call_next_op(exec_ctx, elem, send_message_batch);
-}
-
static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = (call_data *)elem->call_data;
@@ -240,8 +234,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_init(&tmp);
uint32_t send_flags =
calld->send_message_batch->payload->send_message.send_message->flags;
- bool did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm,
- &calld->slices, &tmp);
+ const bool did_compress = grpc_msg_compress(
+ exec_ctx, calld->compression_algorithm, &calld->slices, &tmp);
if (did_compress) {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
char *algo_name;
@@ -279,19 +273,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
calld->original_send_message_on_complete =
calld->send_message_batch->on_complete;
calld->send_message_batch->on_complete = &calld->send_message_on_complete;
- send_message_batch_continue(exec_ctx, elem);
-}
-
-static void fail_send_message_batch_in_call_combiner(grpc_exec_ctx *exec_ctx,
- void *arg,
- grpc_error *error) {
- call_data *calld = arg;
- if (calld->send_message_batch != NULL) {
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, calld->send_message_batch, GRPC_ERROR_REF(error),
- calld->call_combiner);
- calld->send_message_batch = NULL;
- }
+ grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
}
// Pulls a slice from the send_message byte stream and adds it to calld->slices.
@@ -311,25 +293,21 @@ static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
// If all data has been read, invokes finish_send_message(). Otherwise,
// an async call to grpc_byte_stream_next() has been started, which will
// eventually result in calling on_send_message_next_done().
-static void continue_reading_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
call_data *calld = (call_data *)elem->call_data;
while (grpc_byte_stream_next(
exec_ctx, calld->send_message_batch->payload->send_message.send_message,
~(size_t)0, &calld->on_send_message_next_done)) {
grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
- if (error != GRPC_ERROR_NONE) {
- // Closure callback; does not take ownership of error.
- fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
- GRPC_ERROR_UNREF(error);
- return;
- }
+ if (error != GRPC_ERROR_NONE) return error;
if (calld->slices.length ==
calld->send_message_batch->payload->send_message.send_message->length) {
finish_send_message(exec_ctx, elem);
break;
}
}
+ return GRPC_ERROR_NONE;
}
// Async callback for grpc_byte_stream_next().
@@ -337,37 +315,46 @@ static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_call_element *elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)elem->call_data;
- if (error != GRPC_ERROR_NONE) {
- // Closure callback; does not take ownership of error.
- fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
- return;
- }
+ if (error != GRPC_ERROR_NONE) goto fail;
error = pull_slice_from_send_message(exec_ctx, calld);
- if (error != GRPC_ERROR_NONE) {
- // Closure callback; does not take ownership of error.
- fail_send_message_batch_in_call_combiner(exec_ctx, calld, error);
- GRPC_ERROR_UNREF(error);
- return;
- }
+ if (error != GRPC_ERROR_NONE) goto fail;
if (calld->slices.length ==
calld->send_message_batch->payload->send_message.send_message->length) {
finish_send_message(exec_ctx, elem);
} else {
- continue_reading_send_message(exec_ctx, elem);
+ // This will either finish reading all of the data and invoke
+ // finish_send_message(), or else it will make an async call to
+ // grpc_byte_stream_next(), which will eventually result in calling
+ // this function again.
+ error = continue_reading_send_message(exec_ctx, elem);
+ if (error != GRPC_ERROR_NONE) goto fail;
}
+ return;
+fail:
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
}
-static void start_send_message_batch(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *unused) {
- grpc_call_element *elem = (grpc_call_element *)arg;
+static void start_send_message_batch(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *batch,
+ bool has_compression_algorithm) {
call_data *calld = (call_data *)elem->call_data;
- if (skip_compression(
- elem,
- calld->send_message_batch->payload->send_message.send_message->flags,
- calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) {
- send_message_batch_continue(exec_ctx, elem);
+ if (!skip_compression(elem, batch->payload->send_message.send_message->flags,
+ has_compression_algorithm)) {
+ calld->send_message_batch = batch;
+ // This will either finish reading all of the data and invoke
+ // finish_send_message(), or else it will make an async call to
+ // grpc_byte_stream_next(), which will eventually result in calling
+ // on_send_message_next_done().
+ grpc_error *error = continue_reading_send_message(exec_ctx, elem);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+ }
} else {
- continue_reading_send_message(exec_ctx, elem);
+ /* pass control down the stack */
+ grpc_call_next_op(exec_ctx, elem, batch);
}
}
@@ -375,80 +362,95 @@ static void compress_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
+
GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
- // Handle cancel_stream.
+
if (batch->cancel_stream) {
- GRPC_ERROR_UNREF(calld->cancel_error);
- calld->cancel_error =
- GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
- if (calld->send_message_batch != NULL) {
- if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) {
- GRPC_CALL_COMBINER_START(
- exec_ctx, calld->call_combiner,
- GRPC_CLOSURE_CREATE(fail_send_message_batch_in_call_combiner, calld,
- grpc_schedule_on_exec_ctx),
- GRPC_ERROR_REF(calld->cancel_error), "failing send_message op");
- } else {
- grpc_byte_stream_shutdown(
- exec_ctx,
- calld->send_message_batch->payload->send_message.send_message,
- GRPC_ERROR_REF(calld->cancel_error));
- }
+ // TODO(roth): As part of the upcoming call combiner work, change
+ // this to call grpc_byte_stream_shutdown() on the incoming byte
+ // stream, to cancel any in-flight calls to grpc_byte_stream_next().
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
+ gpr_atm cur = gpr_atm_full_xchg(
+ &calld->send_initial_metadata_state,
+ CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error);
+ switch (cur) {
+ case HAS_COMPRESSION_ALGORITHM:
+ case NO_COMPRESSION_ALGORITHM:
+ case INITIAL_METADATA_UNSEEN:
+ break;
+ default:
+ if ((cur & CANCELLED_BIT) == 0) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, (grpc_transport_stream_op_batch *)cur,
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error));
+ } else {
+ GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT));
+ }
+ break;
}
- } else if (calld->cancel_error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, batch, GRPC_ERROR_REF(calld->cancel_error),
- calld->call_combiner);
- goto done;
}
- // Handle send_initial_metadata.
+
if (batch->send_initial_metadata) {
- GPR_ASSERT(calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN);
bool has_compression_algorithm;
grpc_error *error = process_send_initial_metadata(
exec_ctx, elem,
batch->payload->send_initial_metadata.send_initial_metadata,
&has_compression_algorithm);
if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error,
- calld->call_combiner);
- goto done;
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
+ error);
+ return;
}
- calld->send_initial_metadata_state = has_compression_algorithm
- ? HAS_COMPRESSION_ALGORITHM
- : NO_COMPRESSION_ALGORITHM;
- // If we had previously received a batch containing a send_message op,
- // handle it now. Note that we need to re-enter the call combiner
- // for this, since we can't send two batches down while holding the
- // call combiner, since the connected_channel filter (at the bottom of
- // the call stack) will release the call combiner for each batch it sees.
- if (calld->send_message_batch != NULL) {
- GRPC_CALL_COMBINER_START(
- exec_ctx, calld->call_combiner,
- &calld->start_send_message_batch_in_call_combiner, GRPC_ERROR_NONE,
- "starting send_message after send_initial_metadata");
+ gpr_atm cur;
+ retry_send_im:
+ cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
+ GPR_ASSERT(cur != HAS_COMPRESSION_ALGORITHM &&
+ cur != NO_COMPRESSION_ALGORITHM);
+ if ((cur & CANCELLED_BIT) == 0) {
+ if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
+ has_compression_algorithm
+ ? HAS_COMPRESSION_ALGORITHM
+ : NO_COMPRESSION_ALGORITHM)) {
+ goto retry_send_im;
+ }
+ if (cur != INITIAL_METADATA_UNSEEN) {
+ start_send_message_batch(exec_ctx, elem,
+ (grpc_transport_stream_op_batch *)cur,
+ has_compression_algorithm);
+ }
}
}
- // Handle send_message.
if (batch->send_message) {
- GPR_ASSERT(calld->send_message_batch == NULL);
- calld->send_message_batch = batch;
- // If we have not yet seen send_initial_metadata, then we have to
- // wait. We save the batch in calld and then drop the call
- // combiner, which we'll have to pick up again later when we get
- // send_initial_metadata.
- if (calld->send_initial_metadata_state == INITIAL_METADATA_UNSEEN) {
- GRPC_CALL_COMBINER_STOP(
- exec_ctx, calld->call_combiner,
- "send_message batch pending send_initial_metadata");
- goto done;
+ gpr_atm cur;
+ retry_send:
+ cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
+ switch (cur) {
+ case INITIAL_METADATA_UNSEEN:
+ if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
+ (gpr_atm)batch)) {
+ goto retry_send;
+ }
+ break;
+ case HAS_COMPRESSION_ALGORITHM:
+ case NO_COMPRESSION_ALGORITHM:
+ start_send_message_batch(exec_ctx, elem, batch,
+ cur == HAS_COMPRESSION_ALGORITHM);
+ break;
+ default:
+ if (cur & CANCELLED_BIT) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, batch,
+ GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT)));
+ } else {
+ /* >1 send_message concurrently */
+ GPR_UNREACHABLE_CODE(break);
+ }
}
- start_send_message_batch(exec_ctx, elem, GRPC_ERROR_NONE);
} else {
- // Pass control down the stack.
+ /* pass control down the stack */
grpc_call_next_op(exec_ctx, elem, batch);
}
-done:
+
GPR_TIMER_END("compress_start_transport_stream_op_batch", 0);
}
@@ -456,16 +458,16 @@ done:
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
- call_data *calld = (call_data *)elem->call_data;
- calld->call_combiner = args->call_combiner;
- calld->cancel_error = GRPC_ERROR_NONE;
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+
+ /* initialize members */
grpc_slice_buffer_init(&calld->slices);
- GRPC_CLOSURE_INIT(&calld->start_send_message_batch_in_call_combiner,
- start_send_message_batch, elem, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
elem, grpc_schedule_on_exec_ctx);
+
return GRPC_ERROR_NONE;
}
@@ -473,9 +475,14 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
grpc_closure *ignored) {
+ /* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
- GRPC_ERROR_UNREF(calld->cancel_error);
+ gpr_atm imstate =
+ gpr_atm_no_barrier_load(&calld->send_initial_metadata_state);
+ if (imstate & CANCELLED_BIT) {
+ GRPC_ERROR_UNREF((grpc_error *)(imstate & ~CANCELLED_BIT));
+ }
}
/* Constructor for channel_data */
@@ -543,5 +550,6 @@ const grpc_channel_filter grpc_message_compress_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
- "message_compress"};
+ "compress"};
diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c
index a10e69ba59..b145f12aff 100644
--- a/src/core/ext/filters/http/server/http_server_filter.c
+++ b/src/core/ext/filters/http/server/http_server_filter.c
@@ -32,8 +32,6 @@
#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
typedef struct call_data {
- grpc_call_combiner *call_combiner;
-
grpc_linked_mdelem status;
grpc_linked_mdelem content_type;
@@ -283,11 +281,7 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
*calld->pp_recv_message = calld->payload_bin_delivered
? NULL
: (grpc_byte_stream *)&calld->read_stream;
- // Re-enter call combiner for recv_message_ready, since the surface
- // code will release the call combiner for each callback it receives.
- GRPC_CALL_COMBINER_START(exec_ctx, calld->call_combiner,
- calld->recv_message_ready, GRPC_ERROR_REF(err),
- "resuming recv_message_ready from on_complete");
+ GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
calld->recv_message_ready = NULL;
calld->payload_bin_delivered = true;
}
@@ -299,20 +293,15 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (calld->seen_path_with_query) {
- // Do nothing. This is probably a GET request, and payload will be
- // returned in hs_on_complete callback.
- // Note that we release the call combiner here, so that other
- // callbacks can run.
- GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
- "pausing recv_message_ready until on_complete");
+ /* do nothing. This is probably a GET request, and payload will be returned
+ in hs_on_complete callback. */
} else {
GRPC_CLOSURE_RUN(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
}
}
-static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
@@ -334,7 +323,10 @@ static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx,
server_filter_outgoing_metadata(
exec_ctx, elem,
op->payload->send_initial_metadata.send_initial_metadata));
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ return;
+ }
}
if (op->recv_initial_metadata) {
@@ -367,25 +359,21 @@ static grpc_error *hs_mutate_op(grpc_exec_ctx *exec_ctx,
grpc_error *error = server_filter_outgoing_metadata(
exec_ctx, elem,
op->payload->send_trailing_metadata.send_trailing_metadata);
- if (error != GRPC_ERROR_NONE) return error;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ return;
+ }
}
-
- return GRPC_ERROR_NONE;
}
-static void hs_start_transport_stream_op_batch(
- grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- call_data *calld = elem->call_data;
- GPR_TIMER_BEGIN("hs_start_transport_stream_op_batch", 0);
- grpc_error *error = hs_mutate_op(exec_ctx, elem, op);
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error,
- calld->call_combiner);
- } else {
- grpc_call_next_op(exec_ctx, elem, op);
- }
- GPR_TIMER_END("hs_start_transport_stream_op_batch", 0);
+static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ GPR_TIMER_BEGIN("hs_start_transport_op", 0);
+ hs_mutate_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, op);
+ GPR_TIMER_END("hs_start_transport_op", 0);
}
/* Constructor for call_data */
@@ -395,7 +383,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
- calld->call_combiner = args->call_combiner;
GRPC_CLOSURE_INIT(&calld->hs_on_recv, hs_on_recv, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->hs_on_complete, hs_on_complete, elem,
@@ -427,7 +414,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
const grpc_channel_filter grpc_http_server_filter = {
- hs_start_transport_stream_op_batch,
+ hs_start_transport_op,
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
@@ -436,5 +423,6 @@ const grpc_channel_filter grpc_http_server_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"http-server"};
diff --git a/src/core/ext/filters/load_reporting/load_reporting_filter.c b/src/core/ext/filters/load_reporting/load_reporting_filter.c
index 17e946937f..08474efb2e 100644
--- a/src/core/ext/filters/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/filters/load_reporting/load_reporting_filter.c
@@ -223,5 +223,6 @@ const grpc_channel_filter grpc_load_reporting_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"load_reporting"};
diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c
index 16c85a70d0..7d748b9c32 100644
--- a/src/core/ext/filters/max_age/max_age_filter.c
+++ b/src/core/ext/filters/max_age/max_age_filter.c
@@ -391,6 +391,7 @@ const grpc_channel_filter grpc_max_age_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"max_age"};
diff --git a/src/core/ext/filters/message_size/message_size_filter.c b/src/core/ext/filters/message_size/message_size_filter.c
index 47763b1deb..846c7df69a 100644
--- a/src/core/ext/filters/message_size/message_size_filter.c
+++ b/src/core/ext/filters/message_size/message_size_filter.c
@@ -68,7 +68,6 @@ static void* message_size_limits_create_from_json(const grpc_json* json) {
}
typedef struct call_data {
- grpc_call_combiner* call_combiner;
message_size_limits limits;
// Receive closures are chained: we inject this closure as the
// recv_message_ready up-call on transport_stream_op, and remember to
@@ -132,8 +131,7 @@ static void start_transport_stream_op_batch(
exec_ctx, op,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_RESOURCE_EXHAUSTED),
- calld->call_combiner);
+ GRPC_STATUS_RESOURCE_EXHAUSTED));
gpr_free(message_string);
return;
}
@@ -154,7 +152,6 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
const grpc_call_element_args* args) {
channel_data* chand = (channel_data*)elem->channel_data;
call_data* calld = (call_data*)elem->call_data;
- calld->call_combiner = args->call_combiner;
calld->next_recv_message_ready = NULL;
GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -262,6 +259,7 @@ const grpc_channel_filter grpc_message_size_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"message_size"};
diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c
index c8b2fe5f99..b4d2cb4b8c 100644
--- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c
+++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c
@@ -177,6 +177,7 @@ const grpc_channel_filter grpc_workaround_cronet_compression_filter = {
0,
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"workaround_cronet_compression"};
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 2f0ac85152..7541bd5c92 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1370,28 +1370,17 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
"send_initial_metadata_finished");
}
}
- if (op_payload->send_initial_metadata.peer_string != NULL) {
- gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
- (gpr_atm)gpr_strdup(t->peer_string));
- }
}
if (op->send_message) {
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
- // Return an error unless the client has already received trailing
- // metadata from the server, since an application using a
- // streaming call might send another message before getting a
- // recv_message failure, breaking out of its loop, and then
- // starting recv_trailing_metadata.
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->fetching_send_message_finished,
- t->is_client && s->received_trailing_metadata
- ? GRPC_ERROR_NONE
- : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Attempt to send message after stream was closed",
- &s->write_closed_error, 1),
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Attempt to send message after stream was closed",
+ &s->write_closed_error, 1),
"fetching_send_message_finished");
} else {
GPR_ASSERT(s->fetching_send_message == NULL);
@@ -1477,10 +1466,6 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
op_payload->recv_initial_metadata.recv_initial_metadata;
s->trailing_metadata_available =
op_payload->recv_initial_metadata.trailing_metadata_available;
- if (op_payload->recv_initial_metadata.peer_string != NULL) {
- gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
- (gpr_atm)gpr_strdup(t->peer_string));
- }
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
}
@@ -1839,7 +1824,8 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
}
}
}
- if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
+ if (s->read_closed && s->frame_storage.length == 0 &&
+ (!pending_data || s->seen_error) &&
s->recv_trailing_metadata_finished != NULL) {
grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata);
@@ -2946,6 +2932,14 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
/*******************************************************************************
+ * INTEGRATION GLUE
+ */
+
+static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
+ return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string);
+}
+
+/*******************************************************************************
* MONITORING
*/
static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx,
@@ -2962,6 +2956,7 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
perform_transport_op,
destroy_stream,
destroy_transport,
+ chttp2_get_peer,
chttp2_get_endpoint};
grpc_transport *grpc_create_chttp2_transport(
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 9fff30d54f..3c41a8958f 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -509,8 +509,6 @@ struct grpc_chttp2_stream {
/** Are we buffering writes on this stream? If yes, we won't become writable
until there's enough queued up in the flow_controlled_buffer */
bool write_buffering;
- /** Has trailing metadata been received. */
- bool received_trailing_metadata;
/** the error that resulted in this stream being read-closed */
grpc_error *read_closed_error;
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 19bd86fd0c..18d163ee98 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -623,7 +623,6 @@ static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx,
*s->trailing_metadata_available = true;
}
t->hpack_parser.on_header = on_trailing_header;
- s->received_trailing_metadata = true;
} else {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing initial_metadata"));
t->hpack_parser.on_header = on_initial_header;
@@ -632,7 +631,6 @@ static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx,
case 1:
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing trailing_metadata"));
t->hpack_parser.on_header = on_trailing_header;
- s->received_trailing_metadata = true;
break;
case 2:
gpr_log(GPR_ERROR, "too many header frames received");
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 09420d92e7..abb558982b 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -1386,6 +1386,10 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {}
+static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
+ return NULL;
+}
+
static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx,
grpc_transport *gt) {
return NULL;
@@ -1404,6 +1408,7 @@ static const grpc_transport_vtable grpc_cronet_vtable = {
perform_op,
destroy_stream,
destroy_transport,
+ get_peer,
get_endpoint};
grpc_transport *grpc_create_cronet_transport(void *engine, const char *target,
diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c
index b2d6f2d0c9..6f4b429ee2 100644
--- a/src/core/ext/transport/inproc/inproc_transport.c
+++ b/src/core/ext/transport/inproc/inproc_transport.c
@@ -1251,14 +1251,20 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
// Nothing to do here
}
+static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
+ return gpr_strdup("inproc");
+}
+
static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
return NULL;
}
static const grpc_transport_vtable inproc_vtable = {
- sizeof(inproc_stream), "inproc", init_stream,
- set_pollset, set_pollset_set, perform_stream_op,
- perform_transport_op, destroy_stream, destroy_transport,
+ sizeof(inproc_stream), "inproc",
+ init_stream, set_pollset,
+ set_pollset_set, perform_stream_op,
+ perform_transport_op, destroy_stream,
+ destroy_transport, get_peer,
get_endpoint};
/*******************************************************************************
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 775c8bc667..0f8e33c4be 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -233,10 +233,15 @@ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
grpc_call_element *next_elem = elem + 1;
- GRPC_CALL_LOG_OP(GPR_INFO, next_elem, op);
next_elem->filter->start_transport_stream_op_batch(exec_ctx, next_elem, op);
}
+char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ grpc_call_element *next_elem = elem + 1;
+ return next_elem->filter->get_peer(exec_ctx, next_elem);
+}
+
void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
const grpc_channel_info *channel_info) {
@@ -260,3 +265,12 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
sizeof(grpc_call_stack)));
}
+
+void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_error *error) {
+ grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(NULL);
+ op->cancel_stream = true;
+ op->payload->cancel_stream.cancel_error = error;
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
+}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index ae1cac31f7..a80f8aa826 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -40,7 +40,6 @@
#include <grpc/support/time.h>
#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/support/arena.h"
#include "src/core/lib/transport/transport.h"
@@ -72,7 +71,6 @@ typedef struct {
gpr_timespec start_time;
gpr_timespec deadline;
gpr_arena *arena;
- grpc_call_combiner *call_combiner;
} grpc_call_element_args;
typedef struct {
@@ -152,6 +150,9 @@ typedef struct {
void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem);
+ /* Implement grpc_call_get_peer() */
+ char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
+
/* Implement grpc_channel_get_info() */
void (*get_channel_info)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
const grpc_channel_info *channel_info);
@@ -270,6 +271,8 @@ void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
stack */
void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
grpc_transport_op *op);
+/* Pass through a request to get_peer to the next child element */
+char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
/* Pass through a request to get_channel_info() to the next child element */
void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
@@ -285,6 +288,10 @@ void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem,
grpc_transport_stream_op_batch *op);
+void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *cur_elem,
+ grpc_error *error);
+
extern grpc_tracer_flag grpc_trace_channel;
#define GRPC_CALL_LOG_OP(sev, elem, op) \
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index 8285226fc4..af06ca802e 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -36,57 +36,7 @@ typedef struct connected_channel_channel_data {
grpc_transport *transport;
} channel_data;
-typedef struct {
- grpc_closure closure;
- grpc_closure *original_closure;
- grpc_call_combiner *call_combiner;
- const char *reason;
-} callback_state;
-
-typedef struct connected_channel_call_data {
- grpc_call_combiner *call_combiner;
- // Closures used for returning results on the call combiner.
- callback_state on_complete[6]; // Max number of pending batches.
- callback_state recv_initial_metadata_ready;
- callback_state recv_message_ready;
-} call_data;
-
-static void run_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- callback_state *state = (callback_state *)arg;
- GRPC_CALL_COMBINER_START(exec_ctx, state->call_combiner,
- state->original_closure, GRPC_ERROR_REF(error),
- state->reason);
-}
-
-static void run_cancel_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- run_in_call_combiner(exec_ctx, arg, error);
- gpr_free(arg);
-}
-
-static void intercept_callback(call_data *calld, callback_state *state,
- bool free_when_done, const char *reason,
- grpc_closure **original_closure) {
- state->original_closure = *original_closure;
- state->call_combiner = calld->call_combiner;
- state->reason = reason;
- *original_closure = GRPC_CLOSURE_INIT(
- &state->closure,
- free_when_done ? run_cancel_in_call_combiner : run_in_call_combiner,
- state, grpc_schedule_on_exec_ctx);
-}
-
-static callback_state *get_state_for_batch(
- call_data *calld, grpc_transport_stream_op_batch *batch) {
- if (batch->send_initial_metadata) return &calld->on_complete[0];
- if (batch->send_message) return &calld->on_complete[1];
- if (batch->send_trailing_metadata) return &calld->on_complete[2];
- if (batch->recv_initial_metadata) return &calld->on_complete[3];
- if (batch->recv_message) return &calld->on_complete[4];
- if (batch->recv_trailing_metadata) return &calld->on_complete[5];
- GPR_UNREACHABLE_CODE(return NULL);
-}
+typedef struct connected_channel_call_data { void *unused; } call_data;
/* We perform a small hack to locate transport data alongside the connected
channel data in call allocations, to allow everything to be pulled in minimal
@@ -99,38 +49,13 @@ static callback_state *get_state_for_batch(
into transport stream operations */
static void con_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *batch) {
+ grpc_transport_stream_op_batch *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (batch->recv_initial_metadata) {
- callback_state *state = &calld->recv_initial_metadata_ready;
- intercept_callback(
- calld, state, false, "recv_initial_metadata_ready",
- &batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
- }
- if (batch->recv_message) {
- callback_state *state = &calld->recv_message_ready;
- intercept_callback(calld, state, false, "recv_message_ready",
- &batch->payload->recv_message.recv_message_ready);
- }
- if (batch->cancel_stream) {
- // There can be more than one cancellation batch in flight at any
- // given time, so we can't just pick out a fixed index into
- // calld->on_complete like we can for the other ops. However,
- // cancellation isn't in the fast path, so we just allocate a new
- // closure for each one.
- callback_state *state = (callback_state *)gpr_malloc(sizeof(*state));
- intercept_callback(calld, state, true, "on_complete (cancel_stream)",
- &batch->on_complete);
- } else {
- callback_state *state = get_state_for_batch(calld, batch);
- intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
- }
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
grpc_transport_perform_stream_op(exec_ctx, chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- batch);
- GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
- "passed batch to transport");
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
}
static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -146,7 +71,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- calld->call_combiner = args->call_combiner;
int r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
&args->call_stack->refcount, args->server_transport_data, args->arena);
@@ -194,6 +118,11 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
}
+static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ return grpc_transport_get_peer(exec_ctx, chand->transport);
+}
+
/* No-op. */
static void con_get_channel_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
@@ -209,6 +138,7 @@ const grpc_channel_filter grpc_connected_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ con_get_peer,
con_get_channel_info,
"connected",
};
diff --git a/src/core/lib/iomgr/call_combiner.c b/src/core/lib/iomgr/call_combiner.c
deleted file mode 100644
index 899f98552d..0000000000
--- a/src/core/lib/iomgr/call_combiner.c
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "src/core/lib/iomgr/call_combiner.h"
-
-#include <grpc/support/log.h>
-
-grpc_tracer_flag grpc_call_combiner_trace =
- GRPC_TRACER_INITIALIZER(false, "call_combiner");
-
-static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
- if (cancel_state & 1) {
- return (grpc_error*)(cancel_state & ~(gpr_atm)1);
- }
- return GRPC_ERROR_NONE;
-}
-
-static gpr_atm encode_cancel_state_error(grpc_error* error) {
- return (gpr_atm)1 | (gpr_atm)error;
-}
-
-void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
- gpr_mpscq_init(&call_combiner->queue);
-}
-
-void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
- gpr_mpscq_destroy(&call_combiner->queue);
- GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state));
-}
-
-#ifndef NDEBUG
-#define DEBUG_ARGS , const char *file, int line
-#define DEBUG_FMT_STR "%s:%d: "
-#define DEBUG_FMT_ARGS , file, line
-#else
-#define DEBUG_ARGS
-#define DEBUG_FMT_STR
-#define DEBUG_FMT_ARGS
-#endif
-
-void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_closure* closure,
- grpc_error* error DEBUG_ARGS,
- const char* reason) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG,
- "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
- "%s] error=%s",
- call_combiner, closure DEBUG_FMT_ARGS, reason,
- grpc_error_string(error));
- }
- size_t prev_size =
- (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1);
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
- prev_size + 1);
- }
- if (prev_size == 0) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY");
- }
- // Queue was empty, so execute this closure immediately.
- GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
- } else {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_INFO, " QUEUING");
- }
- // Queue was not empty, so add closure to queue.
- closure->error_data.error = error;
- gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure);
- }
-}
-
-void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner DEBUG_ARGS,
- const char* reason) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG,
- "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
- call_combiner DEBUG_FMT_ARGS, reason);
- }
- size_t prev_size =
- (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1);
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
- prev_size - 1);
- }
- GPR_ASSERT(prev_size >= 1);
- if (prev_size > 1) {
- while (true) {
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " checking queue");
- }
- bool empty;
- grpc_closure* closure = (grpc_closure*)gpr_mpscq_pop_and_check_end(
- &call_combiner->queue, &empty);
- if (closure == NULL) {
- // This can happen either due to a race condition within the mpscq
- // code or because of a race with grpc_call_combiner_start().
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " queue returned no result; checking again");
- }
- continue;
- }
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s",
- closure, grpc_error_string(closure->error_data.error));
- }
- GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error);
- break;
- }
- } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG, " queue empty");
- }
-}
-
-void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_closure* closure) {
- while (true) {
- // Decode original state.
- gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
- grpc_error* original_error = decode_cancel_state_error(original_state);
- // If error is set, invoke the cancellation closure immediately.
- // Otherwise, store the new closure.
- if (original_error != GRPC_ERROR_NONE) {
- GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_REF(original_error));
- break;
- } else {
- if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
- (gpr_atm)closure)) {
- break;
- }
- }
- // cas failed, try again.
- }
-}
-
-void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_error* error) {
- while (true) {
- gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
- grpc_error* original_error = decode_cancel_state_error(original_state);
- if (original_error != GRPC_ERROR_NONE) {
- GRPC_ERROR_UNREF(error);
- break;
- }
- if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
- encode_cancel_state_error(error))) {
- if (original_state != 0) {
- grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
- if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
- gpr_log(GPR_DEBUG,
- "call_combiner=%p: scheduling notify_on_cancel callback=%p",
- call_combiner, notify_on_cancel);
- }
- GRPC_CLOSURE_SCHED(exec_ctx, notify_on_cancel, GRPC_ERROR_REF(error));
- }
- break;
- }
- // cas failed, try again.
- }
-}
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
deleted file mode 100644
index 621e2c3669..0000000000
--- a/src/core/lib/iomgr/call_combiner.h
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
-#define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
-
-#include <stddef.h>
-
-#include <grpc/support/atm.h>
-
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
-#include "src/core/lib/support/mpscq.h"
-
-// A simple, lock-free mechanism for serializing activity related to a
-// single call. This is similar to a combiner but is more lightweight.
-//
-// It requires the callback (or, in the common case where the callback
-// actually kicks off a chain of callbacks, the last callback in that
-// chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP())
-// when it is done with the action that was kicked off by the original
-// callback.
-
-extern grpc_tracer_flag grpc_call_combiner_trace;
-
-typedef struct {
- gpr_atm size; // size_t, num closures in queue or currently executing
- gpr_mpscq queue;
- // Either 0 (if not cancelled and no cancellation closure set),
- // a grpc_closure* (if the lowest bit is 0),
- // or a grpc_error* (if the lowest bit is 1).
- gpr_atm cancel_state;
-} grpc_call_combiner;
-
-// Assumes memory was initialized to zero.
-void grpc_call_combiner_init(grpc_call_combiner* call_combiner);
-
-void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner);
-
-#ifndef NDEBUG
-#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \
- reason) \
- grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \
- __FILE__, __LINE__, (reason))
-#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \
- grpc_call_combiner_stop((exec_ctx), (call_combiner), __FILE__, __LINE__, \
- (reason))
-/// Starts processing \a closure on \a call_combiner.
-void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_closure* closure, grpc_error* error,
- const char* file, int line, const char* reason);
-/// Yields the call combiner to the next closure in the queue, if any.
-void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- const char* file, int line, const char* reason);
-#else
-#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \
- reason) \
- grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \
- (reason))
-#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \
- grpc_call_combiner_stop((exec_ctx), (call_combiner), (reason))
-/// Starts processing \a closure on \a call_combiner.
-void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_closure* closure, grpc_error* error,
- const char* reason);
-/// Yields the call combiner to the next closure in the queue, if any.
-void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- const char* reason);
-#endif
-
-/// Tells \a call_combiner to invoke \a closure when
-/// grpc_call_combiner_cancel() is called. If grpc_call_combiner_cancel()
-/// was previously called, \a closure will be invoked immediately.
-/// If \a closure is NULL, then no closure will be invoked on
-/// cancellation; this effectively unregisters the previously set closure.
-void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_closure* closure);
-
-/// Indicates that the call has been cancelled.
-void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
- grpc_call_combiner* call_combiner,
- grpc_error* error);
-
-#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index e3f0163a6c..531a88434f 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -39,7 +39,6 @@
/* We can have a per-call credentials. */
typedef struct {
- grpc_call_combiner *call_combiner;
grpc_call_credentials *creds;
bool have_host;
bool have_method;
@@ -50,11 +49,17 @@ typedef struct {
pollset_set so that work can progress when this call wants work to progress
*/
grpc_polling_entity *pollent;
+ gpr_atm security_context_set;
+ gpr_mu security_context_mu;
grpc_credentials_mdelem_array md_array;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
grpc_auth_metadata_context auth_md_context;
- grpc_closure async_cancel_closure;
- grpc_closure async_result_closure;
+ grpc_closure closure;
+ // Either 0 (no cancellation and no async operation in flight),
+ // a grpc_closure* (if the lowest bit is 0),
+ // or a grpc_error* (if the lowest bit is 1).
+ gpr_atm cancellation_state;
+ grpc_closure cancel_closure;
} call_data;
/* We can have a per-channel credentials. */
@@ -63,6 +68,43 @@ typedef struct {
grpc_auth_context *auth_context;
} channel_data;
+static void decode_cancel_state(gpr_atm cancel_state, grpc_closure **func,
+ grpc_error **error) {
+ // If the lowest bit is 1, the value is a grpc_error*.
+ // Otherwise, if non-zdero, the value is a grpc_closure*.
+ if (cancel_state & 1) {
+ *error = (grpc_error *)(cancel_state & ~(gpr_atm)1);
+ } else if (cancel_state != 0) {
+ *func = (grpc_closure *)cancel_state;
+ }
+}
+
+static gpr_atm encode_cancel_state_error(grpc_error *error) {
+ // Set the lowest bit to 1 to indicate that it's an error.
+ return (gpr_atm)1 | (gpr_atm)error;
+}
+
+// Returns an error if the call has been cancelled. Otherwise, sets the
+// cancellation function to be called upon cancellation.
+static grpc_error *set_cancel_func(grpc_call_element *elem,
+ grpc_iomgr_cb_func func) {
+ call_data *calld = (call_data *)elem->call_data;
+ // Decode original state.
+ gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state);
+ grpc_error *original_error = GRPC_ERROR_NONE;
+ grpc_closure *original_func = NULL;
+ decode_cancel_state(original_state, &original_func, &original_error);
+ // If error is set, return it.
+ if (original_error != GRPC_ERROR_NONE) return GRPC_ERROR_REF(original_error);
+ // Otherwise, store func.
+ GRPC_CLOSURE_INIT(&calld->cancel_closure, func, elem,
+ grpc_schedule_on_exec_ctx);
+ GPR_ASSERT(((gpr_atm)&calld->cancel_closure & (gpr_atm)1) == 0);
+ gpr_atm_rel_store(&calld->cancellation_state,
+ (gpr_atm)&calld->cancel_closure);
+ return GRPC_ERROR_NONE;
+}
+
static void reset_auth_metadata_context(
grpc_auth_metadata_context *auth_md_context) {
if (auth_md_context->service_url != NULL) {
@@ -93,7 +135,6 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg;
grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
reset_auth_metadata_context(&calld->auth_md_context);
grpc_error *error = GRPC_ERROR_REF(input_error);
if (error == GRPC_ERROR_NONE) {
@@ -112,8 +153,7 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg,
} else {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAUTHENTICATED);
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error,
- calld->call_combiner);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error);
}
}
@@ -183,8 +223,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Incompatible credentials set on channel and call."),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED),
- calld->call_combiner);
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED));
return;
}
} else {
@@ -195,23 +234,22 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
build_auth_metadata_context(&chand->security_connector->base,
chand->auth_context, calld);
+ grpc_error *cancel_error = set_cancel_func(elem, cancel_get_request_metadata);
+ if (cancel_error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
+ cancel_error);
+ return;
+ }
GPR_ASSERT(calld->pollent != NULL);
-
- GRPC_CLOSURE_INIT(&calld->async_result_closure, on_credentials_metadata,
- batch, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->closure, on_credentials_metadata, batch,
+ grpc_schedule_on_exec_ctx);
grpc_error *error = GRPC_ERROR_NONE;
if (grpc_call_credentials_get_request_metadata(
exec_ctx, calld->creds, calld->pollent, calld->auth_md_context,
- &calld->md_array, &calld->async_result_closure, &error)) {
+ &calld->md_array, &calld->closure, &error)) {
// Synchronous return; invoke on_credentials_metadata() directly.
on_credentials_metadata(exec_ctx, batch, error);
GRPC_ERROR_UNREF(error);
- } else {
- // Async return; register cancellation closure with call combiner.
- GRPC_CLOSURE_INIT(&calld->async_cancel_closure, cancel_get_request_metadata,
- elem, grpc_schedule_on_exec_ctx);
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
- &calld->async_cancel_closure);
}
}
@@ -220,7 +258,7 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg;
grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
+
if (error == GRPC_ERROR_NONE) {
send_security_metadata(exec_ctx, elem, batch);
} else {
@@ -233,8 +271,7 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg,
exec_ctx, batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg),
GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_UNAUTHENTICATED),
- calld->call_combiner);
+ GRPC_STATUS_UNAUTHENTICATED));
gpr_free(error_msg);
}
}
@@ -245,7 +282,7 @@ static void cancel_check_call_host(grpc_exec_ctx *exec_ctx, void *arg,
call_data *calld = (call_data *)elem->call_data;
channel_data *chand = (channel_data *)elem->channel_data;
grpc_channel_security_connector_cancel_check_call_host(
- exec_ctx, chand->security_connector, &calld->async_result_closure,
+ exec_ctx, chand->security_connector, &calld->closure,
GRPC_ERROR_REF(error));
}
@@ -258,19 +295,52 @@ static void auth_start_transport_stream_op_batch(
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (!batch->cancel_stream) {
- GPR_ASSERT(batch->payload->context != NULL);
- if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
- batch->payload->context[GRPC_CONTEXT_SECURITY].value =
- grpc_client_security_context_create();
- batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
- grpc_client_security_context_destroy;
+ if (batch->cancel_stream) {
+ while (true) {
+ // Decode the original cancellation state.
+ gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state);
+ grpc_error *cancel_error = GRPC_ERROR_NONE;
+ grpc_closure *func = NULL;
+ decode_cancel_state(original_state, &func, &cancel_error);
+ // If we had already set a cancellation error, there's nothing
+ // more to do.
+ if (cancel_error != GRPC_ERROR_NONE) break;
+ // If there's a cancel func, call it.
+ // Note that even if the cancel func has been changed by some
+ // other thread between when we decoded it and now, it will just
+ // be a no-op.
+ cancel_error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
+ if (func != NULL) {
+ GRPC_CLOSURE_SCHED(exec_ctx, func, GRPC_ERROR_REF(cancel_error));
+ }
+ // Encode the new error into cancellation state.
+ if (gpr_atm_full_cas(&calld->cancellation_state, original_state,
+ encode_cancel_state_error(cancel_error))) {
+ break; // Success.
+ }
+ // The cas failed, so try again.
+ }
+ } else {
+ /* double checked lock over security context to ensure it's set once */
+ if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
+ gpr_mu_lock(&calld->security_context_mu);
+ if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
+ GPR_ASSERT(batch->payload->context != NULL);
+ if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value =
+ grpc_client_security_context_create();
+ batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
+ grpc_client_security_context_destroy;
+ }
+ grpc_client_security_context *sec_ctx =
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value;
+ GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
+ sec_ctx->auth_context =
+ GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
+ gpr_atm_rel_store(&calld->security_context_set, 1);
+ }
+ gpr_mu_unlock(&calld->security_context_mu);
}
- grpc_client_security_context *sec_ctx =
- batch->payload->context[GRPC_CONTEXT_SECURITY].value;
- GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
- sec_ctx->auth_context =
- GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
}
if (batch->send_initial_metadata) {
@@ -295,25 +365,26 @@ static void auth_start_transport_stream_op_batch(
}
}
if (calld->have_host) {
- batch->handler_private.extra_arg = elem;
- GRPC_CLOSURE_INIT(&calld->async_result_closure, on_host_checked, batch,
- grpc_schedule_on_exec_ctx);
- char *call_host = grpc_slice_to_c_string(calld->host);
- grpc_error *error = GRPC_ERROR_NONE;
- if (grpc_channel_security_connector_check_call_host(
- exec_ctx, chand->security_connector, call_host,
- chand->auth_context, &calld->async_result_closure, &error)) {
- // Synchronous return; invoke on_host_checked() directly.
- on_host_checked(exec_ctx, batch, error);
- GRPC_ERROR_UNREF(error);
+ grpc_error *cancel_error = set_cancel_func(elem, cancel_check_call_host);
+ if (cancel_error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
+ cancel_error);
} else {
- // Async return; register cancellation closure with call combiner.
- GRPC_CLOSURE_INIT(&calld->async_cancel_closure, cancel_check_call_host,
- elem, grpc_schedule_on_exec_ctx);
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
- &calld->async_cancel_closure);
+ char *call_host = grpc_slice_to_c_string(calld->host);
+ batch->handler_private.extra_arg = elem;
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_channel_security_connector_check_call_host(
+ exec_ctx, chand->security_connector, call_host,
+ chand->auth_context,
+ GRPC_CLOSURE_INIT(&calld->closure, on_host_checked, batch,
+ grpc_schedule_on_exec_ctx),
+ &error)) {
+ // Synchronous return; invoke on_host_checked() directly.
+ on_host_checked(exec_ctx, batch, error);
+ GRPC_ERROR_UNREF(error);
+ }
+ gpr_free(call_host);
}
- gpr_free(call_host);
GPR_TIMER_END("auth_start_transport_stream_op_batch", 0);
return; /* early exit */
}
@@ -329,7 +400,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
- calld->call_combiner = args->call_combiner;
+ memset(calld, 0, sizeof(*calld));
+ gpr_mu_init(&calld->security_context_mu);
return GRPC_ERROR_NONE;
}
@@ -354,6 +426,12 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_slice_unref_internal(exec_ctx, calld->method);
}
reset_auth_metadata_context(&calld->auth_md_context);
+ gpr_mu_destroy(&calld->security_context_mu);
+ gpr_atm cancel_state = gpr_atm_acq_load(&calld->cancellation_state);
+ grpc_error *cancel_error = GRPC_ERROR_NONE;
+ grpc_closure *cancel_func = NULL;
+ decode_cancel_state(cancel_state, &cancel_func, &cancel_error);
+ GRPC_ERROR_UNREF(cancel_error);
}
/* Constructor for channel_data */
@@ -412,5 +490,6 @@ const grpc_channel_filter grpc_client_auth_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"client-auth"};
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index b721ce4a22..9bf3f0ca0f 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -26,15 +26,7 @@
#include "src/core/lib/security/transport/auth_filters.h"
#include "src/core/lib/slice/slice_internal.h"
-typedef enum {
- STATE_INIT = 0,
- STATE_DONE,
- STATE_CANCELLED,
-} async_state;
-
typedef struct call_data {
- grpc_call_combiner *call_combiner;
- grpc_call_stack *owning_call;
grpc_transport_stream_op_batch *recv_initial_metadata_batch;
grpc_closure *original_recv_initial_metadata_ready;
grpc_closure recv_initial_metadata_ready;
@@ -42,8 +34,6 @@ typedef struct call_data {
const grpc_metadata *consumed_md;
size_t num_consumed_md;
grpc_auth_context *auth_context;
- grpc_closure cancel_closure;
- gpr_atm state; // async_state
} call_data;
typedef struct channel_data {
@@ -88,92 +78,54 @@ static grpc_filtered_mdelem remove_consumed_md(grpc_exec_ctx *exec_ctx,
return GRPC_FILTERED_MDELEM(md);
}
-static void on_md_processing_done_inner(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- const grpc_metadata *consumed_md,
- size_t num_consumed_md,
- const grpc_metadata *response_md,
- size_t num_response_md,
- grpc_error *error) {
+/* called from application code */
+static void on_md_processing_done(
+ void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
+ const grpc_metadata *response_md, size_t num_response_md,
+ grpc_status_code status, const char *error_details) {
+ grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
/* TODO(jboeuf): Implement support for response_md. */
if (response_md != NULL && num_response_md > 0) {
gpr_log(GPR_INFO,
"response_md in auth metadata processing not supported for now. "
"Ignoring...");
}
- if (error == GRPC_ERROR_NONE) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (status == GRPC_STATUS_OK) {
calld->consumed_md = consumed_md;
calld->num_consumed_md = num_consumed_md;
error = grpc_metadata_batch_filter(
- exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
+ &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
remove_consumed_md, elem, "Response metadata filtering error");
- }
- GRPC_CLOSURE_SCHED(exec_ctx, calld->original_recv_initial_metadata_ready,
- error);
-}
-
-// Called from application code.
-static void on_md_processing_done(
- void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
- const grpc_metadata *response_md, size_t num_response_md,
- grpc_status_code status, const char *error_details) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- // If the call was not cancelled while we were in flight, process the result.
- if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT,
- (gpr_atm)STATE_DONE)) {
- grpc_error *error = GRPC_ERROR_NONE;
- if (status != GRPC_STATUS_OK) {
- if (error_details == NULL) {
- error_details = "Authentication metadata processing failed.";
- }
- error = grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
- GRPC_ERROR_INT_GRPC_STATUS, status);
+ } else {
+ if (error_details == NULL) {
+ error_details = "Authentication metadata processing failed.";
}
- on_md_processing_done_inner(&exec_ctx, elem, consumed_md, num_consumed_md,
- response_md, num_response_md, error);
+ error =
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
+ GRPC_ERROR_INT_GRPC_STATUS, status);
}
- // Clean up.
for (size_t i = 0; i < calld->md.count; i++) {
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
}
grpc_metadata_array_destroy(&calld->md);
- GRPC_CALL_STACK_UNREF(&exec_ctx, calld->owning_call, "server_auth_metadata");
+ GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready,
+ error);
grpc_exec_ctx_finish(&exec_ctx);
}
-static void cancel_call(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_call_element *elem = (grpc_call_element *)arg;
- call_data *calld = elem->call_data;
- // If the result was not already processed, invoke the callback now.
- if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT,
- (gpr_atm)STATE_CANCELLED)) {
- on_md_processing_done_inner(exec_ctx, elem, NULL, 0, NULL, 0,
- GRPC_ERROR_REF(error));
- }
-}
-
static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_call_element *elem = (grpc_call_element *)arg;
+ grpc_call_element *elem = arg;
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
if (error == GRPC_ERROR_NONE) {
if (chand->creds != NULL && chand->creds->processor.process != NULL) {
- // We're calling out to the application, so we need to make sure
- // to drop the call combiner early if we get cancelled.
- GRPC_CLOSURE_INIT(&calld->cancel_closure, cancel_call, elem,
- grpc_schedule_on_exec_ctx);
- grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
- &calld->cancel_closure);
- GRPC_CALL_STACK_REF(calld->owning_call, "server_auth_metadata");
calld->md = metadata_batch_to_md_array(
batch->payload->recv_initial_metadata.recv_initial_metadata);
chand->creds->processor.process(
@@ -207,8 +159,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- calld->call_combiner = args->call_combiner;
- calld->owning_call = args->call_stack;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -268,5 +218,6 @@ const grpc_channel_filter grpc_server_auth_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"server-auth"};
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 03eaaf99ac..e68c201134 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -122,7 +122,6 @@ typedef struct batch_control {
bool is_closure;
} notify_tag;
} completion_data;
- grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
@@ -152,7 +151,6 @@ typedef struct {
struct grpc_call {
gpr_refcount ext_ref;
gpr_arena *arena;
- grpc_call_combiner call_combiner;
grpc_completion_queue *cq;
grpc_polling_entity pollent;
grpc_channel *channel;
@@ -186,11 +184,6 @@ struct grpc_call {
Element 0 is initial metadata, element 1 is trailing metadata. */
grpc_metadata_array *buffered_metadata[2];
- grpc_metadata compression_md;
-
- // A char* indicating the peer name.
- gpr_atm peer_string;
-
/* Packed received call statuses from various sources */
gpr_atm status[STATUS_SOURCE_COUNT];
@@ -269,9 +262,8 @@ grpc_tracer_flag grpc_compression_trace =
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *op,
- grpc_closure *start_batch_closure);
+static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *op);
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_status_code status,
const char *description);
@@ -336,7 +328,6 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
sizeof(grpc_call) + channel_stack->call_stack_size);
gpr_ref_init(&call->ext_ref, 1);
call->arena = arena;
- grpc_call_combiner_init(&call->call_combiner);
*out_call = call;
call->channel = args->channel;
call->cq = args->cq;
@@ -445,8 +436,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
.path = path,
.start_time = call->start_time,
.deadline = send_deadline,
- .arena = call->arena,
- .call_combiner = &call->call_combiner};
+ .arena = call->arena};
add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
destroy_call, call, &call_args));
if (error != GRPC_ERROR_NONE) {
@@ -513,8 +503,6 @@ static void release_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
grpc_call *c = call;
grpc_channel *channel = c->channel;
- grpc_call_combiner_destroy(&c->call_combiner);
- gpr_free((char *)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
}
@@ -614,37 +602,30 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
return GRPC_CALL_OK;
}
-// This is called via the call combiner to start sending a batch down
-// the filter stack.
-static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *ignored) {
- grpc_transport_stream_op_batch *batch = arg;
- grpc_call *call = batch->handler_private.extra_arg;
- GPR_TIMER_BEGIN("execute_batch", 0);
- grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
- GPR_TIMER_END("execute_batch", 0);
-}
-
-// start_batch_closure points to a caller-allocated closure to be used
-// for entering the call combiner.
-static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *batch,
- grpc_closure *start_batch_closure) {
- batch->handler_private.extra_arg = call;
- GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure,
- GRPC_ERROR_NONE, "executing batch");
+static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *op) {
+ grpc_call_element *elem;
+
+ GPR_TIMER_BEGIN("execute_op", 0);
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
+ GPR_TIMER_END("execute_op", 0);
}
char *grpc_call_get_peer(grpc_call *call) {
- char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string);
- if (peer_string != NULL) return gpr_strdup(peer_string);
- peer_string = grpc_channel_get_target(call->channel);
- if (peer_string != NULL) return peer_string;
- return gpr_strdup("unknown");
+ grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ char *result;
+ GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
+ result = elem->filter->get_peer(&exec_ctx, elem);
+ if (result == NULL) {
+ result = grpc_channel_get_target(call->channel);
+ }
+ if (result == NULL) {
+ result = gpr_strdup("unknown");
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+ return result;
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
@@ -671,41 +652,20 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return GRPC_CALL_OK;
}
-typedef struct {
- grpc_call *call;
- grpc_closure start_batch;
- grpc_closure finish_batch;
-} cancel_state;
-
-// The on_complete callback used when sending a cancel_stream batch down
-// the filter stack. Yields the call combiner when the batch is done.
-static void done_termination(grpc_exec_ctx *exec_ctx, void *arg,
+static void done_termination(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
- cancel_state *state = (cancel_state *)arg;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner,
- "on_complete for cancel_stream op");
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination");
- gpr_free(state);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination");
}
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_error *error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
- // Inform the call combiner of the cancellation, so that it can cancel
- // any in-flight asynchronous actions that may be holding the call
- // combiner. This ensures that the cancel_stream batch can be sent
- // down the filter stack in a timely manner.
- grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error));
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
- cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state));
- state->call = c;
- GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
- grpc_schedule_on_exec_ctx);
- grpc_transport_stream_op_batch *op =
- grpc_make_transport_stream_op(&state->finish_batch);
+ grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(
+ GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx));
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
- execute_batch(exec_ctx, c, op, &state->start_batch);
+ execute_op(exec_ctx, c, op);
}
static grpc_error *error_from_status(grpc_status_code status,
@@ -1471,18 +1431,6 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
-// The recv_message_ready callback used when sending a batch containing
-// a recv_message op down the filter stack. Yields the call combiner
-// before processing the received message.
-static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx,
- void *bctlp,
- grpc_error *error) {
- batch_control *bctl = bctlp;
- grpc_call *call = bctl->call;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready");
- receiving_stream_ready(exec_ctx, bctlp, error);
-}
-
static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
@@ -1589,9 +1537,6 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner,
- "recv_initial_metadata_ready");
-
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
@@ -1645,8 +1590,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
- grpc_call *call = bctl->call;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete");
+
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
finish_batch_step(exec_ctx, bctl);
}
@@ -1666,6 +1610,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
int num_completion_callbacks_needed = 1;
grpc_call_error error = GRPC_CALL_OK;
+ // sent_initial_metadata guards against variable reuse.
+ grpc_metadata compression_md;
+
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
@@ -1713,7 +1660,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
goto done_with_error;
}
/* process compression level */
- memset(&call->compression_md, 0, sizeof(call->compression_md));
+ memset(&compression_md, 0, sizeof(compression_md));
size_t additional_metadata_count = 0;
grpc_compression_level effective_compression_level =
GRPC_COMPRESS_LEVEL_NONE;
@@ -1751,9 +1698,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
const grpc_stream_compression_algorithm calgo =
stream_compression_algorithm_for_level_locked(
call, effective_stream_compression_level);
- call->compression_md.key =
+ compression_md.key =
GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST;
- call->compression_md.value =
+ compression_md.value =
grpc_stream_compression_algorithm_slice(calgo);
} else {
const grpc_compression_algorithm calgo =
@@ -1761,10 +1708,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call, effective_compression_level);
/* the following will be picked up by the compress filter and used
* as the call's compression algorithm. */
- call->compression_md.key =
- GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
- call->compression_md.value =
- grpc_compression_algorithm_slice(calgo);
+ compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
+ compression_md.value = grpc_compression_algorithm_slice(calgo);
additional_metadata_count++;
}
}
@@ -1779,7 +1724,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (!prepare_application_metadata(
exec_ctx, call, (int)op->data.send_initial_metadata.count,
op->data.send_initial_metadata.metadata, 0, call->is_client,
- &call->compression_md, (int)additional_metadata_count)) {
+ &compression_md, (int)additional_metadata_count)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1791,10 +1736,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
op->flags;
- if (call->is_client) {
- stream_op_payload->send_initial_metadata.peer_string =
- &call->peer_string;
- }
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1927,10 +1868,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
- if (!call->is_client) {
- stream_op_payload->recv_initial_metadata.peer_string =
- &call->peer_string;
- }
num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_MESSAGE:
@@ -1947,9 +1884,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->recv_message = true;
call->receiving_buffer = op->data.recv_message.recv_message;
stream_op_payload->recv_message.recv_message = &call->receiving_stream;
- GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
- receiving_stream_ready_in_call_combiner, bctl,
- grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready,
+ bctl, grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
num_completion_callbacks_needed++;
@@ -2019,7 +1955,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
- execute_batch(exec_ctx, call, stream_op, &bctl->start_batch);
+ execute_op(exec_ctx, call, stream_op);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 280315036f..898476daee 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -31,7 +31,6 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
-#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
@@ -130,7 +129,6 @@ void grpc_init(void) {
grpc_register_tracer(&grpc_trace_channel_stack_builder);
grpc_register_tracer(&grpc_http1_trace);
grpc_register_tracer(&grpc_cq_pluck_trace); // default on
- grpc_register_tracer(&grpc_call_combiner_trace);
grpc_register_tracer(&grpc_combiner_trace);
grpc_register_tracer(&grpc_server_channel_trace);
grpc_register_tracer(&grpc_bdp_estimator_trace);
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index 6286f9159d..a0791080a9 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -40,7 +40,6 @@ namespace grpc_core {
namespace {
struct CallData {
- grpc_call_combiner *call_combiner;
grpc_linked_mdelem status;
grpc_linked_mdelem details;
grpc_core::atomic<bool> filled_metadata;
@@ -53,14 +52,14 @@ struct ChannelData {
static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *mdb) {
- CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
+ CallData *calld = static_cast<CallData *>(elem->call_data);
bool expected = false;
if (!calld->filled_metadata.compare_exchange_strong(
expected, true, grpc_core::memory_order_relaxed,
grpc_core::memory_order_relaxed)) {
return;
}
- ChannelData *chand = reinterpret_cast<ChannelData *>(elem->channel_data);
+ ChannelData *chand = static_cast<ChannelData *>(elem->channel_data);
char tmp[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(chand->error_code, tmp);
calld->status.md = grpc_mdelem_from_slices(
@@ -80,7 +79,6 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static void lame_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
- CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
if (op->recv_initial_metadata) {
fill_metadata(exec_ctx, elem,
op->payload->recv_initial_metadata.recv_initial_metadata);
@@ -89,8 +87,12 @@ static void lame_start_transport_stream_op_batch(
op->payload->recv_trailing_metadata.recv_trailing_metadata);
}
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"),
- calld->call_combiner);
+ exec_ctx, op,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
+}
+
+static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
+ return NULL;
}
static void lame_get_channel_info(grpc_exec_ctx *exec_ctx,
@@ -120,8 +122,6 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
- CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
- calld->call_combiner = args->call_combiner;
return GRPC_ERROR_NONE;
}
@@ -156,6 +156,7 @@ extern "C" const grpc_channel_filter grpc_lame_filter = {
sizeof(grpc_core::ChannelData),
grpc_core::init_channel_elem,
grpc_core::destroy_channel_elem,
+ grpc_core::lame_get_peer,
grpc_core::lame_get_channel_info,
"lame-client",
};
@@ -175,7 +176,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
"error_message=%s)",
3, (target, (int)error_code, error_message));
GPR_ASSERT(elem->filter == &grpc_lame_filter);
- auto chand = reinterpret_cast<grpc_core::ChannelData *>(elem->channel_data);
+ auto chand = static_cast<grpc_core::ChannelData *>(elem->channel_data);
chand->error_code = error_code;
chand->error_message = error_message;
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 8582d826ca..66dcc299aa 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -789,6 +789,7 @@ static void server_mutate_op(grpc_call_element *elem,
static void server_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
}
@@ -961,6 +962,7 @@ const grpc_channel_filter grpc_server_top_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
+ grpc_call_next_get_peer,
grpc_channel_next_get_info,
"server",
};
diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c
index fb03a10315..08f61629a9 100644
--- a/src/core/lib/transport/byte_stream.c
+++ b/src/core/lib/transport/byte_stream.c
@@ -85,6 +85,7 @@ static void slice_buffer_stream_shutdown(grpc_exec_ctx *exec_ctx,
static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, stream->backing_buffer);
GRPC_ERROR_UNREF(stream->shutdown_error);
}
diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h
index 1e1e8310b8..be2a35213e 100644
--- a/src/core/lib/transport/byte_stream.h
+++ b/src/core/lib/transport/byte_stream.h
@@ -81,7 +81,9 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
// grpc_slice_buffer_stream
//
-// A grpc_byte_stream that wraps a slice buffer.
+// A grpc_byte_stream that wraps a slice buffer. The stream takes
+// ownership of the slices in the buffer, and on destruction will
+// reset the contents of the buffer.
typedef struct grpc_slice_buffer_stream {
grpc_byte_stream base;
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 650b0559aa..6c61f4b8d9 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -197,6 +197,11 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
then_schedule_closure);
}
+char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport) {
+ return transport->vtable->get_peer(exec_ctx, transport);
+}
+
grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
grpc_transport *transport) {
return transport->vtable->get_endpoint(exec_ctx, transport);
@@ -209,24 +214,24 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
// is a function that must always unref cancel_error
// though it lives in lib, it handles transport stream ops sure
// it's grpc_transport_stream_op_batch_finish_with_failure
+
void grpc_transport_stream_op_batch_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *batch,
- grpc_error *error, grpc_call_combiner *call_combiner) {
+ grpc_error *error) {
if (batch->send_message) {
grpc_byte_stream_destroy(exec_ctx,
batch->payload->send_message.send_message);
}
if (batch->recv_message) {
- GRPC_CALL_COMBINER_START(exec_ctx, call_combiner,
- batch->payload->recv_message.recv_message_ready,
- GRPC_ERROR_REF(error),
- "failing recv_message_ready");
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ batch->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_REF(error));
}
if (batch->recv_initial_metadata) {
- GRPC_CALL_COMBINER_START(
- exec_ctx, call_combiner,
+ GRPC_CLOSURE_SCHED(
+ exec_ctx,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
- GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
+ GRPC_ERROR_REF(error));
}
GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error);
if (batch->cancel_stream) {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index fbf5dcb8b5..099138ea14 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -22,7 +22,6 @@
#include <stddef.h>
#include "src/core/lib/channel/context.h"
-#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset.h"
@@ -153,9 +152,6 @@ struct grpc_transport_stream_op_batch_payload {
/** Iff send_initial_metadata != NULL, flags associated with
send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
uint32_t send_initial_metadata_flags;
- // If non-NULL, will be set by the transport to the peer string
- // (a char*, which the caller takes ownership of).
- gpr_atm *peer_string;
} send_initial_metadata;
struct {
@@ -180,9 +176,6 @@ struct grpc_transport_stream_op_batch_payload {
// immediately available. This may be a signal that we received a
// Trailers-Only response.
bool *trailing_metadata_available;
- // If non-NULL, will be set by the transport to the peer string
- // (a char*, which the caller takes ownership of).
- gpr_atm *peer_string;
} recv_initial_metadata;
struct {
@@ -300,7 +293,7 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
void grpc_transport_stream_op_batch_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
- grpc_error *error, grpc_call_combiner *call_combiner);
+ grpc_error *error);
char *grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op);
char *grpc_transport_op_string(grpc_transport_op *op);
@@ -339,6 +332,10 @@ void grpc_transport_close(grpc_transport *transport);
/* Destroy the transport */
void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport);
+/* Get the transports peer */
+char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport);
+
/* Get the endpoint used by \a transport */
grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
grpc_transport *transport);
diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h
index bbae69c223..fc772c6dd1 100644
--- a/src/core/lib/transport/transport_impl.h
+++ b/src/core/lib/transport/transport_impl.h
@@ -59,6 +59,9 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy */
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
+ /* implementation of grpc_transport_get_peer */
+ char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
+
/* implementation of grpc_transport_get_endpoint */
grpc_endpoint *(*get_endpoint)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
} grpc_transport_vtable;
diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c
index 409a6c4103..7b18229ba6 100644
--- a/src/core/lib/transport/transport_op_string.c
+++ b/src/core/lib/transport/transport_op_string.c
@@ -112,13 +112,6 @@ char *grpc_transport_stream_op_batch_string(
gpr_strvec_add(&b, tmp);
}
- if (op->collect_stats) {
- gpr_strvec_add(&b, gpr_strdup(" "));
- gpr_asprintf(&tmp, "COLLECT_STATS:%p",
- op->payload->collect_stats.collect_stats);
- gpr_strvec_add(&b, tmp);
- }
-
out = gpr_strvec_flatten(&b, NULL);
gpr_strvec_destroy(&b);