diff options
author | Craig Tiller <ctiller@google.com> | 2017-03-01 16:58:28 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-03-01 16:58:28 -0800 |
commit | ea54b8c0c0029c094a2ebb08ce04cbf3a02e24fa (patch) | |
tree | 436e63c67bfa55921f32344d26955a68a38ef2f5 /src/core/lib/surface | |
parent | 6517333d17e9c16e9f637320dc938b84dd248cc8 (diff) |
Start converting stream ops to a control + payload... since the bulky payload can be shared across concurrent ops (saving memory)
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.c | 61 |
1 files changed, 30 insertions, 31 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index cc57654ea4..1a8dd245d5 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -116,24 +116,19 @@ static received_status unpack_received_status(gpr_atm atm) { typedef struct batch_control { grpc_call *call; - grpc_cq_completion cq_completion; + union { + grpc_cq_completion cq_completion; + struct { + void *tag; + bool is_closure; + } notify_tag; + } completion_data; grpc_closure finish_batch; - void *notify_tag; gpr_refcount steps_to_complete; grpc_error *errors[MAX_ERRORS_PER_BATCH]; gpr_atm num_errors; - uint8_t send_initial_metadata; - uint8_t send_message; - uint8_t send_final_op; - uint8_t recv_initial_metadata; - uint8_t recv_message; - uint8_t recv_final_op; - uint8_t is_notify_tag_closure; - - /* TODO(ctiller): now that this is inlined, figure out how much of the above - state can be eliminated */ grpc_transport_stream_op op; } batch_control; @@ -166,6 +161,7 @@ struct grpc_call { bool has_initial_md_been_received; batch_control active_batches[MAX_CONCURRENT_BATCHES]; + grpc_transport_stream_op_payload stream_op_payload; /* first idx: is_receiving, second idx: is_trailing */ grpc_metadata_batch metadata_batch[2][2]; @@ -282,6 +278,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = args->server_transport_data == NULL; + call->stream_op_payload.context = call->context; grpc_slice path = grpc_empty_slice(); if (call->is_client) { GPR_ASSERT(args->add_initial_metadata_count < @@ -515,7 +512,6 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, GPR_TIMER_BEGIN("execute_op", 0); elem = CALL_ELEM_FROM_CALL(call, 0); - op->context = call->context; elem->filter->start_transport_stream_op(exec_ctx, elem, op); GPR_TIMER_END("execute_op", 0); } @@ -566,6 +562,7 @@ typedef struct termination_closure { grpc_closure closure; grpc_call *call; grpc_transport_stream_op op; + grpc_transport_stream_op_payload payload; } termination_closure; static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, @@ -579,7 +576,9 @@ static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { termination_closure *tc = tcp; memset(&tc->op, 0, sizeof(tc->op)); - tc->op.cancel_error = GRPC_ERROR_REF(error); + tc->op.payload = &tc->payload; + tc->op.cancel_stream = true; + tc->op.payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error); /* reuse closure to catch completion */ tc->op.on_complete = grpc_closure_init(&tc->closure, done_termination, tc, grpc_schedule_on_exec_ctx); @@ -1084,20 +1083,20 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&call->mu); - if (bctl->send_initial_metadata) { + if (bctl->op.send_initial_metadata) { grpc_metadata_batch_destroy( exec_ctx, &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); } - if (bctl->send_message) { + if (bctl->op.send_message) { call->sending_message = false; } - if (bctl->send_final_op) { + if (bctl->op.send_trailing_metadata) { grpc_metadata_batch_destroy( exec_ctx, &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } - if (bctl->recv_final_op) { + if (bctl->op.recv_trailing_metadata) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; recv_trailing_filter(exec_ctx, call, md); @@ -1131,15 +1130,15 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, } gpr_mu_unlock(&call->mu); - if (bctl->is_notify_tag_closure) { + if (bctl->completion_data.notify_tag.is_closure) { /* unrefs bctl->error */ bctl->call = NULL; - grpc_closure_run(exec_ctx, bctl->notify_tag, error); + grpc_closure_run(exec_ctx, bctl->completion_data.notify_tag.tag, error); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { /* unrefs bctl->error */ - grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, error, - finish_batch_completion, bctl, &bctl->cq_completion); + grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->completion_data.notify_tag.tag, error, + finish_batch_completion, bctl, &bctl->completion_data.cq_completion); } } @@ -1389,8 +1388,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (bctl == NULL) { return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; } - bctl->notify_tag = notify_tag; - bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); + bctl->completion_data.notify_tag.tag = notify_tag; + bctl->completion_data.notify_tag.is_closure = (uint8_t)(is_notify_tag_closure != 0); gpr_mu_lock(&call->mu); grpc_transport_stream_op *stream_op = &bctl->op; @@ -1448,8 +1447,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } - bctl->send_initial_metadata = 1; - call->sent_initial_metadata = 1; + bctl->op.send_initial_metadata = true; + call->sent_initial_metadata = true; if (!prepare_application_metadata( exec_ctx, call, (int)op->data.send_initial_metadata.count, op->data.send_initial_metadata.metadata, 0, call->is_client, @@ -1459,9 +1458,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } /* TODO(ctiller): just make these the same variable? */ call->metadata_batch[0][0].deadline = call->send_deadline; - stream_op->send_initial_metadata = + call->stream_op_payload.send_initial_metadata.send_initial_metadata = &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; - stream_op->send_initial_metadata_flags = op->flags; + call->stream_op_payload.send_initial_metadata.send_initial_metadata_flags = op->flags; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { @@ -1476,8 +1475,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - bctl->send_message = 1; - call->sending_message = 1; + bctl->op.send_message = true; + call->sending_message = true; grpc_slice_buffer_stream_init( &call->sending_stream, &op->data.send_message.send_message->data.raw.slice_buffer, @@ -1489,7 +1488,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_COMPRESS_NONE) { call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - stream_op->send_message = &call->sending_stream.base; + call->stream_op_payload.send_message.send_message = &call->sending_stream.base; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ |