aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport')
-rw-r--r--src/core/transport/chttp2/internal.h2
-rw-r--r--src/core/transport/chttp2_transport.c113
-rw-r--r--src/core/transport/metadata.c2
-rw-r--r--src/core/transport/stream_op.h2
-rw-r--r--src/core/transport/transport.c52
-rw-r--r--src/core/transport/transport.h13
6 files changed, 177 insertions, 7 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index cb428f8e3c..42cf0ecd5b 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -388,6 +388,8 @@ typedef struct {
gpr_uint8 in_stream_map;
/** bitmask of GRPC_CHTTP2_WRITING_xxx above */
gpr_uint8 writing_now;
+ /** has anything been written to this stream? */
+ gpr_uint8 written_anything;
/** stream state already published to the upper layer */
grpc_stream_state published_state;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 6ba144faa4..a9f91b64d5 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -107,6 +107,11 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status);
+static void close_from_api(grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ grpc_status_code status,
+ gpr_slice *optional_message);
+
/** Add endpoint from this transport to pollset */
static void add_to_pollset_locked(grpc_chttp2_transport *t,
grpc_pollset *pollset);
@@ -602,10 +607,16 @@ static void perform_stream_op_locked(
cancel_from_api(transport_global, stream_global, op->cancel_with_status);
}
+ if (op->close_with_status != GRPC_STATUS_OK) {
+ close_from_api(transport_global, stream_global, op->close_with_status,
+ op->optional_close_message);
+ }
+
if (op->send_ops) {
GPR_ASSERT(stream_global->outgoing_sopb == NULL);
stream_global->send_done_closure = op->on_done_send;
if (!stream_global->cancelled) {
+ stream_global->written_anything = 1;
stream_global->outgoing_sopb = op->send_ops;
if (op->is_last_send &&
stream_global->write_state == GRPC_WRITE_STATE_OPEN) {
@@ -900,6 +911,108 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
stream_global);
}
+static void close_from_api(grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ grpc_status_code status,
+ gpr_slice *optional_message) {
+ gpr_slice hdr;
+ gpr_slice status_hdr;
+ gpr_slice message_pfx;
+ gpr_uint8 *p;
+ gpr_uint32 len = 0;
+
+ GPR_ASSERT(status >= 0 && (int)status < 100);
+
+ stream_global->cancelled = 1;
+ stream_global->cancelled_status = status;
+ GPR_ASSERT(stream_global->id != 0);
+ GPR_ASSERT(!stream_global->written_anything);
+
+ /* Hand roll a header block.
+ This is unnecessarily ugly - at some point we should find a more elegant
+ solution.
+ It's complicated by the fact that our send machinery would be dead by the
+ time we got around to sending this, so instead we ignore HPACK compression
+ and just write the uncompressed bytes onto the wire. */
+ status_hdr = gpr_slice_malloc(15 + (status >= 10));
+ p = GPR_SLICE_START_PTR(status_hdr);
+ *p++ = 0x40; /* literal header */
+ *p++ = 11; /* len(grpc-status) */
+ *p++ = 'g';
+ *p++ = 'r';
+ *p++ = 'p';
+ *p++ = 'c';
+ *p++ = '-';
+ *p++ = 's';
+ *p++ = 't';
+ *p++ = 'a';
+ *p++ = 't';
+ *p++ = 'u';
+ *p++ = 's';
+ if (status < 10) {
+ *p++ = 1;
+ *p++ = '0' + status;
+ } else {
+ *p++ = 2;
+ *p++ = '0' + (status / 10);
+ *p++ = '0' + (status % 10);
+ }
+ GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr));
+ len += GPR_SLICE_LENGTH(status_hdr);
+
+ if (optional_message) {
+ GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127);
+ message_pfx = gpr_slice_malloc(15);
+ p = GPR_SLICE_START_PTR(message_pfx);
+ *p++ = 0x40;
+ *p++ = 12; /* len(grpc-message) */
+ *p++ = 'g';
+ *p++ = 'r';
+ *p++ = 'p';
+ *p++ = 'c';
+ *p++ = '-';
+ *p++ = 'm';
+ *p++ = 'e';
+ *p++ = 's';
+ *p++ = 's';
+ *p++ = 'a';
+ *p++ = 'g';
+ *p++ = 'e';
+ *p++ = GPR_SLICE_LENGTH(*optional_message);
+ GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx));
+ len += GPR_SLICE_LENGTH(message_pfx);
+ len += GPR_SLICE_LENGTH(*optional_message);
+ }
+
+ hdr = gpr_slice_malloc(9);
+ p = GPR_SLICE_START_PTR(hdr);
+ *p++ = len >> 16;
+ *p++ = len >> 8;
+ *p++ = len;
+ *p++ = GRPC_CHTTP2_FRAME_HEADER;
+ *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
+ *p++ = stream_global->id >> 24;
+ *p++ = stream_global->id >> 16;
+ *p++ = stream_global->id >> 8;
+ *p++ = stream_global->id;
+ GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr));
+
+ gpr_slice_buffer_add(&transport_global->qbuf, hdr);
+ gpr_slice_buffer_add(&transport_global->qbuf, status_hdr);
+ if (optional_message) {
+ gpr_slice_buffer_add(&transport_global->qbuf, message_pfx);
+ gpr_slice_buffer_add(&transport_global->qbuf,
+ gpr_slice_ref(*optional_message));
+ }
+
+ gpr_slice_buffer_add(
+ &transport_global->qbuf,
+ grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR));
+
+ grpc_chttp2_list_add_read_write_state_changed(transport_global,
+ stream_global);
+}
+
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
void *user_data,
grpc_chttp2_stream_global *stream_global) {
diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c
index 967fd4898c..44d32b6cb2 100644
--- a/src/core/transport/metadata.c
+++ b/src/core/transport/metadata.c
@@ -135,7 +135,9 @@ static void unlock(grpc_mdctx *ctx) {
if (ctx->refs == 0) {
/* uncomment if you're having trouble diagnosing an mdelem leak to make
things clearer (slows down destruction a lot, however) */
+#ifdef GRPC_METADATA_REFCOUNT_DEBUG
gc_mdtab(ctx);
+#endif
if (ctx->mdtab_count && ctx->mdtab_count == ctx->mdtab_free) {
discard_metadata(ctx);
}
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index f27ef1b66b..227320cf2a 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -108,7 +108,7 @@ void grpc_metadata_batch_move(grpc_metadata_batch *dst,
grpc_metadata_batch *src);
/** Add \a storage to the beginning of \a batch. storage->md is
- assumed to be valid.
+ assumed to be valid.
\a storage is owned by the caller and must survive for the
lifetime of batch. This usually means it should be around
for the lifetime of the call. */
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 69c00b6a4f..c0d92cf93f 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -32,6 +32,8 @@
*/
#include "src/core/transport/transport.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include "src/core/transport/transport_impl.h"
size_t grpc_transport_stream_size(grpc_transport *transport) {
@@ -83,12 +85,54 @@ void grpc_transport_stream_op_finish_with_failure(
}
void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
- grpc_status_code status,
- grpc_mdstr *message) {
+ grpc_status_code status) {
+ GPR_ASSERT(status != GRPC_STATUS_OK);
if (op->cancel_with_status == GRPC_STATUS_OK) {
op->cancel_with_status = status;
}
- if (message) {
- GRPC_MDSTR_UNREF(message);
+ if (op->close_with_status != GRPC_STATUS_OK) {
+ op->close_with_status = GRPC_STATUS_OK;
+ if (op->optional_close_message != NULL) {
+ gpr_slice_unref(*op->optional_close_message);
+ op->optional_close_message = NULL;
+ }
}
}
+
+typedef struct {
+ gpr_slice message;
+ grpc_iomgr_closure *then_call;
+ grpc_iomgr_closure closure;
+} close_message_data;
+
+static void free_message(void *p, int iomgr_success) {
+ close_message_data *cmd = p;
+ gpr_slice_unref(cmd->message);
+ if (cmd->then_call != NULL) {
+ cmd->then_call->cb(cmd->then_call->cb_arg, iomgr_success);
+ }
+ gpr_free(cmd);
+}
+
+void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
+ grpc_status_code status,
+ gpr_slice *optional_message) {
+ close_message_data *cmd;
+ GPR_ASSERT(status != GRPC_STATUS_OK);
+ if (op->cancel_with_status != GRPC_STATUS_OK ||
+ op->close_with_status != GRPC_STATUS_OK) {
+ if (optional_message) {
+ gpr_slice_unref(*optional_message);
+ }
+ return;
+ }
+ if (optional_message) {
+ cmd = gpr_malloc(sizeof(*cmd));
+ cmd->message = *optional_message;
+ cmd->then_call = op->on_consumed;
+ grpc_iomgr_closure_init(&cmd->closure, free_message, cmd);
+ op->on_consumed = &cmd->closure;
+ op->optional_close_message = &cmd->message;
+ }
+ op->close_with_status = status;
+}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 7efcfcf970..92c1f38c5e 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -80,8 +80,14 @@ typedef struct grpc_transport_stream_op {
grpc_pollset *bind_pollset;
+ /** If != GRPC_STATUS_OK, cancel this stream */
grpc_status_code cancel_with_status;
+ /** If != GRPC_STATUS_OK, send grpc-status, grpc-message, and close this
+ stream for both reading and writing */
+ grpc_status_code close_with_status;
+ gpr_slice *optional_close_message;
+
/* Indexes correspond to grpc_context_index enum values */
grpc_call_context_element *context;
} grpc_transport_stream_op;
@@ -148,8 +154,11 @@ void grpc_transport_destroy_stream(grpc_transport *transport,
void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op);
void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
- grpc_status_code status,
- grpc_mdstr *message);
+ grpc_status_code status);
+
+void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
+ grpc_status_code status,
+ gpr_slice *optional_message);
char *grpc_transport_stream_op_string(grpc_transport_stream_op *op);