aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/call.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-04-04 08:29:43 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-04-04 08:29:43 -0700
commit61666f5557b343fb77d2a250dcdbede253a54d88 (patch)
tree817040b0fef4c1a9b44259d00a9e5cc6166c4ff2 /src/core/lib/surface/call.c
parent6696894e6a3e02170a560e0abcea1b4e948d583f (diff)
parent0df90f049ad8ee0da6e47bbe89650fcf9bee25e2 (diff)
Merge branch 'lazy-batches' into lazy-parent
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r--src/core/lib/surface/call.c50
1 files changed, 31 insertions, 19 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index b3c686c4d9..315c906baa 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -117,9 +117,21 @@ static received_status unpack_received_status(gpr_atm atm) {
typedef struct batch_control {
grpc_call *call;
+ /* Share memory for cq_completion and notify_tag as they are never needed
+ simultaneously. Each byte used in this data structure count as six bytes
+ per call, so any savings we can make are worthwhile,
+
+ We use notify_tag to determine whether or not to send notification to the
+ completion queue. Once we've made that determination, we can reuse the
+ memory for cq_completion. */
union {
grpc_cq_completion cq_completion;
struct {
+ /* Any given op indicates completion by either (a) calling a closure or
+ (b) sending a notification on the call's completion queue. If
+ \a is_closure is true, \a tag indicates a closure to be invoked;
+ otherwise, \a tag indicates the tag to be used in the notification to
+ be sent to the completion queue. */
void *tag;
bool is_closure;
} notify_tag;
@@ -130,7 +142,7 @@ typedef struct batch_control {
grpc_error *errors[MAX_ERRORS_PER_BATCH];
gpr_atm num_errors;
- grpc_transport_stream_op op;
+ grpc_transport_stream_op_batch op;
} batch_control;
typedef struct {
@@ -176,7 +188,7 @@ struct grpc_call {
bool has_initial_md_been_received;
batch_control *active_batches[MAX_CONCURRENT_BATCHES];
- grpc_transport_stream_op_payload stream_op_payload;
+ grpc_transport_stream_op_batch_payload stream_op_payload;
/* first idx: is_receiving, second idx: is_trailing */
grpc_metadata_batch metadata_batch[2][2];
@@ -241,7 +253,7 @@ int grpc_call_error_trace = 0;
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op *op);
+ grpc_transport_stream_op_batch *op);
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_status_code status,
const char *description);
@@ -562,12 +574,12 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
}
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
grpc_call_element *elem;
GPR_TIMER_BEGIN("execute_op", 0);
elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->start_transport_stream_op(exec_ctx, elem, op);
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
GPR_TIMER_END("execute_op", 0);
}
@@ -620,7 +632,7 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_error *error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
- grpc_transport_stream_op *op = grpc_make_transport_stream_op(
+ grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(
grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx));
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
@@ -1410,8 +1422,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->completion_data.notify_tag.is_closure =
(uint8_t)(is_notify_tag_closure != 0);
- grpc_transport_stream_op *stream_op = &bctl->op;
- grpc_transport_stream_op_payload *stream_op_payload =
+ grpc_transport_stream_op_batch *stream_op = &bctl->op;
+ grpc_transport_stream_op_batch_payload *stream_op_payload =
&call->stream_op_payload;
stream_op->covered_by_poller = true;
@@ -1526,7 +1538,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
goto done_with_error;
}
stream_op->send_trailing_metadata = true;
- call->sent_final_op = 1;
+ call->sent_final_op = true;
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
@@ -1550,7 +1562,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
goto done_with_error;
}
stream_op->send_trailing_metadata = true;
- call->sent_final_op = 1;
+ call->sent_final_op = true;
GPR_ASSERT(call->send_extra_metadata_count == 0);
call->send_extra_metadata_count = 1;
call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
@@ -1606,7 +1618,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
from server.c. In that case, it's coming from accept_stream, and in
that case we're not necessarily covered by a poller. */
stream_op->covered_by_poller = call->is_client;
- call->received_initial_metadata = 1;
+ call->received_initial_metadata = true;
call->buffered_metadata[0] =
op->data.recv_initial_metadata.recv_initial_metadata;
grpc_closure_init(&call->receiving_initial_metadata_ready,
@@ -1653,7 +1665,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- call->requested_final_op = 1;
+ call->requested_final_op = true;
call->buffered_metadata[1] =
op->data.recv_status_on_client.trailing_metadata;
call->final_op.client.status = op->data.recv_status_on_client.status;
@@ -1680,7 +1692,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- call->requested_final_op = 1;
+ call->requested_final_op = true;
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
stream_op->recv_trailing_metadata = true;
@@ -1713,25 +1725,25 @@ done:
done_with_error:
/* reverse any mutations that occured */
if (stream_op->send_initial_metadata) {
- call->sent_initial_metadata = 0;
+ call->sent_initial_metadata = false;
grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]);
}
if (stream_op->send_message) {
- call->sending_message = 0;
+ call->sending_message = false;
grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base);
}
if (stream_op->send_trailing_metadata) {
- call->sent_final_op = 0;
+ call->sent_final_op = false;
grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]);
}
if (stream_op->recv_initial_metadata) {
- call->received_initial_metadata = 0;
+ call->received_initial_metadata = false;
}
if (stream_op->recv_message) {
- call->receiving_message = 0;
+ call->receiving_message = false;
}
if (stream_op->recv_trailing_metadata) {
- call->requested_final_op = 0;
+ call->requested_final_op = false;
}
goto done;
}