diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/channel/compress_filter.c | 10 | ||||
-rw-r--r-- | src/core/security/client_auth_filter.c | 6 | ||||
-rw-r--r-- | src/core/security/server_auth_filter.c | 10 | ||||
-rw-r--r-- | src/core/transport/chttp2/internal.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 113 | ||||
-rw-r--r-- | src/core/transport/metadata.c | 2 | ||||
-rw-r--r-- | src/core/transport/transport.c | 52 | ||||
-rw-r--r-- | src/core/transport/transport.h | 13 |
8 files changed, 188 insertions, 20 deletions
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 9fc8589fbb..8963c13b0f 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -204,7 +204,7 @@ static void process_send_ops(grpc_call_element *elem, } grpc_metadata_batch_add_tail( &(sop->data.metadata), &calld->compression_algorithm_storage, - grpc_mdelem_ref(channeld->mdelem_compression_algorithms + GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms [calld->compression_algorithm])); calld->written_initial_metadata = 1; /* GPR_TRUE */ } @@ -295,7 +295,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, channeld->mdelem_compression_algorithms[algo_idx] = grpc_mdelem_from_metadata_strings( mdctx, - grpc_mdstr_ref(channeld->mdstr_outgoing_compression_algorithm_key), + GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), grpc_mdstr_from_string(mdctx, algorithm_name, 0)); } @@ -307,11 +307,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; - grpc_mdstr_unref(channeld->mdstr_request_compression_algorithm_key); - grpc_mdstr_unref(channeld->mdstr_outgoing_compression_algorithm_key); + GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key); + GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key); for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { - grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]); + GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]); } } diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index e86b5430b2..e2d1b6fce9 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -77,10 +77,8 @@ typedef struct { static void bubble_up_error(grpc_call_element *elem, const char *error_msg) { call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - grpc_transport_stream_op_add_cancellation( - &calld->op, GRPC_STATUS_UNAUTHENTICATED, - grpc_mdstr_from_string(chand->md_ctx, error_msg, 0)); + grpc_transport_stream_op_add_cancellation(&calld->op, + GRPC_STATUS_UNAUTHENTICATED); grpc_call_next_op(elem, &calld->op); } diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 10dfb09926..fd0f94b19c 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -110,7 +110,6 @@ static void on_md_processing_done(void *user_data, grpc_auth_context *result) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; if (success) { calld->consumed_md = consumed_md; @@ -124,10 +123,11 @@ static void on_md_processing_done(void *user_data, GRPC_AUTH_CONTEXT_REF(result, "refing new context."); calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } else { - grpc_transport_stream_op_add_cancellation( - &calld->transport_op, GRPC_STATUS_UNAUTHENTICATED, - grpc_mdstr_from_string(chand->mdctx, - "Authentication metadata processing failed.")); + gpr_slice message = gpr_slice_from_copied_string( + "Authentication metadata processing failed."); + grpc_sopb_reset(calld->recv_ops); + grpc_transport_stream_op_add_close(&calld->transport_op, + GRPC_STATUS_UNAUTHENTICATED, &message); grpc_call_next_op(elem, &calld->transport_op); } } diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index f0eeb6de50..74b3a591d5 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -384,6 +384,8 @@ typedef struct { gpr_uint8 in_stream_map; /** is this stream actively being written? */ gpr_uint8 writing_now; + /** has anything been written to this stream? */ + gpr_uint8 written_anything; /** stream state already published to the upper layer */ grpc_stream_state published_state; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 1ea4a82c16..c8c4207208 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -107,6 +107,11 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_status_code status); +static void close_from_api(grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + grpc_status_code status, + gpr_slice *optional_message); + /** Add endpoint from this transport to pollset */ static void add_to_pollset_locked(grpc_chttp2_transport *t, grpc_pollset *pollset); @@ -602,10 +607,16 @@ static void perform_stream_op_locked( cancel_from_api(transport_global, stream_global, op->cancel_with_status); } + if (op->close_with_status != GRPC_STATUS_OK) { + close_from_api(transport_global, stream_global, op->close_with_status, + op->optional_close_message); + } + if (op->send_ops) { GPR_ASSERT(stream_global->outgoing_sopb == NULL); stream_global->send_done_closure = op->on_done_send; if (!stream_global->cancelled) { + stream_global->written_anything = 1; stream_global->outgoing_sopb = op->send_ops; if (op->is_last_send && stream_global->write_state == GRPC_WRITE_STATE_OPEN) { @@ -894,6 +905,108 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, stream_global); } +static void close_from_api(grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + grpc_status_code status, + gpr_slice *optional_message) { + gpr_slice hdr; + gpr_slice status_hdr; + gpr_slice message_pfx; + gpr_uint8 *p; + gpr_uint32 len = 0; + + GPR_ASSERT(status >= 0 && (int)status < 100); + + stream_global->cancelled = 1; + stream_global->cancelled_status = status; + GPR_ASSERT(stream_global->id != 0); + GPR_ASSERT(!stream_global->written_anything); + + /* Hand roll a header block. + This is unnecessarily ugly - at some point we should find a more elegant + solution. + It's complicated by the fact that our send machinery would be dead by the + time we got around to sending this, so instead we ignore HPACK compression + and just write the uncompressed bytes onto the wire. */ + status_hdr = gpr_slice_malloc(15 + (status >= 10)); + p = GPR_SLICE_START_PTR(status_hdr); + *p++ = 0x40; /* literal header */ + *p++ = 11; /* len(grpc-status) */ + *p++ = 'g'; + *p++ = 'r'; + *p++ = 'p'; + *p++ = 'c'; + *p++ = '-'; + *p++ = 's'; + *p++ = 't'; + *p++ = 'a'; + *p++ = 't'; + *p++ = 'u'; + *p++ = 's'; + if (status < 10) { + *p++ = 1; + *p++ = '0' + status; + } else { + *p++ = 2; + *p++ = '0' + (status / 10); + *p++ = '0' + (status % 10); + } + GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr)); + len += GPR_SLICE_LENGTH(status_hdr); + + if (optional_message) { + GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127); + message_pfx = gpr_slice_malloc(15); + p = GPR_SLICE_START_PTR(message_pfx); + *p++ = 0x40; + *p++ = 12; /* len(grpc-message) */ + *p++ = 'g'; + *p++ = 'r'; + *p++ = 'p'; + *p++ = 'c'; + *p++ = '-'; + *p++ = 'm'; + *p++ = 'e'; + *p++ = 's'; + *p++ = 's'; + *p++ = 'a'; + *p++ = 'g'; + *p++ = 'e'; + *p++ = GPR_SLICE_LENGTH(*optional_message); + GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx)); + len += GPR_SLICE_LENGTH(message_pfx); + len += GPR_SLICE_LENGTH(*optional_message); + } + + hdr = gpr_slice_malloc(9); + p = GPR_SLICE_START_PTR(hdr); + *p++ = len >> 16; + *p++ = len >> 8; + *p++ = len; + *p++ = GRPC_CHTTP2_FRAME_HEADER; + *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS; + *p++ = stream_global->id >> 24; + *p++ = stream_global->id >> 16; + *p++ = stream_global->id >> 8; + *p++ = stream_global->id; + GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr)); + + gpr_slice_buffer_add(&transport_global->qbuf, hdr); + gpr_slice_buffer_add(&transport_global->qbuf, status_hdr); + if (optional_message) { + gpr_slice_buffer_add(&transport_global->qbuf, message_pfx); + gpr_slice_buffer_add(&transport_global->qbuf, + gpr_slice_ref(*optional_message)); + } + + gpr_slice_buffer_add( + &transport_global->qbuf, + grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR)); + + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); +} + static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global) { diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index 967fd4898c..44d32b6cb2 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -135,7 +135,9 @@ static void unlock(grpc_mdctx *ctx) { if (ctx->refs == 0) { /* uncomment if you're having trouble diagnosing an mdelem leak to make things clearer (slows down destruction a lot, however) */ +#ifdef GRPC_METADATA_REFCOUNT_DEBUG gc_mdtab(ctx); +#endif if (ctx->mdtab_count && ctx->mdtab_count == ctx->mdtab_free) { discard_metadata(ctx); } diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 69c00b6a4f..c0d92cf93f 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -32,6 +32,8 @@ */ #include "src/core/transport/transport.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include "src/core/transport/transport_impl.h" size_t grpc_transport_stream_size(grpc_transport *transport) { @@ -83,12 +85,54 @@ void grpc_transport_stream_op_finish_with_failure( } void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, - grpc_status_code status, - grpc_mdstr *message) { + grpc_status_code status) { + GPR_ASSERT(status != GRPC_STATUS_OK); if (op->cancel_with_status == GRPC_STATUS_OK) { op->cancel_with_status = status; } - if (message) { - GRPC_MDSTR_UNREF(message); + if (op->close_with_status != GRPC_STATUS_OK) { + op->close_with_status = GRPC_STATUS_OK; + if (op->optional_close_message != NULL) { + gpr_slice_unref(*op->optional_close_message); + op->optional_close_message = NULL; + } } } + +typedef struct { + gpr_slice message; + grpc_iomgr_closure *then_call; + grpc_iomgr_closure closure; +} close_message_data; + +static void free_message(void *p, int iomgr_success) { + close_message_data *cmd = p; + gpr_slice_unref(cmd->message); + if (cmd->then_call != NULL) { + cmd->then_call->cb(cmd->then_call->cb_arg, iomgr_success); + } + gpr_free(cmd); +} + +void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op, + grpc_status_code status, + gpr_slice *optional_message) { + close_message_data *cmd; + GPR_ASSERT(status != GRPC_STATUS_OK); + if (op->cancel_with_status != GRPC_STATUS_OK || + op->close_with_status != GRPC_STATUS_OK) { + if (optional_message) { + gpr_slice_unref(*optional_message); + } + return; + } + if (optional_message) { + cmd = gpr_malloc(sizeof(*cmd)); + cmd->message = *optional_message; + cmd->then_call = op->on_consumed; + grpc_iomgr_closure_init(&cmd->closure, free_message, cmd); + op->on_consumed = &cmd->closure; + op->optional_close_message = &cmd->message; + } + op->close_with_status = status; +} diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 7efcfcf970..92c1f38c5e 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -80,8 +80,14 @@ typedef struct grpc_transport_stream_op { grpc_pollset *bind_pollset; + /** If != GRPC_STATUS_OK, cancel this stream */ grpc_status_code cancel_with_status; + /** If != GRPC_STATUS_OK, send grpc-status, grpc-message, and close this + stream for both reading and writing */ + grpc_status_code close_with_status; + gpr_slice *optional_close_message; + /* Indexes correspond to grpc_context_index enum values */ grpc_call_context_element *context; } grpc_transport_stream_op; @@ -148,8 +154,11 @@ void grpc_transport_destroy_stream(grpc_transport *transport, void grpc_transport_stream_op_finish_with_failure(grpc_transport_stream_op *op); void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, - grpc_status_code status, - grpc_mdstr *message); + grpc_status_code status); + +void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op, + grpc_status_code status, + gpr_slice *optional_message); char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); |