aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-01-29 15:06:42 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-01-29 15:06:42 -0800
commit928fbc8ed0588f016fa2dd71517f532b25d6eb3c (patch)
tree16491102dd69c72603b7930807a664c0936a39f4 /src
parent6875272b86629e108a527ac2e24e810d878c02c6 (diff)
Robust status overriding
Diffstat (limited to 'src')
-rw-r--r--src/core/surface/call.c154
1 files changed, 83 insertions, 71 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index be624bfe9b..6a1fa361de 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -183,9 +183,7 @@ legacy_state *get_legacy_state(grpc_call *call) {
return call->legacy_state;
}
-void grpc_call_internal_ref(grpc_call *c) {
- gpr_ref(&c->internal_refcount);
-}
+void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
static void destroy_call(grpc_call *c) {
int i;
@@ -213,12 +211,14 @@ void grpc_call_internal_unref(grpc_call *c) {
}
}
-static void set_status_code(grpc_call *call, status_source source, gpr_uint32 status) {
+static void set_status_code(grpc_call *call, status_source source,
+ gpr_uint32 status) {
call->status[source].set = 1;
call->status[source].code = status;
}
-static void set_status_details(grpc_call *call, status_source source, grpc_mdstr *status) {
+static void set_status_details(grpc_call *call, status_source source,
+ grpc_mdstr *status) {
if (call->status[source].details != NULL) {
grpc_mdstr_unref(call->status[source].details);
}
@@ -285,7 +285,8 @@ static void unlock(grpc_call *call) {
}
}
-static void get_final_status(grpc_call *call, grpc_status_code *code, const char **details) {
+static void get_final_status(grpc_call *call, grpc_status_code *code,
+ const char **details) {
int i;
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].set) {
@@ -317,9 +318,11 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
if (master->complete_mask == master->need_mask ||
status == GRPC_OP_ERROR) {
if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
- get_final_status(call,
- &call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status,
- &call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->details);
+ get_final_status(
+ call,
+ &call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status,
+ &call->requests[GRPC_IOREQ_RECV_STATUS]
+ .data.recv_status->details);
}
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
if (call->requests[i].master == master) {
@@ -426,11 +429,10 @@ static void enact_send_action(grpc_call *call, send_action sa) {
data = call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].data;
for (i = 0; i < data.send_metadata.count; i++) {
const grpc_metadata *md = &data.send_metadata.metadata[i];
- send_metadata(
- call,
- grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
- (const gpr_uint8 *)md->value,
- md->value_length));
+ send_metadata(call,
+ grpc_mdelem_from_string_and_buffer(
+ call->metadata_context, md->key,
+ (const gpr_uint8 *)md->value, md->value_length));
}
op.type = GRPC_SEND_START;
op.dir = GRPC_CALL_DOWN;
@@ -454,11 +456,10 @@ static void enact_send_action(grpc_call *call, send_action sa) {
data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data;
for (i = 0; i < data.send_metadata.count; i++) {
const grpc_metadata *md = &data.send_metadata.metadata[i];
- send_metadata(
- call,
- grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
- (const gpr_uint8 *)md->value,
- md->value_length));
+ send_metadata(call,
+ grpc_mdelem_from_string_and_buffer(
+ call->metadata_context, md->key,
+ (const gpr_uint8 *)md->value, md->value_length));
}
lock(call);
call->sending = 0;
@@ -478,13 +479,13 @@ static void enact_send_action(grpc_call *call, send_action sa) {
grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context, status_str)));
if (data.send_close.details) {
- send_metadata(
- call,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context,
- data.send_close.details)));
+ send_metadata(call,
+ grpc_mdelem_from_metadata_strings(
+ call->metadata_context,
+ grpc_mdstr_ref(
+ grpc_channel_get_message_string(call->channel)),
+ grpc_mdstr_from_string(call->metadata_context,
+ data.send_close.details)));
}
}
op.type = GRPC_SEND_FINISH;
@@ -556,44 +557,46 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
for (i = 0; i < nreqs; i++) {
op = reqs[i].op;
switch (op) {
- default:
- break;
- case GRPC_IOREQ_RECV_MESSAGES:
- data.recv_messages->count = 0;
- if (call->buffered_messages.count > 0 || call->read_closed) {
- SWAP(grpc_byte_buffer_array, *data.recv_messages,
- call->buffered_messages);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
- } else {
- call->need_more_data = 1;
- }
- break;
- case GRPC_IOREQ_SEND_MESSAGES:
- call->write_index = 0;
- break;
- case GRPC_IOREQ_SEND_CLOSE:
- if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) {
- requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE;
- }
- break;
- case GRPC_IOREQ_RECV_INITIAL_METADATA:
- data.recv_metadata->count = 0;
- if (call->buffered_initial_metadata.count > 0) {
- SWAP(grpc_metadata_array, *data.recv_metadata, call->buffered_initial_metadata);
- }
- if (call->got_initial_metadata) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
- }
- break;
- case GRPC_IOREQ_RECV_TRAILING_METADATA:
- data.recv_metadata->count = 0;
- if (call->buffered_trailing_metadata.count > 0) {
- SWAP(grpc_metadata_array, *data.recv_metadata, call->buffered_trailing_metadata);
- }
- if (call->read_closed) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
- }
- break;
+ default:
+ break;
+ case GRPC_IOREQ_RECV_MESSAGES:
+ data.recv_messages->count = 0;
+ if (call->buffered_messages.count > 0 || call->read_closed) {
+ SWAP(grpc_byte_buffer_array, *data.recv_messages,
+ call->buffered_messages);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
+ } else {
+ call->need_more_data = 1;
+ }
+ break;
+ case GRPC_IOREQ_SEND_MESSAGES:
+ call->write_index = 0;
+ break;
+ case GRPC_IOREQ_SEND_CLOSE:
+ if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) {
+ requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE;
+ }
+ break;
+ case GRPC_IOREQ_RECV_INITIAL_METADATA:
+ data.recv_metadata->count = 0;
+ if (call->buffered_initial_metadata.count > 0) {
+ SWAP(grpc_metadata_array, *data.recv_metadata,
+ call->buffered_initial_metadata);
+ }
+ if (call->got_initial_metadata) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
+ }
+ break;
+ case GRPC_IOREQ_RECV_TRAILING_METADATA:
+ data.recv_metadata->count = 0;
+ if (call->buffered_trailing_metadata.count > 0) {
+ SWAP(grpc_metadata_array, *data.recv_metadata,
+ call->buffered_trailing_metadata);
+ }
+ if (call->read_closed) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
+ }
+ break;
}
}
@@ -698,6 +701,18 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
return GRPC_CALL_OK;
}
+static void publish_failed_finished(grpc_call *call, grpc_status_code status,
+ const char *desc) {
+ grpc_status_code status_code;
+ const char *details;
+ set_status_code(call, STATUS_FROM_FAILED_OP, status);
+ set_status_details(call, STATUS_FROM_FAILED_OP,
+ grpc_mdstr_from_string(call->metadata_context, desc));
+ get_final_status(call, &status_code, &details);
+ grpc_cq_end_finished(call->cq, get_legacy_state(call)->finished_tag, call,
+ do_nothing, NULL, status_code, details, NULL, 0);
+}
+
static void finish_status(grpc_call *call, grpc_op_error status, void *tag) {
legacy_state *ls;
@@ -710,8 +725,7 @@ static void finish_status(grpc_call *call, grpc_op_error status, void *tag) {
ls->status_in.status, ls->status_in.details,
ls->trail_md_in.metadata, ls->trail_md_in.count);
} else {
- grpc_cq_end_finished(call->cq, tag, call, do_nothing, NULL,
- GRPC_STATUS_UNKNOWN, "Read status failed", NULL, 0);
+ publish_failed_finished(call, GRPC_STATUS_UNKNOWN, "Read status failed");
}
}
@@ -756,9 +770,8 @@ static void finish_send_metadata(grpc_call *call, grpc_op_error status,
ls = get_legacy_state(call);
grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call,
do_nothing, NULL, 0, NULL);
- grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
- GRPC_STATUS_UNKNOWN, "Failed to read initial metadata",
- NULL, 0);
+ publish_failed_finished(call, GRPC_STATUS_UNKNOWN,
+ "Failed to read initial metadata");
}
unlock(call);
}
@@ -828,8 +841,7 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call,
return err;
}
-void grpc_call_initial_metadata_complete(
- grpc_call_element *surface_element) {
+void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
grpc_call *call = grpc_call_from_top_element(surface_element);
lock(call);
call->got_initial_metadata = 1;