aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-17 17:16:48 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-17 17:16:48 -0700
commitcec9eb9ed727dc7321380d63a9e48c8fc94f2fbe (patch)
tree58f4cb81b91740919b575ba3345a216fd6a7459c
parentc9a7f5cf168fc675fcf4359458677f32942da6d3 (diff)
Cancellation related fixes
-rw-r--r--src/core/transport/chttp2/internal.h3
-rw-r--r--src/core/transport/chttp2/parsing.c66
-rw-r--r--src/core/transport/chttp2_transport.c32
3 files changed, 33 insertions, 68 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 96390fe7ce..2b4dc5e6b8 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -401,6 +401,9 @@ typedef struct {
gpr_uint8 read_closed;
/** has this stream been cancelled? (boolean) */
gpr_uint8 cancelled;
+ grpc_status_code cancelled_status;
+ /** have we told the upper layer that this stream is cancelled? */
+ gpr_uint8 published_cancelled;
/** is this stream in the stream map? (boolean) */
gpr_uint8 in_stream_map;
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index f33b54f167..7276f8cafa 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -36,6 +36,7 @@
#include <string.h>
#include "src/core/transport/chttp2/http2_errors.h"
+#include "src/core/transport/chttp2/status_conversion.h"
#include "src/core/transport/chttp2/timeout_encoding.h"
#include <grpc/support/alloc.h>
@@ -66,6 +67,8 @@ void grpc_chttp2_prepare_to_read(
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_parsing *stream_parsing;
+ transport_parsing->next_stream_id = transport_global->next_stream_id;
+
/* update the parsing view of incoming window */
if (transport_parsing->incoming_window != transport_global->incoming_window) {
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
@@ -205,6 +208,10 @@ void grpc_chttp2_publish_reads(
}
if (stream_parsing->saw_rst_stream) {
stream_global->cancelled = 1;
+ stream_global->cancelled_status = grpc_chttp2_http2_error_to_grpc_status(stream_parsing->rst_stream_reason);
+ if (stream_parsing->rst_stream_reason == GRPC_CHTTP2_NO_ERROR) {
+ stream_global->published_cancelled = 1;
+ }
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
}
@@ -803,62 +810,3 @@ static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing,
abort();
return 0;
}
-
-#if 0
- if (st.end_of_stream) {
- transport_parsing->incoming_stream->read_closed = 1;
- maybe_finish_read(t, transport_parsing->incoming_stream, 1);
- }
- if (st.need_flush_reads) {
- maybe_finish_read(t, transport_parsing->incoming_stream, 1);
- }
- if (st.metadata_boundary) {
- add_metadata_batch(t, transport_parsing->incoming_stream);
- maybe_finish_read(t, transport_parsing->incoming_stream, 1);
- }
- if (st.ack_settings) {
- }
- if (st.send_ping_ack) {
- }
- if (st.goaway) {
- add_goaway(t, st.goaway_error, st.goaway_text);
- }
- if (st.rst_stream) {
- cancel_stream_id(
- t, transport_parsing->incoming_stream_id,
- grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason),
- st.rst_stream_reason, 0);
- }
- if (st.process_ping_reply) {
- for (i = 0; i < transport_parsing->ping_count; i++) {
- if (0 ==
- memcmp(transport_parsing->pings[i].id, transport_parsing->simple.ping.opaque_8bytes, 8)) {
- transport_parsing->pings[i].cb(transport_parsing->pings[i].user_data);
- memmove(&transport_parsing->pings[i], &transport_parsing->pings[i + 1],
- (transport_parsing->ping_count - i - 1) * sizeof(grpc_chttp2_outstanding_ping));
- transport_parsing->ping_count--;
- break;
- }
- }
- }
- if (st.initial_window_update) {
- for (i = 0; i < transport_parsing->stream_map.count; i++) {
- grpc_chttp2_stream *s = (grpc_chttp2_stream *)(transport_parsing->stream_map.values[i]);
- s->global.outgoing_window_update += st.initial_window_update;
- stream_list_join(t, s, NEW_OUTGOING_WINDOW);
- }
- }
- if (st.window_update) {
- if (transport_parsing->incoming_stream_id) {
- /* if there was a grpc_chttp2_stream id, this is for some grpc_chttp2_stream */
- grpc_chttp2_stream *s = lookup_stream(t, transport_parsing->incoming_stream_id);
- if (s) {
- s->global.outgoing_window_update += st.window_update;
- stream_list_join(t, s, NEW_OUTGOING_WINDOW);
- }
- } else {
- /* grpc_chttp2_transport level window update */
- transport_parsing->global.outgoing_window_update += st.window_update;
- }
- }
-#endif
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index f002b56381..902c15ad3e 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -740,10 +740,19 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
while (grpc_chttp2_list_pop_read_write_state_changed(transport_global,
&stream_global)) {
- if (!stream_global->publish_sopb) {
- gpr_log(GPR_DEBUG, "%s %d: skip rw update: no publish target",
- transport_global->is_client ? "CLI" : "SVR", stream_global->id);
- continue;
+ if (stream_global->cancelled) {
+ stream_global->write_state = WRITE_STATE_SENT_CLOSE;
+ stream_global->read_closed = 1;
+ if (!stream_global->published_cancelled) {
+ char buffer[GPR_LTOA_MIN_BUFSIZE];
+ gpr_ltoa(stream_global->cancelled_status, buffer);
+ grpc_chttp2_incoming_metadata_buffer_add(&stream_global->incoming_metadata,
+ grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
+ grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
+ &stream_global->incoming_metadata,
+ &stream_global->incoming_sopb);
+ stream_global->published_cancelled = 1;
+ }
}
if (stream_global->write_state == WRITE_STATE_SENT_CLOSE &&
stream_global->read_closed && stream_global->in_stream_map) {
@@ -758,6 +767,11 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
remove_stream(t, stream_global->id);
}
}
+ if (!stream_global->publish_sopb) {
+ gpr_log(GPR_DEBUG, "%s %d: skip rw update: no publish target",
+ transport_global->is_client ? "CLI" : "SVR", stream_global->id);
+ continue;
+ }
state = compute_state(
stream_global->write_state == WRITE_STATE_SENT_CLOSE,
stream_global->read_closed && !stream_global->in_stream_map);
@@ -786,15 +800,15 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status) {
stream_global->cancelled = 1;
- if (stream_global->in_stream_map) {
+ stream_global->cancelled_status = status;
+ if (stream_global->id != 0) {
gpr_slice_buffer_add(&transport_global->qbuf,
grpc_chttp2_rst_stream_create(
stream_global->id,
- grpc_chttp2_grpc_status_to_http2_status(status)));
- } else {
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
+ grpc_chttp2_grpc_status_to_http2_error(status)));
}
+ grpc_chttp2_list_add_read_write_state_changed(transport_global,
+ stream_global);
}
#if 0