From 119c103ab00f309a66de6f1cb78c648eb3c4e2cc Mon Sep 17 00:00:00 2001 From: vjpai Date: Thu, 29 Oct 2015 01:21:04 -0700 Subject: Split up into a new service proto, use proper service suffix, add a reset option to the mark, create a closed loop config params (empty message) for consistency with other tests. --- src/core/support/histogram.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/support/histogram.c b/src/core/support/histogram.c index 8a1a9d9233..77b48af996 100644 --- a/src/core/support/histogram.c +++ b/src/core/support/histogram.c @@ -125,7 +125,7 @@ void gpr_histogram_add(gpr_histogram *h, double x) { h->buckets[bucket_for(h, x)]++; } -int gpr_histogram_merge(gpr_histogram *dst, gpr_histogram *src) { +int gpr_histogram_merge(gpr_histogram *dst, const gpr_histogram *src) { if ((dst->num_buckets != src->num_buckets) || (dst->multiplier != src->multiplier)) { /* Fail because these histograms don't match */ -- cgit v1.2.3 From 20df14ef6d36c01239e4e1b52a3dee009a5962c3 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 6 Nov 2015 09:10:49 -0800 Subject: Fix flow control for 0-byte messages --- src/core/surface/call.c | 2 +- src/core/transport/chttp2/frame_data.c | 4 +- src/core/transport/chttp2/internal.h | 2 +- src/core/transport/chttp2_transport.c | 83 ++++++++++++++++++++-------------- 4 files changed, 54 insertions(+), 37 deletions(-) (limited to 'src/core') diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 056d49064e..aa435d44d3 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -944,12 +944,12 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; if (bctl->is_notify_tag_closure) { + grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success); gpr_mu_lock(&call->mu); bctl->call->used_batches = (gpr_uint8)(bctl->call->used_batches & ~(gpr_uint8)(1 << (bctl - bctl->call->active_batches))); gpr_mu_unlock(&call->mu); - grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success, diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 7287f97aaa..eafffc2795 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -198,8 +198,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } p->parsing_frame = incoming_byte_stream = grpc_chttp2_incoming_byte_stream_create( - transport_parsing, stream_parsing, p->frame_size, message_flags, - &p->incoming_frames); + exec_ctx, transport_parsing, stream_parsing, p->frame_size, + message_flags, &p->incoming_frames); /* fallthrough */ case GRPC_CHTTP2_DATA_FRAME: if (cur == end) { diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index b53c9dee0b..2d0cb4abdb 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -738,7 +738,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, #endif grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( - grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size, gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue); void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 3d98a4fb14..b44843a341 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1364,6 +1364,46 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, * BYTE STREAM */ +static void incoming_byte_stream_update_flow_control( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, size_t max_size_hint, + size_t have_already) { + gpr_uint32 max_recv_bytes; + + /* clamp max recv hint to an allowable size */ + if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) { + max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD; + } else { + max_recv_bytes = (gpr_uint32)max_size_hint; + } + + /* account for bytes already received but unknown to higher layers */ + if (max_recv_bytes >= have_already) { + max_recv_bytes -= (gpr_uint32)have_already; + } else { + max_recv_bytes = 0; + } + + /* add some small lookahead to keep pipelines flowing */ + GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD); + max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD; + if (stream_global->max_recv_bytes < max_recv_bytes) { + gpr_uint32 add_max_recv_bytes = + max_recv_bytes - stream_global->max_recv_bytes; + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + max_recv_bytes, add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + unannounced_incoming_window_for_parse, + add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + unannounced_incoming_window_for_writing, + add_max_recv_bytes); + grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, + stream_global); + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + } +} + static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, gpr_slice *slice, size_t max_size_hint, @@ -1372,41 +1412,11 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, (grpc_chttp2_incoming_byte_stream *)byte_stream; grpc_chttp2_transport_global *transport_global = &bs->transport->global; grpc_chttp2_stream_global *stream_global = &bs->stream->global; - gpr_uint32 max_recv_bytes; lock(bs->transport); if (bs->is_tail) { - /* clamp max recv hint to an allowable size */ - if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) { - max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD; - } else { - max_recv_bytes = (gpr_uint32)max_size_hint; - } - - /* account for bytes already received but unknown to higher layers */ - if (max_recv_bytes >= bs->slices.length) { - max_recv_bytes -= (gpr_uint32)bs->slices.length; - } else { - max_recv_bytes = 0; - } - /* add some small lookahead to keep pipelines flowing */ - GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD); - max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD; - if (stream_global->max_recv_bytes < max_recv_bytes) { - gpr_uint32 add_max_recv_bytes = - max_recv_bytes - stream_global->max_recv_bytes; - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, - max_recv_bytes, add_max_recv_bytes); - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, - unannounced_incoming_window_for_parse, - add_max_recv_bytes); - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, - unannounced_incoming_window_for_writing, - add_max_recv_bytes); - grpc_chttp2_list_add_unannounced_incoming_window_available( - transport_global, stream_global); - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); - } + incoming_byte_stream_update_flow_control(transport_global, stream_global, + max_size_hint, bs->slices.length); } if (bs->slices.count > 0) { *slice = gpr_slice_buffer_take_first(&bs->slices); @@ -1451,7 +1461,7 @@ void grpc_chttp2_incoming_byte_stream_finished( } grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( - grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size, gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue) { grpc_chttp2_incoming_byte_stream *incoming_byte_stream = @@ -1474,6 +1484,13 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( add_to_queue->tail->next_message = incoming_byte_stream; } add_to_queue->tail = incoming_byte_stream; + if (frame_size == 0) { + lock(TRANSPORT_FROM_PARSING(transport_parsing)); + incoming_byte_stream_update_flow_control( + &TRANSPORT_FROM_PARSING(transport_parsing)->global, + &STREAM_FROM_PARSING(stream_parsing)->global, 0, 0); + unlock(exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing)); + } return incoming_byte_stream; } -- cgit v1.2.3 From 7be556e72865991d816bae8d369a74889655398d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 6 Nov 2015 12:29:33 -0800 Subject: Transport memory management fixes --- src/core/transport/chttp2/frame_data.c | 9 ++++++++- src/core/transport/chttp2/frame_data.h | 3 ++- src/core/transport/chttp2_transport.c | 2 +- 3 files changed, 11 insertions(+), 3 deletions(-) (limited to 'src/core') diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index eafffc2795..e07fbb2cc7 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -45,11 +45,16 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_init( grpc_chttp2_data_parser *parser) { parser->state = GRPC_CHTTP2_DATA_FH_0; + parser->parsing_frame = NULL; return GRPC_CHTTP2_PARSE_OK; } -void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser) { +void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, + grpc_chttp2_data_parser *parser) { grpc_byte_stream *bs; + if (parser->parsing_frame) { + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame); + } while ( (bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) { grpc_byte_stream_destroy(bs); @@ -214,6 +219,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); + p->parsing_frame = NULL; p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_CHTTP2_PARSE_OK; } else if ((gpr_uint32)(end - cur) > p->frame_size) { @@ -222,6 +228,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(cur + p->frame_size - beg))); grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); + p->parsing_frame = NULL; cur += p->frame_size; goto fh_0; /* loop */ } else { diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h index bc32f29d97..472f9cebdb 100644 --- a/src/core/transport/chttp2/frame_data.h +++ b/src/core/transport/chttp2/frame_data.h @@ -80,7 +80,8 @@ grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop( grpc_chttp2_parse_error grpc_chttp2_data_parser_init( grpc_chttp2_data_parser *parser); -void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser); +void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, + grpc_chttp2_data_parser *parser); /* start processing a new data frame */ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index b44843a341..cbde3c5b86 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -512,7 +512,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL); GPR_ASSERT(s->global.recv_message_ready == NULL); GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL); - grpc_chttp2_data_parser_destroy(&s->parsing.data_parser); + grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser); grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]); grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]); grpc_chttp2_incoming_metadata_buffer_destroy( -- cgit v1.2.3 From e9c216ee23297490de8c3350d0b2fa1115d5c3da Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 6 Nov 2015 14:48:46 -0800 Subject: Sending empty trailing metadata to a closed stream is ok - it means close this already closed stream --- src/core/transport/chttp2_transport.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index cbde3c5b86..f62294c7c5 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -806,7 +806,8 @@ static void perform_stream_op_locked( } if (stream_global->write_closed) { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_trailing_metadata_finished, 0); + exec_ctx, &stream_global->send_trailing_metadata_finished, + grpc_metadata_batch_is_empty(op->send_trailing_metadata)); } else if (stream_global->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ -- cgit v1.2.3 From 892f2d372641967c5fe18c72b60393eca913c8f2 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sun, 8 Nov 2015 13:53:04 +0000 Subject: Commentary --- src/core/channel/channel_stack.h | 3 ++- src/core/channel/subchannel_call_holder.h | 19 +++++++++++++++++-- src/core/client_config/subchannel.c | 8 ++++---- src/core/client_config/subchannel.h | 11 +++++------ 4 files changed, 28 insertions(+), 13 deletions(-) (limited to 'src/core') diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 1279fec080..5d33ab5b42 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -214,7 +214,8 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx, /* Destroy a call stack */ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack); -/* Ignore set pollset */ +/* Ignore set pollset - used by filters to implement the set_pollset method + if they don't care about pollsets at all. Does nothing. */ void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_pollset *pollset); diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h index 3dd43c9c3f..bda051c566 100644 --- a/src/core/channel/subchannel_call_holder.h +++ b/src/core/channel/subchannel_call_holder.h @@ -36,6 +36,10 @@ #include "src/core/client_config/subchannel.h" +/** Pick a subchannel for grpc_subchannel_call_holder; + Return 1 if subchannel is available immediately (in which case on_ready + should not be called), or 0 otherwise (in which case on_ready should be + called when the subchannel is available) */ typedef int (*grpc_subchannel_call_holder_pick_subchannel)( grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, grpc_subchannel **subchannel, grpc_closure *on_ready); @@ -46,10 +50,21 @@ typedef enum { GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL } grpc_subchannel_call_holder_creation_phase; +/** Wrapper for holding a pointer to grpc_subchannel_call, and the + associated machinery to create such a pointer. + Handles queueing of stream ops until a call object is ready, waiting + for initial metadata before trying to create a call object, + and handling cancellation gracefully. + + Both the channel and uchannel filter use this as their call_data. */ typedef struct grpc_subchannel_call_holder { - /* either 0 for no call, 1 for cancelled, or a pointer to a - grpc_subchannel_call */ + /** either 0 for no call, 1 for cancelled, or a pointer to a + grpc_subchannel_call */ gpr_atm subchannel_call; + /** Helper function to choose the subchannel on which to create + the call object. Channel filter delegates to the load + balancing policy (once it's ready); uchannel returns + immediately */ grpc_subchannel_call_holder_pick_subchannel pick_subchannel; void *pick_subchannel_arg; diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index e911a46faf..49c2cf9a19 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -22,7 +22,7 @@ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMA`S (INCLUDING, BUT NOT + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT @@ -395,12 +395,12 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { - int call_creation_status; + int call_creation_finished_ok; waiting_for_connect *w4c = arg; grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset); - call_creation_status = grpc_subchannel_create_call( + call_creation_finished_ok = grpc_subchannel_create_call( exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); - GPR_ASSERT(call_creation_status == 1); + GPR_ASSERT(call_creation_finished_ok == 1); w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success); GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); gpr_free(w4c); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 4769a6de68..1fefa1888a 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -77,12 +77,11 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, /** construct a subchannel call (possibly asynchronously). * - * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will - * return immediately and \a target will point to a connected \a subchannel_call - * instance. Note that \a notify will \em not be invoked in this case. - * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the - * subchannel call will be created asynchronously, invoking the \a notify - * callback upon completion. */ + * If the returned status is 1, the call will return immediately and \a target + * will point to a connected \a subchannel_call instance. Note that \a notify + * will \em not be invoked in this case. + * Otherwise, if the returned status is 0, the subchannel call will be created + * asynchronously, invoking the \a notify callback upon completion. */ int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset, gpr_atm *target, -- cgit v1.2.3