aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-24 13:03:49 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-24 13:03:49 -0700
commit2ea37fd2ce9046ccf2a0b89ba43c93d8fe80408a (patch)
tree4a388d50ce72e36dde07b30f08267493e27eb350
parentc1f7560ac27b6db4106115e5308f1a9124378a60 (diff)
Bug fixing
-rw-r--r--src/core/channel/channel_stack.h3
-rw-r--r--src/core/security/auth.c14
-rw-r--r--src/core/surface/call.c1
-rw-r--r--src/core/transport/chttp2_transport.c65
-rw-r--r--src/core/transport/transport.c9
-rw-r--r--src/core/transport/transport.h3
-rw-r--r--src/core/transport/transport_op_string.c4
7 files changed, 56 insertions, 43 deletions
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 94b12639fc..de0e4e4518 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -222,9 +222,6 @@ void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem, grpc_transport_op *op);
void grpc_call_element_send_cancel(grpc_call_element *cur_elem);
-void grpc_call_element_recv_status(grpc_call_element *cur_elem,
- grpc_status_code status,
- const char *message);
extern int grpc_trace_channel;
diff --git a/src/core/security/auth.c b/src/core/security/auth.c
index 4dbc25675b..b6a002d43c 100644
--- a/src/core/security/auth.c
+++ b/src/core/security/auth.c
@@ -67,11 +67,6 @@ typedef struct {
grpc_mdstr *status_key;
} channel_data;
-static void bubbleup_error(grpc_call_element *elem, const char *error_msg) {
- grpc_call_element_recv_status(elem, GRPC_STATUS_UNAUTHENTICATED, error_msg);
- grpc_call_element_send_cancel(elem);
-}
-
static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems,
size_t num_md,
grpc_credentials_status status) {
@@ -141,6 +136,7 @@ static void send_security_metadata(grpc_call_element *elem, grpc_transport_op *o
static void on_host_checked(void *user_data, grpc_security_status status) {
grpc_call_element *elem = (grpc_call_element *)user_data;
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
if (status == GRPC_SECURITY_OK) {
send_security_metadata(elem, &calld->op);
@@ -148,9 +144,9 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
char *error_msg;
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
grpc_mdstr_as_c_string(calld->host));
- bubbleup_error(elem, error_msg);
+ grpc_transport_op_add_cancellation(&calld->op, GRPC_STATUS_UNAUTHENTICATED, grpc_mdstr_from_string(chand->md_ctx, error_msg));
gpr_free(error_msg);
- grpc_transport_op_finish_with_failure(&calld->op);
+ grpc_call_next_op(elem, &calld->op);
}
}
@@ -199,9 +195,9 @@ static void auth_start_transport_op(grpc_call_element *elem,
gpr_asprintf(&error_msg,
"Invalid host %s set in :authority metadata.",
call_host);
- bubbleup_error(elem, error_msg);
+ grpc_transport_op_add_cancellation(&calld->op, GRPC_STATUS_UNAUTHENTICATED, grpc_mdstr_from_string(channeld->md_ctx, error_msg));
gpr_free(error_msg);
- grpc_transport_op_finish_with_failure(&calld->op);
+ grpc_call_next_op(elem, &calld->op);
}
return; /* early exit */
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 2f514465fc..8eee67bb83 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -424,7 +424,6 @@ static void unlock(grpc_call *call) {
memset(&op, 0, sizeof(op));
if (!call->receiving &&
- (call->write_state >= WRITE_STATE_STARTED || !call->is_client) &&
need_more_data(call)) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 237def41aa..7b50e285d0 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -361,7 +361,8 @@ static void cancel_stream_id(transport *t, gpr_uint32 id,
grpc_chttp2_error_code error_code, int send_rst);
static void cancel_stream(transport *t, stream *s,
grpc_status_code local_status,
- grpc_chttp2_error_code error_code, int send_rst);
+ grpc_chttp2_error_code error_code,
+ grpc_mdstr *optional_message, int send_rst);
static void finalize_cancellations(transport *t);
static stream *lookup_stream(transport *t, gpr_uint32 id);
static void remove_from_stream_map(transport *t, stream *s);
@@ -1011,6 +1012,12 @@ static void maybe_start_some_streams(transport *t) {
}
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ cancel_stream(
+ t, s, op->cancel_with_status,
+ grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), op->cancel_message, 1);
+ }
+
if (op->send_ops) {
GPR_ASSERT(s->outgoing_sopb == NULL);
s->send_done_closure.cb = op->on_done_send;
@@ -1037,26 +1044,16 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
GPR_ASSERT(s->incoming_sopb == NULL);
s->recv_done_closure.cb = op->on_done_recv;
s->recv_done_closure.user_data = op->recv_user_data;
- if (!s->cancelled) {
- s->incoming_sopb = op->recv_ops;
- s->incoming_sopb->nops = 0;
- s->publish_state = op->recv_state;
- maybe_finish_read(t, s);
- maybe_join_window_updates(t, s);
- } else {
- schedule_cb(t, s->recv_done_closure, 0);
- }
+ s->incoming_sopb = op->recv_ops;
+ s->incoming_sopb->nops = 0;
+ s->publish_state = op->recv_state;
+ maybe_finish_read(t, s);
+ maybe_join_window_updates(t, s);
}
if (op->bind_pollset) {
add_to_pollset_locked(t, op->bind_pollset);
}
-
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- cancel_stream(
- t, s, op->cancel_with_status,
- grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), 1);
- }
}
static void perform_op(grpc_transport *gt, grpc_stream *gs,
@@ -1123,6 +1120,7 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
+ grpc_mdstr *optional_message,
int send_rst) {
int had_outgoing;
char buffer[GPR_LTOA_MIN_BUFSIZE];
@@ -1147,14 +1145,18 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
add_incoming_metadata(
t, s,
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
- switch (local_status) {
- case GRPC_STATUS_CANCELLED:
- add_incoming_metadata(
- t, s, grpc_mdelem_from_strings(t->metadata_context,
- "grpc-message", "Cancelled"));
- break;
- default:
- break;
+ if (!optional_message) {
+ switch (local_status) {
+ case GRPC_STATUS_CANCELLED:
+ add_incoming_metadata(
+ t, s, grpc_mdelem_from_strings(t->metadata_context,
+ "grpc-message", "Cancelled"));
+ break;
+ default:
+ break;
+ }
+ } else {
+ add_incoming_metadata(t, s, grpc_mdelem_from_metadata_strings(t->metadata_context, grpc_mdstr_from_string(t->metadata_context, "grpc-message"), grpc_mdstr_ref(optional_message)));
}
add_metadata_batch(t, s);
maybe_finish_read(t, s);
@@ -1165,24 +1167,27 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
gpr_slice_buffer_add(&t->qbuf,
grpc_chttp2_rst_stream_create(id, error_code));
}
+ if (optional_message) {
+ grpc_mdstr_unref(optional_message);
+ }
}
static void cancel_stream_id(transport *t, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code, int send_rst) {
cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
- send_rst);
+ NULL, send_rst);
}
static void cancel_stream(transport *t, stream *s,
grpc_status_code local_status,
- grpc_chttp2_error_code error_code, int send_rst) {
- cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst);
+ grpc_chttp2_error_code error_code, grpc_mdstr *optional_message, int send_rst) {
+ cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message, send_rst);
}
static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
- GRPC_CHTTP2_INTERNAL_ERROR, 0);
+ GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0);
}
static void end_all_the_calls(transport *t) {
@@ -1285,7 +1290,7 @@ static int init_data_frame_parser(transport *t) {
case GRPC_CHTTP2_STREAM_ERROR:
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
GRPC_CHTTP2_INTERNAL_ERROR),
- GRPC_CHTTP2_INTERNAL_ERROR, 1);
+ GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1);
return init_skip_frame(t, 0);
case GRPC_CHTTP2_CONNECTION_ERROR:
drop_connection(t);
@@ -1598,7 +1603,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
GRPC_CHTTP2_FLOW_CONTROL_ERROR),
- GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1);
+ GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
} else {
s->outgoing_window += st.window_update;
/* if this window update makes outgoing ops writable again,
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 987dd4c918..cc9392177f 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -94,3 +94,12 @@ void grpc_transport_op_finish_with_failure(grpc_transport_op *op) {
op->on_done_recv(op->recv_user_data, 0);
}
}
+
+void grpc_transport_op_add_cancellation(grpc_transport_op *op, grpc_status_code status, grpc_mdstr *message) {
+ if (op->cancel_with_status == GRPC_STATUS_OK) {
+ op->cancel_with_status = status;
+ op->cancel_message = message;
+ } else if (message) {
+ grpc_mdstr_unref(message);
+ }
+}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 5036dfc2de..7c4bed1863 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -75,6 +75,7 @@ typedef struct grpc_transport_op {
grpc_pollset *bind_pollset;
grpc_status_code cancel_with_status;
+ grpc_mdstr *cancel_message;
} grpc_transport_op;
/* Callbacks made from the transport to the upper layers of grpc. */
@@ -134,6 +135,8 @@ void grpc_transport_destroy_stream(grpc_transport *transport,
void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
+void grpc_transport_op_add_cancellation(grpc_transport_op *op, grpc_status_code status, grpc_mdstr *message);
+
/* TODO(ctiller): remove this */
void grpc_transport_add_to_pollset(grpc_transport *transport,
grpc_pollset *pollset);
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
index 54f501f898..b9283b7abf 100644
--- a/src/core/transport/transport_op_string.c
+++ b/src/core/transport/transport_op_string.c
@@ -139,6 +139,10 @@ char *grpc_transport_op_string(grpc_transport_op *op) {
first = 0;
gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status);
gpr_strvec_add(&b, tmp);
+ if (op->cancel_message) {
+ gpr_asprintf(&tmp, ";msg='%s'", grpc_mdstr_as_c_string(op->cancel_message));
+ gpr_strvec_add(&b, tmp);
+ }
}
out = gpr_strvec_flatten(&b, NULL);