aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/call.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-04 08:38:02 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-04 08:38:02 -0800
commit7bd9b99d970a2f34ab59e24b114266d93dc871e4 (patch)
treeea2a925f557c1ab7bdfb289e3e7fce87c03e1973 /src/core/surface/call.c
parentd6731628bcedbe150d4f7cd09656bd506979b00f (diff)
Autohint write buffering
If there's an operation already scheduled after the one we are about to start, hint that write buffering is desired.
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--src/core/surface/call.c42
1 files changed, 27 insertions, 15 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 8221401a70..e878e1a3f3 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -54,7 +54,9 @@ typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
typedef enum {
SEND_NOTHING,
SEND_INITIAL_METADATA,
+ SEND_BUFFERED_INITIAL_METADATA,
SEND_MESSAGE,
+ SEND_BUFFERED_MESSAGE,
SEND_TRAILING_METADATA_AND_FINISH,
SEND_FINISH
} send_action;
@@ -146,10 +148,10 @@ struct grpc_call {
/* Active ioreqs.
request_set and request_data contain one element per active ioreq
operation.
-
+
request_set[op] is an integer specifying a set of operations to which
the request belongs:
- - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending
+ - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending
completion, and the integer represents to which group of operations
the ioreq belongs. Each group is represented by one master, and the
integer in request_set is an index into masters to find the master
@@ -158,7 +160,7 @@ struct grpc_call {
started
- finally, if request_set[op] is REQSET_DONE, then the operation is
complete and unavailable to be started again
-
+
request_data[op] is the request data as supplied by the initiator of
a request, and is valid iff request_set[op] <= GRPC_IOREQ_OP_COUNT.
The set fields are as per the request type specified by op.
@@ -200,12 +202,12 @@ struct grpc_call {
/* Call refcount - to keep the call alive during asynchronous operations */
gpr_refcount internal_refcount;
- /* Data that the legacy api needs to track. To be deleted at some point
+ /* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state;
};
-#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
+#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
#define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
@@ -333,8 +335,8 @@ static void unlock(grpc_call *call) {
send_action sa = SEND_NOTHING;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int num_completed_requests = call->num_completed_requests;
- int need_more_data =
- call->need_more_data && !is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA);
+ int need_more_data = call->need_more_data &&
+ !is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA);
int i;
if (need_more_data) {
@@ -495,15 +497,18 @@ static void finish_start_step(void *pc, grpc_op_error error) {
static send_action choose_send_action(grpc_call *call) {
switch (call->write_state) {
case WRITE_STATE_INITIAL:
- if (call->request_set[GRPC_IOREQ_SEND_INITIAL_METADATA] !=
- REQSET_EMPTY) {
+ if (call->request_set[GRPC_IOREQ_SEND_INITIAL_METADATA] != REQSET_EMPTY) {
call->write_state = WRITE_STATE_STARTED;
- return SEND_INITIAL_METADATA;
+ return is_op_live(call, GRPC_IOREQ_SEND_MESSAGE) ||
+ is_op_live(call, GRPC_IOREQ_SEND_CLOSE)
+ ? SEND_BUFFERED_INITIAL_METADATA
+ : SEND_INITIAL_METADATA;
}
return SEND_NOTHING;
case WRITE_STATE_STARTED:
if (call->request_set[GRPC_IOREQ_SEND_MESSAGE] != REQSET_EMPTY) {
- return SEND_MESSAGE;
+ return is_op_live(call, GRPC_IOREQ_SEND_CLOSE) ? SEND_BUFFERED_MESSAGE
+ : SEND_MESSAGE;
}
if (call->request_set[GRPC_IOREQ_SEND_CLOSE] != REQSET_EMPTY) {
call->write_state = WRITE_STATE_WRITE_CLOSED;
@@ -525,7 +530,7 @@ static void send_metadata(grpc_call *call, grpc_mdelem *elem) {
grpc_call_op op;
op.type = GRPC_SEND_METADATA;
op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
+ op.flags = GRPC_WRITE_BUFFER_HINT;
op.data.metadata = elem;
op.done_cb = do_nothing;
op.user_data = NULL;
@@ -536,12 +541,16 @@ static void enact_send_action(grpc_call *call, send_action sa) {
grpc_ioreq_data data;
grpc_call_op op;
size_t i;
+ gpr_uint32 flags = 0;
char status_str[GPR_LTOA_MIN_BUFSIZE];
switch (sa) {
case SEND_NOTHING:
abort();
break;
+ case SEND_BUFFERED_INITIAL_METADATA:
+ flags |= GRPC_WRITE_BUFFER_HINT;
+ /* fallthrough */
case SEND_INITIAL_METADATA:
data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
for (i = 0; i < data.send_metadata.count; i++) {
@@ -553,17 +562,20 @@ static void enact_send_action(grpc_call *call, send_action sa) {
}
op.type = GRPC_SEND_START;
op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
+ op.flags = flags;
op.data.start.pollset = grpc_cq_pollset(call->cq);
op.done_cb = finish_start_step;
op.user_data = call;
grpc_call_execute_op(call, &op);
break;
+ case SEND_BUFFERED_MESSAGE:
+ flags |= GRPC_WRITE_BUFFER_HINT;
+ /* fallthrough */
case SEND_MESSAGE:
data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
op.type = GRPC_SEND_MESSAGE;
op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
+ op.flags = flags;
op.data.message = data.send_message;
op.done_cb = finish_write_step;
op.user_data = call;
@@ -854,7 +866,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
gpr_uint32 status;
void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
if (user_data) {
- status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
+ status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET;
} else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
GPR_SLICE_LENGTH(md->value->slice),