aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-11-08 14:44:23 +0000
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-11-08 14:44:23 +0000
commit83e73373cb63be5ec5a97d894e7a06faf81d3df8 (patch)
treef8b9da29da1c70c05a120c43448b0e9685716f97 /src/core
parent79e32519393645c382eab2e4ad795f324d1a777d (diff)
parentbd50ed8057aa9823a0fd4d0b6d5b1722c199a70d (diff)
Merge branch 'new_op' into hpack_fix
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/channel_stack.h3
-rw-r--r--src/core/channel/subchannel_call_holder.h19
-rw-r--r--src/core/client_config/subchannel.c8
-rw-r--r--src/core/client_config/subchannel.h11
-rw-r--r--src/core/support/histogram.c2
-rw-r--r--src/core/surface/call.c2
-rw-r--r--src/core/transport/chttp2/frame_data.c13
-rw-r--r--src/core/transport/chttp2/frame_data.h3
-rw-r--r--src/core/transport/chttp2/internal.h2
-rw-r--r--src/core/transport/chttp2_transport.c88
10 files changed, 96 insertions, 55 deletions
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 1279fec080..5d33ab5b42 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -214,7 +214,8 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack);
-/* Ignore set pollset */
+/* Ignore set pollset - used by filters to implement the set_pollset method
+ if they don't care about pollsets at all. Does nothing. */
void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_pollset *pollset);
diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h
index 3dd43c9c3f..bda051c566 100644
--- a/src/core/channel/subchannel_call_holder.h
+++ b/src/core/channel/subchannel_call_holder.h
@@ -36,6 +36,10 @@
#include "src/core/client_config/subchannel.h"
+/** Pick a subchannel for grpc_subchannel_call_holder;
+ Return 1 if subchannel is available immediately (in which case on_ready
+ should not be called), or 0 otherwise (in which case on_ready should be
+ called when the subchannel is available) */
typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
grpc_subchannel **subchannel, grpc_closure *on_ready);
@@ -46,10 +50,21 @@ typedef enum {
GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL
} grpc_subchannel_call_holder_creation_phase;
+/** Wrapper for holding a pointer to grpc_subchannel_call, and the
+ associated machinery to create such a pointer.
+ Handles queueing of stream ops until a call object is ready, waiting
+ for initial metadata before trying to create a call object,
+ and handling cancellation gracefully.
+
+ Both the channel and uchannel filter use this as their call_data. */
typedef struct grpc_subchannel_call_holder {
- /* either 0 for no call, 1 for cancelled, or a pointer to a
- grpc_subchannel_call */
+ /** either 0 for no call, 1 for cancelled, or a pointer to a
+ grpc_subchannel_call */
gpr_atm subchannel_call;
+ /** Helper function to choose the subchannel on which to create
+ the call object. Channel filter delegates to the load
+ balancing policy (once it's ready); uchannel returns
+ immediately */
grpc_subchannel_call_holder_pick_subchannel pick_subchannel;
void *pick_subchannel_arg;
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index e911a46faf..49c2cf9a19 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -22,7 +22,7 @@
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMA`S (INCLUDING, BUT NOT
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
@@ -395,12 +395,12 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
- int call_creation_status;
+ int call_creation_finished_ok;
waiting_for_connect *w4c = arg;
grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
- call_creation_status = grpc_subchannel_create_call(
+ call_creation_finished_ok = grpc_subchannel_create_call(
exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
- GPR_ASSERT(call_creation_status == 1);
+ GPR_ASSERT(call_creation_finished_ok == 1);
w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
gpr_free(w4c);
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 4769a6de68..1fefa1888a 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -77,12 +77,11 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
/** construct a subchannel call (possibly asynchronously).
*
- * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will
- * return immediately and \a target will point to a connected \a subchannel_call
- * instance. Note that \a notify will \em not be invoked in this case.
- * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the
- * subchannel call will be created asynchronously, invoking the \a notify
- * callback upon completion. */
+ * If the returned status is 1, the call will return immediately and \a target
+ * will point to a connected \a subchannel_call instance. Note that \a notify
+ * will \em not be invoked in this case.
+ * Otherwise, if the returned status is 0, the subchannel call will be created
+ * asynchronously, invoking the \a notify callback upon completion. */
int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx,
grpc_subchannel *subchannel,
grpc_pollset *pollset, gpr_atm *target,
diff --git a/src/core/support/histogram.c b/src/core/support/histogram.c
index 8a1a9d9233..77b48af996 100644
--- a/src/core/support/histogram.c
+++ b/src/core/support/histogram.c
@@ -125,7 +125,7 @@ void gpr_histogram_add(gpr_histogram *h, double x) {
h->buckets[bucket_for(h, x)]++;
}
-int gpr_histogram_merge(gpr_histogram *dst, gpr_histogram *src) {
+int gpr_histogram_merge(gpr_histogram *dst, const gpr_histogram *src) {
if ((dst->num_buckets != src->num_buckets) ||
(dst->multiplier != src->multiplier)) {
/* Fail because these histograms don't match */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 056d49064e..aa435d44d3 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -944,12 +944,12 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
if (bctl->is_notify_tag_closure) {
+ grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success);
gpr_mu_lock(&call->mu);
bctl->call->used_batches =
(gpr_uint8)(bctl->call->used_batches &
~(gpr_uint8)(1 << (bctl - bctl->call->active_batches)));
gpr_mu_unlock(&call->mu);
- grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success,
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index 7287f97aaa..e07fbb2cc7 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -45,11 +45,16 @@
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser) {
parser->state = GRPC_CHTTP2_DATA_FH_0;
+ parser->parsing_frame = NULL;
return GRPC_CHTTP2_PARSE_OK;
}
-void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser) {
+void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_data_parser *parser) {
grpc_byte_stream *bs;
+ if (parser->parsing_frame) {
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame);
+ }
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) {
grpc_byte_stream_destroy(bs);
@@ -198,8 +203,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
}
p->parsing_frame = incoming_byte_stream =
grpc_chttp2_incoming_byte_stream_create(
- transport_parsing, stream_parsing, p->frame_size, message_flags,
- &p->incoming_frames);
+ exec_ctx, transport_parsing, stream_parsing, p->frame_size,
+ message_flags, &p->incoming_frames);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) {
@@ -214,6 +219,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
+ p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) > p->frame_size) {
@@ -222,6 +228,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
gpr_slice_sub(slice, (size_t)(cur - beg),
(size_t)(cur + p->frame_size - beg)));
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
+ p->parsing_frame = NULL;
cur += p->frame_size;
goto fh_0; /* loop */
} else {
diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h
index bc32f29d97..472f9cebdb 100644
--- a/src/core/transport/chttp2/frame_data.h
+++ b/src/core/transport/chttp2/frame_data.h
@@ -80,7 +80,8 @@ grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser);
-void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser);
+void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_data_parser *parser);
/* start processing a new data frame */
grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index b53c9dee0b..2d0cb4abdb 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -738,7 +738,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
#endif
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
- grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size,
gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue);
void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 3d98a4fb14..f62294c7c5 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -512,7 +512,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL);
GPR_ASSERT(s->global.recv_message_ready == NULL);
GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL);
- grpc_chttp2_data_parser_destroy(&s->parsing.data_parser);
+ grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]);
grpc_chttp2_incoming_metadata_buffer_destroy(
@@ -806,7 +806,8 @@ static void perform_stream_op_locked(
}
if (stream_global->write_closed) {
grpc_chttp2_complete_closure_step(
- exec_ctx, &stream_global->send_trailing_metadata_finished, 0);
+ exec_ctx, &stream_global->send_trailing_metadata_finished,
+ grpc_metadata_batch_is_empty(op->send_trailing_metadata));
} else if (stream_global->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
@@ -1364,6 +1365,46 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
* BYTE STREAM
*/
+static void incoming_byte_stream_update_flow_control(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
+ size_t have_already) {
+ gpr_uint32 max_recv_bytes;
+
+ /* clamp max recv hint to an allowable size */
+ if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) {
+ max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD;
+ } else {
+ max_recv_bytes = (gpr_uint32)max_size_hint;
+ }
+
+ /* account for bytes already received but unknown to higher layers */
+ if (max_recv_bytes >= have_already) {
+ max_recv_bytes -= (gpr_uint32)have_already;
+ } else {
+ max_recv_bytes = 0;
+ }
+
+ /* add some small lookahead to keep pipelines flowing */
+ GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD);
+ max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD;
+ if (stream_global->max_recv_bytes < max_recv_bytes) {
+ gpr_uint32 add_max_recv_bytes =
+ max_recv_bytes - stream_global->max_recv_bytes;
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
+ max_recv_bytes, add_max_recv_bytes);
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
+ unannounced_incoming_window_for_parse,
+ add_max_recv_bytes);
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
+ unannounced_incoming_window_for_writing,
+ add_max_recv_bytes);
+ grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
+ stream_global);
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ }
+}
+
static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
gpr_slice *slice, size_t max_size_hint,
@@ -1372,41 +1413,11 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_chttp2_transport_global *transport_global = &bs->transport->global;
grpc_chttp2_stream_global *stream_global = &bs->stream->global;
- gpr_uint32 max_recv_bytes;
lock(bs->transport);
if (bs->is_tail) {
- /* clamp max recv hint to an allowable size */
- if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) {
- max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD;
- } else {
- max_recv_bytes = (gpr_uint32)max_size_hint;
- }
-
- /* account for bytes already received but unknown to higher layers */
- if (max_recv_bytes >= bs->slices.length) {
- max_recv_bytes -= (gpr_uint32)bs->slices.length;
- } else {
- max_recv_bytes = 0;
- }
- /* add some small lookahead to keep pipelines flowing */
- GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD);
- max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD;
- if (stream_global->max_recv_bytes < max_recv_bytes) {
- gpr_uint32 add_max_recv_bytes =
- max_recv_bytes - stream_global->max_recv_bytes;
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
- max_recv_bytes, add_max_recv_bytes);
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
- unannounced_incoming_window_for_parse,
- add_max_recv_bytes);
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
- unannounced_incoming_window_for_writing,
- add_max_recv_bytes);
- grpc_chttp2_list_add_unannounced_incoming_window_available(
- transport_global, stream_global);
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
- }
+ incoming_byte_stream_update_flow_control(transport_global, stream_global,
+ max_size_hint, bs->slices.length);
}
if (bs->slices.count > 0) {
*slice = gpr_slice_buffer_take_first(&bs->slices);
@@ -1451,7 +1462,7 @@ void grpc_chttp2_incoming_byte_stream_finished(
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
- grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size,
gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue) {
grpc_chttp2_incoming_byte_stream *incoming_byte_stream =
@@ -1474,6 +1485,13 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
add_to_queue->tail->next_message = incoming_byte_stream;
}
add_to_queue->tail = incoming_byte_stream;
+ if (frame_size == 0) {
+ lock(TRANSPORT_FROM_PARSING(transport_parsing));
+ incoming_byte_stream_update_flow_control(
+ &TRANSPORT_FROM_PARSING(transport_parsing)->global,
+ &STREAM_FROM_PARSING(stream_parsing)->global, 0, 0);
+ unlock(exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing));
+ }
return incoming_byte_stream;
}