aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-21 16:02:05 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-21 16:02:05 -0700
commit83f88d90b9c61c94994ed00d03a6fa469359d559 (patch)
treef1320aa3cda8b54961de246437c1ea980459750c /src/core/surface
parent65582323ad6b934b4cbb3a4632bb4c985acc1bdc (diff)
stuff
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c178
-rw-r--r--src/core/surface/call.h1
2 files changed, 91 insertions, 88 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 608442c008..709ca0b397 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -150,6 +150,9 @@ struct grpc_call {
gpr_uint8 num_completed_requests;
/* flag that we need to request more data */
gpr_uint8 need_more_data;
+ /* flags with bits corresponding to write states allowing us to determine
+ what was sent */
+ gpr_uint8 last_send_contains;
/* Active ioreqs.
request_set and request_data contain one element per active ioreq
@@ -214,6 +217,10 @@ struct grpc_call {
size_t send_initial_metadata_count;
gpr_timespec send_deadline;
+ grpc_stream_op_buffer send_ops;
+ grpc_stream_op_buffer recv_ops;
+ grpc_stream_state recv_state;
+
/* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state;
@@ -234,9 +241,11 @@ struct grpc_call {
} while (0)
static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
-static send_action choose_send_action(grpc_call *call);
-static void enact_send_action(grpc_call *call, send_action sa);
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
+static void call_on_done_recv(void *call, int success);
+static void call_on_done_send(void *call, int success);
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
+static void execute_op(grpc_call *call, grpc_transport_op *op);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data,
@@ -359,20 +368,6 @@ static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
return GRPC_CALL_OK;
}
-static void request_more_data(grpc_call *call) {
- grpc_call_op op;
-
- /* call down */
- op.type = GRPC_REQUEST_DATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.bind_pollset = NULL;
-
- grpc_call_execute_op(call, &op);
-}
-
static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
gpr_uint8 set = call->request_set[op];
reqinfo_master *master;
@@ -384,16 +379,31 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static void unlock(grpc_call *call) {
- send_action sa = SEND_NOTHING;
+ grpc_transport_op op;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int completing_requests = 0;
- int need_more_data =
- call->need_more_data &&
- (call->write_state >= WRITE_STATE_STARTED || !call->is_client);
+ int start_op = 0;
int i;
- if (need_more_data) {
+ memset(&op, 0, sizeof(op));
+
+ if (call->need_more_data &&
+ (call->write_state >= WRITE_STATE_STARTED || !call->is_client)) {
+ op.recv_ops = &call->recv_ops;
+ op.recv_state = &call->recv_state;
+ op.on_done_recv = call_on_done_recv;
+ op.recv_user_data = call;
call->need_more_data = 0;
+ grpc_call_internal_ref(call);
+ start_op = 1;
+ }
+
+ if (!call->sending) {
+ if (fill_send_ops(call, &op)) {
+ call->sending = 1;
+ grpc_call_internal_ref(call);
+ start_op = 1;
+ }
}
if (!call->completing && call->num_completed_requests != 0) {
@@ -405,22 +415,10 @@ static void unlock(grpc_call *call) {
grpc_call_internal_ref(call);
}
- if (!call->sending) {
- sa = choose_send_action(call);
- if (sa != SEND_NOTHING) {
- call->sending = 1;
- grpc_call_internal_ref(call);
- }
- }
-
gpr_mu_unlock(&call->mu);
- if (need_more_data) {
- request_more_data(call);
- }
-
- if (sa != SEND_NOTHING) {
- enact_send_action(call, sa);
+ if (start_op) {
+ execute_op(call, &op);
}
if (completing_requests > 0) {
@@ -577,6 +575,64 @@ static void finish_start_step(void *pc, grpc_op_error error) {
error);
}
+static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
+ grpc_metadata *metadata) {
+ size_t i;
+ grpc_mdelem_list out;
+ if (count == 0) {
+ out.head = out.tail = NULL;
+ return out;
+ }
+ for (i = 0; i < count; i++) {
+ grpc_metadata *md = &metadata[i];
+ grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
+ grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1];
+ grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
+ l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
+ (const gpr_uint8 *)md->value,
+ md->value_length);
+ l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
+ l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
+ }
+ out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
+ out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
+ return out;
+}
+
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
+ grpc_ioreq_data data;
+ grpc_metadata_batch mdb;
+ size_t i;
+ GPR_ASSERT(op->send_ops == NULL);
+
+ switch (call->write_state) {
+ case WRITE_STATE_INITIAL:
+ if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
+ break;
+ }
+ data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
+ mdb.list = chain_metadata_from_app(
+ call, data.send_metadata.count, data.send_metadata.metadata);
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = call->send_deadline;
+ for (i = 0; i < call->send_initial_metadata_count; i++) {
+ grpc_metadata_batch_link_head(&mdb,
+ &call->send_initial_metadata[i]);
+ }
+ grpc_sopb_add_metadata(&call->send_ops, mdb);
+ op->send_ops = &call->send_ops;
+ op->bind_pollset = grpc_cq_pollset(call->cq);
+ call->last_send_contains |= 1 << WRITE_STATE_INITIAL;
+ /* fall through intended */
+ case WRITE_STATE_STARTED:
+ if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
+ data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+ grpc_sopb_add_message(data.send_message);
+abort();
+ }
+}
+
static send_action choose_send_action(grpc_call *call) {
switch (call->write_state) {
case WRITE_STATE_INITIAL:
@@ -614,33 +670,7 @@ static send_action choose_send_action(grpc_call *call) {
return SEND_NOTHING;
}
-static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
- grpc_metadata *metadata) {
- size_t i;
- grpc_mdelem_list out;
- if (count == 0) {
- out.head = out.tail = NULL;
- return out;
- }
- for (i = 0; i < count; i++) {
- grpc_metadata *md = &metadata[i];
- grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
- grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1];
- grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
- assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
- l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
- (const gpr_uint8 *)md->value,
- md->value_length);
- l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
- l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
- }
- out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
- out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
- return out;
-}
-
static void enact_send_action(grpc_call *call, send_action sa) {
- grpc_ioreq_data data;
grpc_call_op op;
size_t i;
gpr_uint32 flags = 0;
@@ -654,37 +684,11 @@ static void enact_send_action(grpc_call *call, send_action sa) {
flags |= GRPC_WRITE_BUFFER_HINT;
/* fallthrough */
case SEND_INITIAL_METADATA:
- data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.metadata.list = chain_metadata_from_app(
- call, data.send_metadata.count, data.send_metadata.metadata);
- op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
- op.data.metadata.deadline = call->send_deadline;
- for (i = 0; i < call->send_initial_metadata_count; i++) {
- grpc_metadata_batch_link_head(&op.data.metadata,
- &call->send_initial_metadata[i]);
- }
- call->send_initial_metadata_count = 0;
- op.done_cb = finish_start_step;
- op.user_data = call;
- op.bind_pollset = grpc_cq_pollset(call->cq);
- grpc_call_execute_op(call, &op);
break;
case SEND_BUFFERED_MESSAGE:
flags |= GRPC_WRITE_BUFFER_HINT;
/* fallthrough */
case SEND_MESSAGE:
- data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
- op.type = GRPC_SEND_MESSAGE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.message = data.send_message;
- op.done_cb = finish_write_step;
- op.user_data = call;
- op.bind_pollset = NULL;
- grpc_call_execute_op(call, &op);
break;
case SEND_TRAILING_METADATA_AND_FINISH:
/* send trailing metadata */
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index f8d0915349..358e5560a3 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -106,7 +106,6 @@ void grpc_call_recv_message(grpc_call_element *surface_element,
void grpc_call_read_closed(grpc_call_element *surface_element);
void grpc_call_stream_closed(grpc_call_element *surface_element);
-void grpc_call_execute_op(grpc_call *call, grpc_call_op *op);
grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data);