aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2_transport.c')
-rw-r--r--src/core/transport/chttp2_transport.c113
1 files changed, 113 insertions, 0 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 1ea4a82c16..c8c4207208 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -107,6 +107,11 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status);
+static void close_from_api(grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ grpc_status_code status,
+ gpr_slice *optional_message);
+
/** Add endpoint from this transport to pollset */
static void add_to_pollset_locked(grpc_chttp2_transport *t,
grpc_pollset *pollset);
@@ -602,10 +607,16 @@ static void perform_stream_op_locked(
cancel_from_api(transport_global, stream_global, op->cancel_with_status);
}
+ if (op->close_with_status != GRPC_STATUS_OK) {
+ close_from_api(transport_global, stream_global, op->close_with_status,
+ op->optional_close_message);
+ }
+
if (op->send_ops) {
GPR_ASSERT(stream_global->outgoing_sopb == NULL);
stream_global->send_done_closure = op->on_done_send;
if (!stream_global->cancelled) {
+ stream_global->written_anything = 1;
stream_global->outgoing_sopb = op->send_ops;
if (op->is_last_send &&
stream_global->write_state == GRPC_WRITE_STATE_OPEN) {
@@ -894,6 +905,108 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
stream_global);
}
+static void close_from_api(grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ grpc_status_code status,
+ gpr_slice *optional_message) {
+ gpr_slice hdr;
+ gpr_slice status_hdr;
+ gpr_slice message_pfx;
+ gpr_uint8 *p;
+ gpr_uint32 len = 0;
+
+ GPR_ASSERT(status >= 0 && (int)status < 100);
+
+ stream_global->cancelled = 1;
+ stream_global->cancelled_status = status;
+ GPR_ASSERT(stream_global->id != 0);
+ GPR_ASSERT(!stream_global->written_anything);
+
+ /* Hand roll a header block.
+ This is unnecessarily ugly - at some point we should find a more elegant
+ solution.
+ It's complicated by the fact that our send machinery would be dead by the
+ time we got around to sending this, so instead we ignore HPACK compression
+ and just write the uncompressed bytes onto the wire. */
+ status_hdr = gpr_slice_malloc(15 + (status >= 10));
+ p = GPR_SLICE_START_PTR(status_hdr);
+ *p++ = 0x40; /* literal header */
+ *p++ = 11; /* len(grpc-status) */
+ *p++ = 'g';
+ *p++ = 'r';
+ *p++ = 'p';
+ *p++ = 'c';
+ *p++ = '-';
+ *p++ = 's';
+ *p++ = 't';
+ *p++ = 'a';
+ *p++ = 't';
+ *p++ = 'u';
+ *p++ = 's';
+ if (status < 10) {
+ *p++ = 1;
+ *p++ = '0' + status;
+ } else {
+ *p++ = 2;
+ *p++ = '0' + (status / 10);
+ *p++ = '0' + (status % 10);
+ }
+ GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr));
+ len += GPR_SLICE_LENGTH(status_hdr);
+
+ if (optional_message) {
+ GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127);
+ message_pfx = gpr_slice_malloc(15);
+ p = GPR_SLICE_START_PTR(message_pfx);
+ *p++ = 0x40;
+ *p++ = 12; /* len(grpc-message) */
+ *p++ = 'g';
+ *p++ = 'r';
+ *p++ = 'p';
+ *p++ = 'c';
+ *p++ = '-';
+ *p++ = 'm';
+ *p++ = 'e';
+ *p++ = 's';
+ *p++ = 's';
+ *p++ = 'a';
+ *p++ = 'g';
+ *p++ = 'e';
+ *p++ = GPR_SLICE_LENGTH(*optional_message);
+ GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx));
+ len += GPR_SLICE_LENGTH(message_pfx);
+ len += GPR_SLICE_LENGTH(*optional_message);
+ }
+
+ hdr = gpr_slice_malloc(9);
+ p = GPR_SLICE_START_PTR(hdr);
+ *p++ = len >> 16;
+ *p++ = len >> 8;
+ *p++ = len;
+ *p++ = GRPC_CHTTP2_FRAME_HEADER;
+ *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
+ *p++ = stream_global->id >> 24;
+ *p++ = stream_global->id >> 16;
+ *p++ = stream_global->id >> 8;
+ *p++ = stream_global->id;
+ GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr));
+
+ gpr_slice_buffer_add(&transport_global->qbuf, hdr);
+ gpr_slice_buffer_add(&transport_global->qbuf, status_hdr);
+ if (optional_message) {
+ gpr_slice_buffer_add(&transport_global->qbuf, message_pfx);
+ gpr_slice_buffer_add(&transport_global->qbuf,
+ gpr_slice_ref(*optional_message));
+ }
+
+ gpr_slice_buffer_add(
+ &transport_global->qbuf,
+ grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR));
+
+ grpc_chttp2_list_add_read_write_state_changed(transport_global,
+ stream_global);
+}
+
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
void *user_data,
grpc_chttp2_stream_global *stream_global) {