From 83f88d90b9c61c94994ed00d03a6fa469359d559 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 21 Apr 2015 16:02:05 -0700 Subject: stuff --- src/core/surface/call.c | 178 +++++++++++++++++++++++++----------------------- 1 file changed, 91 insertions(+), 87 deletions(-) (limited to 'src/core/surface/call.c') diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 608442c008..709ca0b397 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -150,6 +150,9 @@ struct grpc_call { gpr_uint8 num_completed_requests; /* flag that we need to request more data */ gpr_uint8 need_more_data; + /* flags with bits corresponding to write states allowing us to determine + what was sent */ + gpr_uint8 last_send_contains; /* Active ioreqs. request_set and request_data contain one element per active ioreq @@ -214,6 +217,10 @@ struct grpc_call { size_t send_initial_metadata_count; gpr_timespec send_deadline; + grpc_stream_op_buffer send_ops; + grpc_stream_op_buffer recv_ops; + grpc_stream_state recv_state; + /* Data that the legacy api needs to track. To be deleted at some point soon */ legacy_state *legacy_state; @@ -234,9 +241,11 @@ struct grpc_call { } while (0) static void do_nothing(void *ignored, grpc_op_error also_ignored) {} -static send_action choose_send_action(grpc_call *call); -static void enact_send_action(grpc_call *call, send_action sa); static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); +static void call_on_done_recv(void *call, int success); +static void call_on_done_send(void *call, int success); +static int fill_send_ops(grpc_call *call, grpc_transport_op *op); +static void execute_op(grpc_call *call, grpc_transport_op *op); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data, @@ -359,20 +368,6 @@ static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) { return GRPC_CALL_OK; } -static void request_more_data(grpc_call *call) { - grpc_call_op op; - - /* call down */ - op.type = GRPC_REQUEST_DATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - op.bind_pollset = NULL; - - grpc_call_execute_op(call, &op); -} - static int is_op_live(grpc_call *call, grpc_ioreq_op op) { gpr_uint8 set = call->request_set[op]; reqinfo_master *master; @@ -384,16 +379,31 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) { static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } static void unlock(grpc_call *call) { - send_action sa = SEND_NOTHING; + grpc_transport_op op; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; int completing_requests = 0; - int need_more_data = - call->need_more_data && - (call->write_state >= WRITE_STATE_STARTED || !call->is_client); + int start_op = 0; int i; - if (need_more_data) { + memset(&op, 0, sizeof(op)); + + if (call->need_more_data && + (call->write_state >= WRITE_STATE_STARTED || !call->is_client)) { + op.recv_ops = &call->recv_ops; + op.recv_state = &call->recv_state; + op.on_done_recv = call_on_done_recv; + op.recv_user_data = call; call->need_more_data = 0; + grpc_call_internal_ref(call); + start_op = 1; + } + + if (!call->sending) { + if (fill_send_ops(call, &op)) { + call->sending = 1; + grpc_call_internal_ref(call); + start_op = 1; + } } if (!call->completing && call->num_completed_requests != 0) { @@ -405,22 +415,10 @@ static void unlock(grpc_call *call) { grpc_call_internal_ref(call); } - if (!call->sending) { - sa = choose_send_action(call); - if (sa != SEND_NOTHING) { - call->sending = 1; - grpc_call_internal_ref(call); - } - } - gpr_mu_unlock(&call->mu); - if (need_more_data) { - request_more_data(call); - } - - if (sa != SEND_NOTHING) { - enact_send_action(call, sa); + if (start_op) { + execute_op(call, &op); } if (completing_requests > 0) { @@ -577,6 +575,64 @@ static void finish_start_step(void *pc, grpc_op_error error) { error); } +static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, + grpc_metadata *metadata) { + size_t i; + grpc_mdelem_list out; + if (count == 0) { + out.head = out.tail = NULL; + return out; + } + for (i = 0; i < count; i++) { + grpc_metadata *md = &metadata[i]; + grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1]; + grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1]; + grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); + l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, + (const gpr_uint8 *)md->value, + md->value_length); + l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL; + l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL; + } + out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data); + out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data); + return out; +} + +static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { + grpc_ioreq_data data; + grpc_metadata_batch mdb; + size_t i; + GPR_ASSERT(op->send_ops == NULL); + + switch (call->write_state) { + case WRITE_STATE_INITIAL: + if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) { + break; + } + data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; + mdb.list = chain_metadata_from_app( + call, data.send_metadata.count, data.send_metadata.metadata); + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = call->send_deadline; + for (i = 0; i < call->send_initial_metadata_count; i++) { + grpc_metadata_batch_link_head(&mdb, + &call->send_initial_metadata[i]); + } + grpc_sopb_add_metadata(&call->send_ops, mdb); + op->send_ops = &call->send_ops; + op->bind_pollset = grpc_cq_pollset(call->cq); + call->last_send_contains |= 1 << WRITE_STATE_INITIAL; + /* fall through intended */ + case WRITE_STATE_STARTED: + if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { + data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; + grpc_sopb_add_message(data.send_message); +abort(); + } +} + static send_action choose_send_action(grpc_call *call) { switch (call->write_state) { case WRITE_STATE_INITIAL: @@ -614,33 +670,7 @@ static send_action choose_send_action(grpc_call *call) { return SEND_NOTHING; } -static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, - grpc_metadata *metadata) { - size_t i; - grpc_mdelem_list out; - if (count == 0) { - out.head = out.tail = NULL; - return out; - } - for (i = 0; i < count; i++) { - grpc_metadata *md = &metadata[i]; - grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1]; - grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1]; - grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; - assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); - l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, - (const gpr_uint8 *)md->value, - md->value_length); - l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL; - l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL; - } - out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data); - out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data); - return out; -} - static void enact_send_action(grpc_call *call, send_action sa) { - grpc_ioreq_data data; grpc_call_op op; size_t i; gpr_uint32 flags = 0; @@ -654,37 +684,11 @@ static void enact_send_action(grpc_call *call, send_action sa) { flags |= GRPC_WRITE_BUFFER_HINT; /* fallthrough */ case SEND_INITIAL_METADATA: - data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.data.metadata.list = chain_metadata_from_app( - call, data.send_metadata.count, data.send_metadata.metadata); - op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; - op.data.metadata.deadline = call->send_deadline; - for (i = 0; i < call->send_initial_metadata_count; i++) { - grpc_metadata_batch_link_head(&op.data.metadata, - &call->send_initial_metadata[i]); - } - call->send_initial_metadata_count = 0; - op.done_cb = finish_start_step; - op.user_data = call; - op.bind_pollset = grpc_cq_pollset(call->cq); - grpc_call_execute_op(call, &op); break; case SEND_BUFFERED_MESSAGE: flags |= GRPC_WRITE_BUFFER_HINT; /* fallthrough */ case SEND_MESSAGE: - data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; - op.type = GRPC_SEND_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.data.message = data.send_message; - op.done_cb = finish_write_step; - op.user_data = call; - op.bind_pollset = NULL; - grpc_call_execute_op(call, &op); break; case SEND_TRAILING_METADATA_AND_FINISH: /* send trailing metadata */ -- cgit v1.2.3