aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/call.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-01-31 08:51:54 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-01-31 08:51:54 -0800
commit1c1419011afa631ed25224d9d97ce7ce8a185880 (patch)
treee737e604e20f2695fb21f5469dbe9041f3252b88 /src/core/surface/call.c
parentb8a318acd9c2d7c289b3fda532c22d486173d9e7 (diff)
Get server status send working
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--src/core/surface/call.c117
1 files changed, 54 insertions, 63 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index e2e8fe23a5..a638979959 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -58,7 +58,7 @@ typedef struct {
/* input buffers */
grpc_metadata_array initial_md_in;
grpc_metadata_array trailing_md_in;
-
+
size_t details_capacity;
char *details;
grpc_status_code status;
@@ -76,8 +76,7 @@ typedef enum {
SEND_NOTHING,
SEND_INITIAL_METADATA,
SEND_MESSAGE,
- SEND_TRAILING_METADATA,
- SEND_STATUS,
+ SEND_TRAILING_METADATA_AND_FINISH,
SEND_FINISH
} send_action;
@@ -89,7 +88,7 @@ typedef struct {
/* See reqinfo.set below for a description */
#define REQSET_EMPTY 255
-#define REQSET_DONE 254
+#define REQSET_DONE 254
/* The state of an ioreq */
typedef struct reqinfo {
@@ -158,7 +157,7 @@ struct grpc_call {
legacy_state *legacy_state;
};
-#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
+#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
#define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
@@ -191,6 +190,7 @@ grpc_call *grpc_call_create(grpc_channel *channel,
}
if (call->is_client) {
call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set = REQSET_DONE;
+ call->requests[GRPC_IOREQ_SEND_STATUS].set = REQSET_DONE;
}
grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel);
@@ -341,7 +341,8 @@ static void get_final_status(grpc_call *call, grpc_recv_status_args args) {
gpr_slice details = call->status[i].details->slice;
size_t len = GPR_SLICE_LENGTH(details);
if (len + 1 > *args.details_capacity) {
- *args.details_capacity = GPR_MAX(len + 1, *args.details_capacity * 3 / 2);
+ *args.details_capacity =
+ GPR_MAX(len + 1, *args.details_capacity * 3 / 2);
*args.details = gpr_realloc(*args.details, *args.details_capacity);
}
memcpy(*args.details, GPR_SLICE_START_PTR(details), len);
@@ -374,12 +375,10 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
(op == GRPC_IOREQ_SEND_MESSAGE || op == GRPC_IOREQ_RECV_MESSAGE)
? REQSET_EMPTY
: REQSET_DONE;
- if (master->complete_mask == master->need_mask ||
- status == GRPC_OP_ERROR) {
+ if (master->complete_mask == master->need_mask || status == GRPC_OP_ERROR) {
if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
get_final_status(
- call,
- call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status);
+ call, call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status);
}
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
if (call->requests[i].set == op) {
@@ -397,7 +396,8 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
}
}
-static void finish_send_op(grpc_call *call, grpc_ioreq_op op, grpc_op_error error) {
+static void finish_send_op(grpc_call *call, grpc_ioreq_op op,
+ grpc_op_error error) {
lock(call);
finish_ioreq_op(call, op, error);
call->sending = 0;
@@ -434,30 +434,20 @@ static send_action choose_send_action(grpc_call *call) {
case REQSET_DONE:
break;
}
- switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set) {
- case REQSET_EMPTY:
- return SEND_NOTHING;
- default:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
- return SEND_TRAILING_METADATA;
- case REQSET_DONE:
- break;
- }
- switch (call->requests[GRPC_IOREQ_SEND_STATUS].set) {
- case REQSET_EMPTY:
- return SEND_NOTHING;
- default:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK);
- return SEND_STATUS;
- case REQSET_DONE:
- break;
- }
switch (call->requests[GRPC_IOREQ_SEND_CLOSE].set) {
case REQSET_EMPTY:
case REQSET_DONE:
return SEND_NOTHING;
default:
- return SEND_FINISH;
+ if (call->is_client) {
+ return SEND_FINISH;
+ } else if (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set !=
+ REQSET_EMPTY &&
+ call->requests[GRPC_IOREQ_SEND_STATUS].set != REQSET_EMPTY) {
+ return SEND_TRAILING_METADATA_AND_FINISH;
+ } else {
+ return SEND_NOTHING;
+ }
}
}
@@ -509,7 +499,8 @@ static void enact_send_action(grpc_call *call, send_action sa) {
op.user_data = call;
grpc_call_execute_op(call, &op);
break;
- case SEND_TRAILING_METADATA:
+ case SEND_TRAILING_METADATA_AND_FINISH:
+ /* send trailing metadata */
data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data;
for (i = 0; i < data.send_metadata.count; i++) {
const grpc_metadata *md = &data.send_metadata.metadata[i];
@@ -518,12 +509,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
call->metadata_context, md->key,
(const gpr_uint8 *)md->value, md->value_length));
}
- lock(call);
- call->sending = 0;
- unlock(call);
- grpc_call_internal_unref(call, 0);
- break;
- case SEND_STATUS:
+ /* send status */
/* TODO(ctiller): cache common status values */
data = call->requests[GRPC_IOREQ_SEND_CLOSE].data;
gpr_ltoa(data.send_status.code, status_str);
@@ -534,15 +520,15 @@ static void enact_send_action(grpc_call *call, send_action sa) {
grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context, status_str)));
if (data.send_status.details) {
- send_metadata(call,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(
- grpc_channel_get_message_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context,
- data.send_status.details)));
+ send_metadata(
+ call,
+ grpc_mdelem_from_metadata_strings(
+ call->metadata_context,
+ grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
+ grpc_mdstr_from_string(call->metadata_context,
+ data.send_status.details)));
}
- break;
+ /* fallthrough: see choose_send_action for details */
case SEND_FINISH:
op.type = GRPC_SEND_FINISH;
op.dir = GRPC_CALL_DOWN;
@@ -591,8 +577,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
return start_ioreq_error(call, have_ops,
GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
} else if (requests[op].set == REQSET_DONE) {
- return start_ioreq_error(call, have_ops,
- GRPC_CALL_ERROR_ALREADY_INVOKED);
+ return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED);
}
have_ops |= 1 << op;
data = reqs[i].data;
@@ -755,11 +740,14 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
lock(call);
ls = get_legacy_state(call);
- if (ls->md_out_count[ls->md_out_buffer] == ls->md_out_capacity[ls->md_out_buffer]) {
+ if (ls->md_out_count[ls->md_out_buffer] ==
+ ls->md_out_capacity[ls->md_out_buffer]) {
ls->md_out_capacity[ls->md_out_buffer] =
- GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2, ls->md_out_capacity[ls->md_out_buffer] + 8);
- ls->md_out[ls->md_out_buffer] =
- gpr_realloc(ls->md_out[ls->md_out_buffer], sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]);
+ GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2,
+ ls->md_out_capacity[ls->md_out_buffer] + 8);
+ ls->md_out[ls->md_out_buffer] = gpr_realloc(
+ ls->md_out[ls->md_out_buffer],
+ sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]);
}
mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++];
mdout->key = gpr_strdup(metadata->key);
@@ -776,8 +764,8 @@ static void maybe_finish_legacy(grpc_call *call) {
legacy_state *ls = get_legacy_state(call);
if (ls->got_status) {
grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
- ls->status, ls->details,
- ls->trailing_md_in.metadata, ls->trailing_md_in.count);
+ ls->status, ls->details, ls->trailing_md_in.metadata,
+ ls->trailing_md_in.count);
}
}
@@ -800,7 +788,8 @@ static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
ls = get_legacy_state(call);
if (status == GRPC_OP_OK) {
grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL,
- ls->initial_md_in.count, ls->initial_md_in.metadata);
+ ls->initial_md_in.count,
+ ls->initial_md_in.metadata);
} else {
grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0,
@@ -1000,11 +989,12 @@ grpc_call_error grpc_call_start_write_status(grpc_call *call,
reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA;
reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
- reqs[1].op = GRPC_IOREQ_SEND_CLOSE;
+ reqs[1].op = GRPC_IOREQ_SEND_STATUS;
reqs[1].data.send_status.code = status;
/* MEMLEAK */
reqs[1].data.send_status.details = gpr_strdup(details);
- err = start_ioreq(call, reqs, 2, finish_finish, tag);
+ reqs[2].op = GRPC_IOREQ_SEND_CLOSE;
+ err = start_ioreq(call, reqs, 3, finish_finish, tag);
unlock(call);
return err;
@@ -1077,7 +1067,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
gpr_uint32 status;
void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
if (user_data) {
- status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
+ status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET;
} else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
GPR_SLICE_LENGTH(md->value->slice),
@@ -1119,16 +1109,17 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
grpc_mdelem_unref(md);
} else {
if (!call->got_initial_metadata) {
- dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set < GRPC_IOREQ_OP_COUNT
+ dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].set <
+ GRPC_IOREQ_OP_COUNT
? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
.data.recv_metadata
: &call->buffered_initial_metadata;
} else {
- dest =
- call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set < GRPC_IOREQ_OP_COUNT
- ? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA]
- .data.recv_metadata
- : &call->buffered_trailing_metadata;
+ dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].set <
+ GRPC_IOREQ_OP_COUNT
+ ? call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA]
+ .data.recv_metadata
+ : &call->buffered_trailing_metadata;
}
if (dest->count == dest->capacity) {
dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);