aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/call.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r--src/core/lib/surface/call.c154
1 files changed, 112 insertions, 42 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index e68c201134..3aa20ffcd7 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -122,6 +122,7 @@ typedef struct batch_control {
bool is_closure;
} notify_tag;
} completion_data;
+ grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
@@ -151,6 +152,7 @@ typedef struct {
struct grpc_call {
gpr_refcount ext_ref;
gpr_arena *arena;
+ grpc_call_combiner call_combiner;
grpc_completion_queue *cq;
grpc_polling_entity pollent;
grpc_channel *channel;
@@ -184,6 +186,11 @@ struct grpc_call {
Element 0 is initial metadata, element 1 is trailing metadata. */
grpc_metadata_array *buffered_metadata[2];
+ grpc_metadata compression_md;
+
+ // A char* indicating the peer name.
+ gpr_atm peer_string;
+
/* Packed received call statuses from various sources */
gpr_atm status[STATUS_SOURCE_COUNT];
@@ -262,8 +269,9 @@ grpc_tracer_flag grpc_compression_trace =
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *op);
+static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *op,
+ grpc_closure *start_batch_closure);
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_status_code status,
const char *description);
@@ -328,6 +336,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
sizeof(grpc_call) + channel_stack->call_stack_size);
gpr_ref_init(&call->ext_ref, 1);
call->arena = arena;
+ grpc_call_combiner_init(&call->call_combiner);
*out_call = call;
call->channel = args->channel;
call->cq = args->cq;
@@ -436,7 +445,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
.path = path,
.start_time = call->start_time,
.deadline = send_deadline,
- .arena = call->arena};
+ .arena = call->arena,
+ .call_combiner = &call->call_combiner};
add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
destroy_call, call, &call_args));
if (error != GRPC_ERROR_NONE) {
@@ -503,6 +513,8 @@ static void release_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
grpc_call *c = call;
grpc_channel *channel = c->channel;
+ grpc_call_combiner_destroy(&c->call_combiner);
+ gpr_free((char *)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
}
@@ -586,6 +598,12 @@ void grpc_call_unref(grpc_call *c) {
if (cancel) {
cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
GRPC_ERROR_CANCELLED);
+ } else {
+ // Unset the call combiner cancellation closure. This has the
+ // effect of scheduling the previously set cancellation closure, if
+ // any, so that it can release any internal references it may be
+ // holding to the call stack.
+ grpc_call_combiner_set_notify_on_cancel(&exec_ctx, &c->call_combiner, NULL);
}
GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
grpc_exec_ctx_finish(&exec_ctx);
@@ -602,30 +620,37 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
return GRPC_CALL_OK;
}
-static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *op) {
- grpc_call_element *elem;
-
- GPR_TIMER_BEGIN("execute_op", 0);
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
- GPR_TIMER_END("execute_op", 0);
+// This is called via the call combiner to start sending a batch down
+// the filter stack.
+static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *ignored) {
+ grpc_transport_stream_op_batch *batch = arg;
+ grpc_call *call = batch->handler_private.extra_arg;
+ GPR_TIMER_BEGIN("execute_batch", 0);
+ grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
+ GPR_TIMER_END("execute_batch", 0);
+}
+
+// start_batch_closure points to a caller-allocated closure to be used
+// for entering the call combiner.
+static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *batch,
+ grpc_closure *start_batch_closure) {
+ batch->handler_private.extra_arg = call;
+ GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure,
+ GRPC_ERROR_NONE, "executing batch");
}
char *grpc_call_get_peer(grpc_call *call) {
- grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- char *result;
- GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
- result = elem->filter->get_peer(&exec_ctx, elem);
- if (result == NULL) {
- result = grpc_channel_get_target(call->channel);
- }
- if (result == NULL) {
- result = gpr_strdup("unknown");
- }
- grpc_exec_ctx_finish(&exec_ctx);
- return result;
+ char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string);
+ if (peer_string != NULL) return gpr_strdup(peer_string);
+ peer_string = grpc_channel_get_target(call->channel);
+ if (peer_string != NULL) return peer_string;
+ return gpr_strdup("unknown");
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
@@ -652,20 +677,41 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return GRPC_CALL_OK;
}
-static void done_termination(grpc_exec_ctx *exec_ctx, void *call,
+typedef struct {
+ grpc_call *call;
+ grpc_closure start_batch;
+ grpc_closure finish_batch;
+} cancel_state;
+
+// The on_complete callback used when sending a cancel_stream batch down
+// the filter stack. Yields the call combiner when the batch is done.
+static void done_termination(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination");
+ cancel_state *state = (cancel_state *)arg;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner,
+ "on_complete for cancel_stream op");
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination");
+ gpr_free(state);
}
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_error *error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
+ // Inform the call combiner of the cancellation, so that it can cancel
+ // any in-flight asynchronous actions that may be holding the call
+ // combiner. This ensures that the cancel_stream batch can be sent
+ // down the filter stack in a timely manner.
+ grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error));
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
- grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(
- GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx));
+ cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state));
+ state->call = c;
+ GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
+ grpc_schedule_on_exec_ctx);
+ grpc_transport_stream_op_batch *op =
+ grpc_make_transport_stream_op(&state->finish_batch);
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
- execute_op(exec_ctx, c, op);
+ execute_batch(exec_ctx, c, op, &state->start_batch);
}
static grpc_error *error_from_status(grpc_status_code status,
@@ -1431,6 +1477,18 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
+// The recv_message_ready callback used when sending a batch containing
+// a recv_message op down the filter stack. Yields the call combiner
+// before processing the received message.
+static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx,
+ void *bctlp,
+ grpc_error *error) {
+ batch_control *bctl = bctlp;
+ grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready");
+ receiving_stream_ready(exec_ctx, bctlp, error);
+}
+
static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
@@ -1537,6 +1595,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner,
+ "recv_initial_metadata_ready");
+
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
@@ -1590,7 +1651,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
-
+ grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete");
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
finish_batch_step(exec_ctx, bctl);
}
@@ -1610,9 +1672,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
int num_completion_callbacks_needed = 1;
grpc_call_error error = GRPC_CALL_OK;
- // sent_initial_metadata guards against variable reuse.
- grpc_metadata compression_md;
-
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
@@ -1660,7 +1719,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
goto done_with_error;
}
/* process compression level */
- memset(&compression_md, 0, sizeof(compression_md));
+ memset(&call->compression_md, 0, sizeof(call->compression_md));
size_t additional_metadata_count = 0;
grpc_compression_level effective_compression_level =
GRPC_COMPRESS_LEVEL_NONE;
@@ -1698,9 +1757,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
const grpc_stream_compression_algorithm calgo =
stream_compression_algorithm_for_level_locked(
call, effective_stream_compression_level);
- compression_md.key =
+ call->compression_md.key =
GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST;
- compression_md.value =
+ call->compression_md.value =
grpc_stream_compression_algorithm_slice(calgo);
} else {
const grpc_compression_algorithm calgo =
@@ -1708,8 +1767,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call, effective_compression_level);
/* the following will be picked up by the compress filter and used
* as the call's compression algorithm. */
- compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
- compression_md.value = grpc_compression_algorithm_slice(calgo);
+ call->compression_md.key =
+ GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
+ call->compression_md.value =
+ grpc_compression_algorithm_slice(calgo);
additional_metadata_count++;
}
}
@@ -1724,7 +1785,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (!prepare_application_metadata(
exec_ctx, call, (int)op->data.send_initial_metadata.count,
op->data.send_initial_metadata.metadata, 0, call->is_client,
- &compression_md, (int)additional_metadata_count)) {
+ &call->compression_md, (int)additional_metadata_count)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1736,6 +1797,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
op->flags;
+ if (call->is_client) {
+ stream_op_payload->send_initial_metadata.peer_string =
+ &call->peer_string;
+ }
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1868,6 +1933,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
+ if (!call->is_client) {
+ stream_op_payload->recv_initial_metadata.peer_string =
+ &call->peer_string;
+ }
num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_MESSAGE:
@@ -1884,8 +1953,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->recv_message = true;
call->receiving_buffer = op->data.recv_message.recv_message;
stream_op_payload->recv_message.recv_message = &call->receiving_stream;
- GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready,
- bctl, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
+ receiving_stream_ready_in_call_combiner, bctl,
+ grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
num_completion_callbacks_needed++;
@@ -1955,7 +2025,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
- execute_op(exec_ctx, call, stream_op);
+ execute_batch(exec_ctx, call, stream_op, &bctl->start_batch);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);