aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/call.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-08-31 12:23:53 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-08-31 12:23:53 -0700
commitccad38227f63797318d7cffcba8a2df783394ccd (patch)
tree64632504188bf906796763f585c11055098aab58 /src/core/lib/surface/call.c
parent6e739745e298bf205eb2553d3a3c8a3eed06fff1 (diff)
parent451b92a754900a800a7728a3d63462058026ac2c (diff)
Merge github.com:grpc/grpc into stats
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r--src/core/lib/surface/call.c148
1 files changed, 42 insertions, 106 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 03eaaf99ac..e68c201134 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -122,7 +122,6 @@ typedef struct batch_control {
bool is_closure;
} notify_tag;
} completion_data;
- grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
@@ -152,7 +151,6 @@ typedef struct {
struct grpc_call {
gpr_refcount ext_ref;
gpr_arena *arena;
- grpc_call_combiner call_combiner;
grpc_completion_queue *cq;
grpc_polling_entity pollent;
grpc_channel *channel;
@@ -186,11 +184,6 @@ struct grpc_call {
Element 0 is initial metadata, element 1 is trailing metadata. */
grpc_metadata_array *buffered_metadata[2];
- grpc_metadata compression_md;
-
- // A char* indicating the peer name.
- gpr_atm peer_string;
-
/* Packed received call statuses from various sources */
gpr_atm status[STATUS_SOURCE_COUNT];
@@ -269,9 +262,8 @@ grpc_tracer_flag grpc_compression_trace =
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *op,
- grpc_closure *start_batch_closure);
+static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *op);
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_status_code status,
const char *description);
@@ -336,7 +328,6 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
sizeof(grpc_call) + channel_stack->call_stack_size);
gpr_ref_init(&call->ext_ref, 1);
call->arena = arena;
- grpc_call_combiner_init(&call->call_combiner);
*out_call = call;
call->channel = args->channel;
call->cq = args->cq;
@@ -445,8 +436,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
.path = path,
.start_time = call->start_time,
.deadline = send_deadline,
- .arena = call->arena,
- .call_combiner = &call->call_combiner};
+ .arena = call->arena};
add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
destroy_call, call, &call_args));
if (error != GRPC_ERROR_NONE) {
@@ -513,8 +503,6 @@ static void release_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
grpc_call *c = call;
grpc_channel *channel = c->channel;
- grpc_call_combiner_destroy(&c->call_combiner);
- gpr_free((char *)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
}
@@ -614,37 +602,30 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
return GRPC_CALL_OK;
}
-// This is called via the call combiner to start sending a batch down
-// the filter stack.
-static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *ignored) {
- grpc_transport_stream_op_batch *batch = arg;
- grpc_call *call = batch->handler_private.extra_arg;
- GPR_TIMER_BEGIN("execute_batch", 0);
- grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
- GPR_TIMER_END("execute_batch", 0);
-}
-
-// start_batch_closure points to a caller-allocated closure to be used
-// for entering the call combiner.
-static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *batch,
- grpc_closure *start_batch_closure) {
- batch->handler_private.extra_arg = call;
- GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure,
- GRPC_ERROR_NONE, "executing batch");
+static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *op) {
+ grpc_call_element *elem;
+
+ GPR_TIMER_BEGIN("execute_op", 0);
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
+ GPR_TIMER_END("execute_op", 0);
}
char *grpc_call_get_peer(grpc_call *call) {
- char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string);
- if (peer_string != NULL) return gpr_strdup(peer_string);
- peer_string = grpc_channel_get_target(call->channel);
- if (peer_string != NULL) return peer_string;
- return gpr_strdup("unknown");
+ grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ char *result;
+ GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
+ result = elem->filter->get_peer(&exec_ctx, elem);
+ if (result == NULL) {
+ result = grpc_channel_get_target(call->channel);
+ }
+ if (result == NULL) {
+ result = gpr_strdup("unknown");
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+ return result;
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
@@ -671,41 +652,20 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return GRPC_CALL_OK;
}
-typedef struct {
- grpc_call *call;
- grpc_closure start_batch;
- grpc_closure finish_batch;
-} cancel_state;
-
-// The on_complete callback used when sending a cancel_stream batch down
-// the filter stack. Yields the call combiner when the batch is done.
-static void done_termination(grpc_exec_ctx *exec_ctx, void *arg,
+static void done_termination(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
- cancel_state *state = (cancel_state *)arg;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner,
- "on_complete for cancel_stream op");
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination");
- gpr_free(state);
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination");
}
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_error *error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
- // Inform the call combiner of the cancellation, so that it can cancel
- // any in-flight asynchronous actions that may be holding the call
- // combiner. This ensures that the cancel_stream batch can be sent
- // down the filter stack in a timely manner.
- grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error));
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
- cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state));
- state->call = c;
- GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
- grpc_schedule_on_exec_ctx);
- grpc_transport_stream_op_batch *op =
- grpc_make_transport_stream_op(&state->finish_batch);
+ grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(
+ GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx));
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
- execute_batch(exec_ctx, c, op, &state->start_batch);
+ execute_op(exec_ctx, c, op);
}
static grpc_error *error_from_status(grpc_status_code status,
@@ -1471,18 +1431,6 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
-// The recv_message_ready callback used when sending a batch containing
-// a recv_message op down the filter stack. Yields the call combiner
-// before processing the received message.
-static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx,
- void *bctlp,
- grpc_error *error) {
- batch_control *bctl = bctlp;
- grpc_call *call = bctl->call;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready");
- receiving_stream_ready(exec_ctx, bctlp, error);
-}
-
static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
@@ -1589,9 +1537,6 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner,
- "recv_initial_metadata_ready");
-
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
@@ -1645,8 +1590,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
- grpc_call *call = bctl->call;
- GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete");
+
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
finish_batch_step(exec_ctx, bctl);
}
@@ -1666,6 +1610,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
int num_completion_callbacks_needed = 1;
grpc_call_error error = GRPC_CALL_OK;
+ // sent_initial_metadata guards against variable reuse.
+ grpc_metadata compression_md;
+
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
@@ -1713,7 +1660,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
goto done_with_error;
}
/* process compression level */
- memset(&call->compression_md, 0, sizeof(call->compression_md));
+ memset(&compression_md, 0, sizeof(compression_md));
size_t additional_metadata_count = 0;
grpc_compression_level effective_compression_level =
GRPC_COMPRESS_LEVEL_NONE;
@@ -1751,9 +1698,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
const grpc_stream_compression_algorithm calgo =
stream_compression_algorithm_for_level_locked(
call, effective_stream_compression_level);
- call->compression_md.key =
+ compression_md.key =
GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST;
- call->compression_md.value =
+ compression_md.value =
grpc_stream_compression_algorithm_slice(calgo);
} else {
const grpc_compression_algorithm calgo =
@@ -1761,10 +1708,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call, effective_compression_level);
/* the following will be picked up by the compress filter and used
* as the call's compression algorithm. */
- call->compression_md.key =
- GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
- call->compression_md.value =
- grpc_compression_algorithm_slice(calgo);
+ compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
+ compression_md.value = grpc_compression_algorithm_slice(calgo);
additional_metadata_count++;
}
}
@@ -1779,7 +1724,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (!prepare_application_metadata(
exec_ctx, call, (int)op->data.send_initial_metadata.count,
op->data.send_initial_metadata.metadata, 0, call->is_client,
- &call->compression_md, (int)additional_metadata_count)) {
+ &compression_md, (int)additional_metadata_count)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1791,10 +1736,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
op->flags;
- if (call->is_client) {
- stream_op_payload->send_initial_metadata.peer_string =
- &call->peer_string;
- }
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1927,10 +1868,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
- if (!call->is_client) {
- stream_op_payload->recv_initial_metadata.peer_string =
- &call->peer_string;
- }
num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_MESSAGE:
@@ -1947,9 +1884,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->recv_message = true;
call->receiving_buffer = op->data.recv_message.recv_message;
stream_op_payload->recv_message.recv_message = &call->receiving_stream;
- GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
- receiving_stream_ready_in_call_combiner, bctl,
- grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready,
+ bctl, grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
num_completion_callbacks_needed++;
@@ -2019,7 +1955,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
- execute_batch(exec_ctx, call, stream_op, &bctl->start_batch);
+ execute_op(exec_ctx, call, stream_op);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);