aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/compress_filter.c10
-rw-r--r--src/core/security/client_auth_filter.c6
-rw-r--r--src/core/security/server_auth_filter.c10
-rw-r--r--src/core/transport/chttp2/internal.h2
-rw-r--r--src/core/transport/chttp2_transport.c113
-rw-r--r--src/core/transport/metadata.c2
-rw-r--r--src/core/transport/transport.c52
-rw-r--r--src/core/transport/transport.h13
8 files changed, 188 insertions, 20 deletions
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 9fc8589fbb..8963c13b0f 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -204,7 +204,7 @@ static void process_send_ops(grpc_call_element *elem,
}
grpc_metadata_batch_add_tail(
&(sop->data.metadata), &calld->compression_algorithm_storage,
- grpc_mdelem_ref(channeld->mdelem_compression_algorithms
+ GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms
[calld->compression_algorithm]));
calld->written_initial_metadata = 1; /* GPR_TRUE */
}
@@ -295,7 +295,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
channeld->mdelem_compression_algorithms[algo_idx] =
grpc_mdelem_from_metadata_strings(
mdctx,
- grpc_mdstr_ref(channeld->mdstr_outgoing_compression_algorithm_key),
+ GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
grpc_mdstr_from_string(mdctx, algorithm_name, 0));
}
@@ -307,11 +307,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
- grpc_mdstr_unref(channeld->mdstr_request_compression_algorithm_key);
- grpc_mdstr_unref(channeld->mdstr_outgoing_compression_algorithm_key);
+ GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key);
+ GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key);
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
++algo_idx) {
- grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]);
+ GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]);
}
}
diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c
index e86b5430b2..e2d1b6fce9 100644
--- a/src/core/security/client_auth_filter.c
+++ b/src/core/security/client_auth_filter.c
@@ -77,10 +77,8 @@ typedef struct {
static void bubble_up_error(grpc_call_element *elem, const char *error_msg) {
call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- grpc_transport_stream_op_add_cancellation(
- &calld->op, GRPC_STATUS_UNAUTHENTICATED,
- grpc_mdstr_from_string(chand->md_ctx, error_msg, 0));
+ grpc_transport_stream_op_add_cancellation(&calld->op,
+ GRPC_STATUS_UNAUTHENTICATED);
grpc_call_next_op(elem, &calld->op);
}
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index 10dfb09926..fd0f94b19c 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -110,7 +110,6 @@ static void on_md_processing_done(void *user_data,
grpc_auth_context *result) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
if (success) {
calld->consumed_md = consumed_md;
@@ -124,10 +123,11 @@ static void on_md_processing_done(void *user_data,
GRPC_AUTH_CONTEXT_REF(result, "refing new context.");
calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
} else {
- grpc_transport_stream_op_add_cancellation(
- &calld->transport_op, GRPC_STATUS_UNAUTHENTICATED,
- grpc_mdstr_from_string(chand->mdctx,
- "Authentication metadata processing failed."));
+ gpr_slice message = gpr_slice_from_copied_string(
+ "Authentication metadata processing failed.");
+ grpc_sopb_reset(calld->recv_ops);
+ grpc_transport_stream_op_add_close(&calld->transport_op,
+ GRPC_STATUS_UNAUTHENTICATED, &message);
grpc_call_next_op(elem, &calld->transport_op);
}
}
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index f0eeb6de50..74b3a591d5 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -384,6 +384,8 @@ typedef struct {
gpr_uint8 in_stream_map;
/** is this stream actively being written? */
gpr_uint8 writing_now;
+ /** has anything been written to this stream? */
+ gpr_uint8 written_anything;
/** stream state already published to the upper layer */
grpc_stream_state published_state;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 1ea4a82c16..c8c4207208 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -107,6 +107,11 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status);
+static void close_from_api(grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ grpc_status_code status,
+ gpr_slice *optional_message);
+
/** Add endpoint from this transport to pollset */
static void add_to_pollset_locked(grpc_chttp2_transport *t,
grpc_pollset *pollset);
@@ -602,10 +607,16 @@ static void perform_stream_op_locked(
cancel_from_api(transport_global, stream_global, op->cancel_with_status);
}
+ if (op->close_with_status != GRPC_STATUS_OK) {
+ close_from_api(transport_global, stream_global, op->close_with_status,
+ op->optional_close_message);
+ }
+
if (op->send_ops) {
GPR_ASSERT(stream_global->outgoing_sopb == NULL);
stream_global->send_done_closure = op->on_done_send;
if (!stream_global->cancelled) {
+ stream_global->written_anything = 1;
stream_global->outgoing_sopb = op->send_ops;
if (op->is_last_send &&
stream_global->write_state == GRPC_WRITE_STATE_OPEN) {
@@ -894,6 +905,108 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
stream_global);
}
+static void close_from_api(grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global,
+ grpc_status_code status,
+ gpr_slice *optional_message) {
+ gpr_slice hdr;
+ gpr_slice status_hdr;
+ gpr_slice message_pfx;
+ gpr_uint8 *p;
+ gpr_uint32 len = 0;
+
+ GPR_ASSERT(status >= 0 && (int)status < 100);
+
+ stream_global->cancelled = 1;
+ stream_global->cancelled_status = status;
+ GPR_ASSERT(stream_global->id != 0);
+ GPR_ASSERT(!stream_global->written_anything);
+
+ /* Hand roll a header block.
+ This is unnecessarily ugly - at some point we should find a more elegant
+ solution.
+ It's complicated by the fact that our send machinery would be dead by the
+ time we got around to sending this, so instead we ignore HPACK compression
+ and just write the uncompressed bytes onto the wire. */
+ status_hdr = gpr_slice_malloc(15 + (status >= 10));
+ p = GPR_SLICE_START_PTR(status_hdr);
+ *p++ = 0x40; /* literal header */
+ *p++ = 11; /* len(grpc-status) */
+ *p++ = 'g';
+ *p++ = 'r';
+ *p++ = 'p';
+ *p++ = 'c';
+ *p++ = '-';
+ *p++ = 's';
+ *p++ = 't';
+ *p++ = 'a';
+ *p++ = 't';
+ *p++ = 'u';
+ *p++ = 's';
+ if (status < 10) {
+ *p++ = 1;
+ *p++ = '0' + status;
+ } else {
+ *p++ = 2;
+ *p++ = '0' + (status / 10);
+ *p++ = '0' + (status % 10);
+ }
+ GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr));
+ len += GPR_SLICE_LENGTH(status_hdr);
+
+ if (optional_message) {
+ GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127);
+ message_pfx = gpr_slice_malloc(15);
+ p = GPR_SLICE_START_PTR(message_pfx);
+ *p++ = 0x40;
+ *p++ = 12; /* len(grpc-message) */
+ *p++ = 'g';
+ *p++ = 'r';
+ *p++ = 'p';
+ *p++ = 'c';
+ *p++ = '-';
+ *p++ = 'm';
+ *p++ = 'e';
+ *p++ = 's';
+ *p++ = 's';
+ *p++ = 'a';
+ *p++ = 'g';
+ *p++ = 'e';
+ *p++ = GPR_SLICE_LENGTH(*optional_message);
+ GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx));
+ len += GPR_SLICE_LENGTH(message_pfx);
+ len += GPR_SLICE_LENGTH(*optional_message);
+ }
+
+ hdr = gpr_slice_malloc(9);
+ p = GPR_SLICE_START_PTR(hdr);
+ *p++ = len >> 16;
+ *p++ = len >> 8;
+ *p++ = len;
+ *p++ = GRPC_CHTTP2_FRAME_HEADER;
+ *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
+ *p++ = stream_global->id >> 24;
+ *p++ = stream_global->id >> 16;
+ *p++ = stream_global->id >> 8;
+ *p++ = stream_global->id;
+ GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr));
+
+ gpr_slice_buffer_add(&transport_global->qbuf, hdr);
+ gpr_slice_buffer_add(&transport_global->qbuf, status_hdr);
+ if (optional_message) {
+ gpr_slice_buffer_add(&transport_global->qbuf, message_pfx);
+ gpr_slice_buffer_add(&transport_global->qbuf,
+ gpr_slice_ref(*optional_message));
+ }
+
+ gpr_slice_buffer_add(
+ &transport_global->qbuf,
+ grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR));
+
+ grpc_chttp2_list_add_read_write_state_changed(transport_global,
+ stream_global);
+}
+
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
void *user_data,
grpc_chttp2_stream_global *stream_global) {
diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c
index 967fd4898c..44d32b6cb2 100644
--- a/src/core/transport/metadata.c
+++ b/src/core/transport/metadata.c
@@ -135,7 +135,9 @@ static void unlock(grpc_mdctx *ctx) {
if (ctx->refs == 0) {
/* uncomment if you're having trouble diagnosing an mdelem leak to make
things clearer (slows down destruction a lot, however) */
+#ifdef GRPC_METADATA_REFCOUNT_DEBUG
gc_mdtab(ctx);
+#endif
if (ctx->mdtab_count && ctx->mdtab_count == ctx->mdtab_free) {
discard_metadata(ctx);
}
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 69c00b6a4f..c0d92cf93f 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -32,6 +32,8 @@
*/
#include "src/core/transport/transport.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include "src/core/transport/transport_impl.h"
size_t grpc_transport_stream_size(grpc_transport *transport) {
@@ -83,12 +85,54 @@ void grpc_transport_stream_op_finish_with_failure(
}
void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
- grpc_status_code status,
- grpc_mdstr *message) {
+ grpc_status_code status) {
+ GPR_ASSERT(status != GRPC_STATUS_OK);
if (op->cancel_with_status == GRPC_STATUS_OK) {
op->cancel_with_status = status;
}
- if (message) {
- GRPC_MDSTR_UNREF(message);
+ if (op->close_with_status != GRPC_STATUS_OK) {
+ op->close_with_status = GRPC_STATUS_OK;
+ if (op->optional_close_message != NULL) {
+ gpr_slice_unref(*op->optional_close_message);
+ op->optional_close_message = NULL;
+ }
}
}
+
+typedef struct {
+ gpr_slice message;
+ grpc_iomgr_closure *then_call;
+ grpc_iomgr_closure closure;
+} close_message_data;
+
+static void free_message(void *p, int iomgr_success) {
+ close_message_data *cmd = p;
+ gpr_slice_unref(cmd->message);
+ if (cmd->then_call != NULL) {
+ cmd->then_call->cb(cmd->then_call->cb_arg, iomgr_success);
+ }
+ gpr_free(cmd);
+}
+
+void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
+ grpc_status_code status,
+ gpr_slice *optional_message) {
+ close_message_data *cmd;
+ GPR_ASSERT(status != GRPC_STATUS_OK);
+ if (op->cancel_with_status != GRPC_STATUS_OK ||
+ op->close_with_status != GRPC_STATUS_OK) {
+ if (optional_message) {
+ gpr_slice_unref(*optional_message);
+ }
+ return;
+ }
+ if (optional_message) {
+ cmd = gpr_malloc(sizeof(*cmd));
+ cmd->message = *optional_message;
+ cmd->then_call = op->on_consumed;
+ grpc_iomgr_closure_init(&cmd->closure, free_message, cmd);
+ op->on_consumed = &cmd->closure;
+ op->optional_close_message = &cmd->message;
+ }
+ op->close_with_status = status;
+}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 7efcfcf970..92c1f38c5e 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -80,8 +80,14 @@ typedef struct grpc_transport_stream_op {
grpc_pollset *bind_pollset;
+ /** If != GRPC_STATUS_OK, cancel this stream */
grpc_status_code cancel_with_status;
+ /** If != GRPC_STATUS_OK, send grpc-status, grpc-message, and close this
+ stream for both reading and writing */
+ grpc_status_code close_with_status;
+ gpr_slice *optional_close_message;
+
/* Indexes correspond to grpc_context_index enum values */
grpc_call_context_element *context;
} grpc_transport_stream_op;
@@ -148,8 +154,11 @@ void grpc_transport_destroy_stream(grpc_transport *transport,
void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op);
void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
- grpc_status_code status,
- grpc_mdstr *message);
+ grpc_status_code status);
+
+void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
+ grpc_status_code status,
+ gpr_slice *optional_message);
char *grpc_transport_stream_op_string(grpc_transport_stream_op *op);