aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-05-12 13:17:14 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-05-12 13:17:14 -0700
commite9a766593b33a18574cd3dfdaaea46a6dcdddc91 (patch)
tree54f1d15d38146aec770afef465cfe4aec78ffae0
parent8ef68a570faebde6c79b80b3922a121f0540f342 (diff)
Progress
-rw-r--r--include/grpc/impl/codegen/grpc_types.h7
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c38
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h6
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c16
4 files changed, 49 insertions, 18 deletions
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 4738e02ecb..f03ec09430 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -350,8 +350,11 @@ typedef enum grpc_call_error {
/** Force compression to be disabled for a particular write
(start_write/add_metadata). Illegal on invoke/accept. */
#define GRPC_WRITE_NO_COMPRESS (0x00000002u)
+/** Force this message to be written to the socket before completing it */
+#define GRPC_WRITE_THROUGH (0x00000004u)
/** Mask of all valid flags. */
-#define GRPC_WRITE_USED_MASK (GRPC_WRITE_BUFFER_HINT | GRPC_WRITE_NO_COMPRESS)
+#define GRPC_WRITE_USED_MASK \
+ (GRPC_WRITE_BUFFER_HINT | GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_THROUGH)
/* Initial metadata flags */
/** Signal that the call is idempotent */
@@ -372,7 +375,7 @@ typedef enum grpc_call_error {
GRPC_INITIAL_METADATA_WAIT_FOR_READY | \
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST | \
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET | \
- GRPC_INITIAL_METADATA_CORKED)
+ GRPC_INITIAL_METADATA_CORKED | GRPC_WRITE_THROUGH)
/** A single metadata element */
typedef struct grpc_metadata {
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index e0aa3499c9..a9bc36b0d4 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -909,7 +909,7 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) {
grpc_chttp2_transport *t = gt;
GPR_TIMER_BEGIN("write_action", 0);
grpc_endpoint_write(
- exec_ctx, t->ep, &t->outbuf, true,
+ exec_ctx, t->ep, &t->outbuf, t->write_is_covered,
grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t,
grpc_combiner_scheduler(t->combiner, false)));
GPR_TIMER_END("write_action", 0);
@@ -1192,8 +1192,12 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
cb->call_at_byte = notify_offset;
cb->closure = s->fetching_send_message_finished;
s->fetching_send_message_finished = NULL;
- cb->next = s->on_write_finished_cbs;
- s->on_write_finished_cbs = cb;
+ grpc_chttp2_write_cb **list =
+ s->fetching_send_message->flags & GRPC_WRITE_THROUGH
+ ? &s->on_write_finished_cbs
+ : &s->on_flow_controlled_cbs;
+ cb->next = *list;
+ *list = cb;
}
s->fetching_send_message = NULL;
return; /* early out */
@@ -1873,6 +1877,21 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
return error;
}
+static void flush_write_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, grpc_chttp2_write_cb **list,
+ grpc_error *error) {
+ while (*list) {
+ grpc_chttp2_write_cb *cb = *list;
+ *list = cb->next;
+ grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure,
+ GRPC_ERROR_REF(error),
+ "on_write_finished_cb");
+ cb->next = t->write_cb_pool;
+ t->write_cb_pool = cb;
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error) {
@@ -1892,16 +1911,9 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error),
"fetching_send_message_finished");
- while (s->on_write_finished_cbs) {
- grpc_chttp2_write_cb *cb = s->on_write_finished_cbs;
- s->on_write_finished_cbs = cb->next;
- grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure,
- GRPC_ERROR_REF(error),
- "on_write_finished_cb");
- cb->next = t->write_cb_pool;
- t->write_cb_pool = cb;
- }
- GRPC_ERROR_UNREF(error);
+ flush_write_list(exec_ctx, t, s, &s->on_write_finished_cbs,
+ GRPC_ERROR_REF(error));
+ flush_write_list(exec_ctx, t, s, &s->on_flow_controlled_cbs, error);
}
void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 8d66e396ee..682c14be3a 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -421,6 +421,10 @@ struct grpc_chttp2_transport {
bool keepalive_permit_without_calls;
/** keep-alive state machine state */
grpc_chttp2_keepalive_state keepalive_state;
+
+ /** is the next write covered by a poller? set in grpc_chttp2_begin_write,
+ read in write_action */
+ bool write_is_covered;
};
typedef enum {
@@ -458,6 +462,7 @@ struct grpc_chttp2_stream {
grpc_slice fetching_slice;
int64_t next_message_end_offset;
int64_t flow_controlled_bytes_written;
+ int64_t flow_controlled_bytes_flowed;
bool complete_fetch_covered_by_poller;
grpc_closure complete_fetch_locked;
grpc_closure *fetching_send_message_finished;
@@ -531,6 +536,7 @@ struct grpc_chttp2_stream {
uint32_t announce_window;
grpc_slice_buffer flow_controlled_buffer;
+ grpc_chttp2_write_cb *on_flow_controlled_cbs;
grpc_chttp2_write_cb *on_write_finished_cbs;
grpc_chttp2_write_cb *finish_after_write;
size_t sending_bytes;
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 5be1092946..92ee747e6b 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -138,10 +138,11 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx,
static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, int64_t send_bytes,
- grpc_chttp2_write_cb **list, grpc_error *error) {
+ grpc_chttp2_write_cb **list, int64_t *ctr,
+ grpc_error *error) {
grpc_chttp2_write_cb *cb = *list;
*list = NULL;
- s->flow_controlled_bytes_written += send_bytes;
+ *ctr += send_bytes;
while (cb) {
grpc_chttp2_write_cb *next = cb->next;
if (cb->call_at_byte <= s->flow_controlled_bytes_written) {
@@ -228,6 +229,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
bool sent_initial_metadata = s->sent_initial_metadata;
bool now_writing = false;
+ t->write_is_covered = false;
GRPC_CHTTP2_IF_TRACING(gpr_log(
GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t,
@@ -259,6 +261,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.ping_strikes = 0;
}
+ t->write_is_covered = true;
}
/* send any window updates */
if (s->announce_window > 0) {
@@ -320,11 +323,16 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
}
}
s->sending_bytes += send_bytes;
+ update_list(exec_ctx, t, s, send_bytes, &s->on_flow_controlled_cbs,
+ &s->flow_controlled_bytes_flowed, GRPC_ERROR_NONE);
now_writing = true;
if (s->flow_controlled_buffer.length > 0) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork");
grpc_chttp2_list_add_writable_stream(t, s);
}
+ if (s->on_write_finished_cbs != NULL) {
+ t->write_is_covered = true;
+ }
} else if (t->outgoing_window == 0) {
grpc_chttp2_list_add_stalled_by_transport(t, s);
now_writing = true;
@@ -364,6 +372,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing));
}
now_writing = true;
+ t->write_is_covered = true;
}
}
@@ -430,7 +439,8 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
if (s->sending_bytes != 0) {
update_list(exec_ctx, t, s, (int64_t)s->sending_bytes,
- &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
+ &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
+ GRPC_ERROR_REF(error));
s->sending_bytes = 0;
}
if (s->sent_trailing_metadata) {