aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-03-01 16:58:28 -0800
committerGravatar Craig Tiller <ctiller@google.com>2017-03-01 16:58:28 -0800
commitea54b8c0c0029c094a2ebb08ce04cbf3a02e24fa (patch)
tree436e63c67bfa55921f32344d26955a68a38ef2f5 /src/core/lib/surface
parent6517333d17e9c16e9f637320dc938b84dd248cc8 (diff)
Start converting stream ops to a control + payload... since the bulky payload can be shared across concurrent ops (saving memory)
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.c61
1 files changed, 30 insertions, 31 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index cc57654ea4..1a8dd245d5 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -116,24 +116,19 @@ static received_status unpack_received_status(gpr_atm atm) {
typedef struct batch_control {
grpc_call *call;
- grpc_cq_completion cq_completion;
+ union {
+ grpc_cq_completion cq_completion;
+ struct {
+ void *tag;
+ bool is_closure;
+ } notify_tag;
+ } completion_data;
grpc_closure finish_batch;
- void *notify_tag;
gpr_refcount steps_to_complete;
grpc_error *errors[MAX_ERRORS_PER_BATCH];
gpr_atm num_errors;
- uint8_t send_initial_metadata;
- uint8_t send_message;
- uint8_t send_final_op;
- uint8_t recv_initial_metadata;
- uint8_t recv_message;
- uint8_t recv_final_op;
- uint8_t is_notify_tag_closure;
-
- /* TODO(ctiller): now that this is inlined, figure out how much of the above
- state can be eliminated */
grpc_transport_stream_op op;
} batch_control;
@@ -166,6 +161,7 @@ struct grpc_call {
bool has_initial_md_been_received;
batch_control active_batches[MAX_CONCURRENT_BATCHES];
+ grpc_transport_stream_op_payload stream_op_payload;
/* first idx: is_receiving, second idx: is_trailing */
grpc_metadata_batch metadata_batch[2][2];
@@ -282,6 +278,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
call->is_client = args->server_transport_data == NULL;
+ call->stream_op_payload.context = call->context;
grpc_slice path = grpc_empty_slice();
if (call->is_client) {
GPR_ASSERT(args->add_initial_metadata_count <
@@ -515,7 +512,6 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
GPR_TIMER_BEGIN("execute_op", 0);
elem = CALL_ELEM_FROM_CALL(call, 0);
- op->context = call->context;
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
GPR_TIMER_END("execute_op", 0);
}
@@ -566,6 +562,7 @@ typedef struct termination_closure {
grpc_closure closure;
grpc_call *call;
grpc_transport_stream_op op;
+ grpc_transport_stream_op_payload payload;
} termination_closure;
static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
@@ -579,7 +576,9 @@ static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp,
grpc_error *error) {
termination_closure *tc = tcp;
memset(&tc->op, 0, sizeof(tc->op));
- tc->op.cancel_error = GRPC_ERROR_REF(error);
+ tc->op.payload = &tc->payload;
+ tc->op.cancel_stream = true;
+ tc->op.payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
/* reuse closure to catch completion */
tc->op.on_complete = grpc_closure_init(&tc->closure, done_termination, tc,
grpc_schedule_on_exec_ctx);
@@ -1084,20 +1083,20 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&call->mu);
- if (bctl->send_initial_metadata) {
+ if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
- if (bctl->send_message) {
+ if (bctl->op.send_message) {
call->sending_message = false;
}
- if (bctl->send_final_op) {
+ if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy(
exec_ctx,
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
- if (bctl->recv_final_op) {
+ if (bctl->op.recv_trailing_metadata) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
recv_trailing_filter(exec_ctx, call, md);
@@ -1131,15 +1130,15 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
}
gpr_mu_unlock(&call->mu);
- if (bctl->is_notify_tag_closure) {
+ if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs bctl->error */
bctl->call = NULL;
- grpc_closure_run(exec_ctx, bctl->notify_tag, error);
+ grpc_closure_run(exec_ctx, bctl->completion_data.notify_tag.tag, error);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
/* unrefs bctl->error */
- grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, error,
- finish_batch_completion, bctl, &bctl->cq_completion);
+ grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
+ finish_batch_completion, bctl, &bctl->completion_data.cq_completion);
}
}
@@ -1389,8 +1388,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (bctl == NULL) {
return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
}
- bctl->notify_tag = notify_tag;
- bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
+ bctl->completion_data.notify_tag.tag = notify_tag;
+ bctl->completion_data.notify_tag.is_closure = (uint8_t)(is_notify_tag_closure != 0);
gpr_mu_lock(&call->mu);
grpc_transport_stream_op *stream_op = &bctl->op;
@@ -1448,8 +1447,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
- bctl->send_initial_metadata = 1;
- call->sent_initial_metadata = 1;
+ bctl->op.send_initial_metadata = true;
+ call->sent_initial_metadata = true;
if (!prepare_application_metadata(
exec_ctx, call, (int)op->data.send_initial_metadata.count,
op->data.send_initial_metadata.metadata, 0, call->is_client,
@@ -1459,9 +1458,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
/* TODO(ctiller): just make these the same variable? */
call->metadata_batch[0][0].deadline = call->send_deadline;
- stream_op->send_initial_metadata =
+ call->stream_op_payload.send_initial_metadata.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
- stream_op->send_initial_metadata_flags = op->flags;
+ call->stream_op_payload.send_initial_metadata.send_initial_metadata_flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1476,8 +1475,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- bctl->send_message = 1;
- call->sending_message = 1;
+ bctl->op.send_message = true;
+ call->sending_message = true;
grpc_slice_buffer_stream_init(
&call->sending_stream,
&op->data.send_message.send_message->data.raw.slice_buffer,
@@ -1489,7 +1488,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_COMPRESS_NONE) {
call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
- stream_op->send_message = &call->sending_stream.base;
+ call->stream_op_payload.send_message.send_message = &call->sending_stream.base;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
/* Flag validation: currently allow no flags */