aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-10-07 13:09:45 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2016-10-12 10:59:27 -0700
commit9381c00857d0acc4fc034937260f32ee26e836b9 (patch)
tree7af1ad9d74b193e14fb060f93b10917e862f1b71
parent9993c8716888eb1eb7b245fda5e76274e39df767 (diff)
Fix race with fetching data and writing it in chttp2
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c13
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h5
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c10
3 files changed, 15 insertions, 13 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 023b7c2e95..8ab26e512d 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -836,8 +836,8 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
return;
}
if (s->fetched_send_message_length == s->fetching_send_message->length) {
- ssize_t notify_offset = s->fetching_slice_end_offset;
- if (notify_offset <= 0) {
+ int64_t notify_offset = s->next_message_end_offset;
+ if (notify_offset <= s->flow_controlled_bytes_written) {
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
"fetching_send_message_finished");
@@ -848,7 +848,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
} else {
t->write_cb_pool = cb->next;
}
- cb->call_at_byte = (size_t)notify_offset;
+ cb->call_at_byte = notify_offset;
cb->closure = s->fetching_send_message_finished;
s->fetching_send_message_finished = NULL;
cb->next = s->on_write_finished_cbs;
@@ -1005,13 +1005,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
frame_hdr[4] = (uint8_t)(len);
s->fetching_send_message = op->send_message;
s->fetched_send_message_length = 0;
- s->fetching_slice_end_offset =
- (ssize_t)s->flow_controlled_buffer.length + (ssize_t)len;
+ s->next_message_end_offset = s->flow_controlled_bytes_written +
+ (int64_t)s->flow_controlled_buffer.length +
+ (int64_t)len;
s->complete_fetch_covered_by_poller = op->covered_by_poller;
if (flags & GRPC_WRITE_BUFFER_HINT) {
/* allow up to 64kb to be buffered */
/* TODO(ctiller): make this configurable */
- s->fetching_slice_end_offset -= 65536;
+ s->next_message_end_offset -= 65536;
}
continue_fetching_send_locked(exec_ctx, t, s);
if (s->id != 0) {
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 3263c99bde..774fed0722 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -147,7 +147,7 @@ typedef struct grpc_chttp2_outstanding_ping {
} grpc_chttp2_outstanding_ping;
typedef struct grpc_chttp2_write_cb {
- size_t call_at_byte;
+ int64_t call_at_byte;
grpc_closure *closure;
struct grpc_chttp2_write_cb *next;
} grpc_chttp2_write_cb;
@@ -353,7 +353,8 @@ struct grpc_chttp2_stream {
grpc_byte_stream *fetching_send_message;
uint32_t fetched_send_message_length;
gpr_slice fetching_slice;
- int64_t fetching_slice_end_offset;
+ int64_t next_message_end_offset;
+ int64_t flow_controlled_bytes_written;
bool complete_fetch_covered_by_poller;
grpc_closure complete_fetch;
grpc_closure complete_fetch_locked;
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index ebdbce1bfd..d34a7918b5 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -56,16 +56,16 @@ static void finish_write_cb(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, size_t send_bytes,
+ grpc_chttp2_stream *s, int64_t send_bytes,
grpc_chttp2_write_cb **list, grpc_error *error) {
grpc_chttp2_write_cb *cb = *list;
*list = NULL;
+ s->flow_controlled_bytes_written += send_bytes;
while (cb) {
grpc_chttp2_write_cb *next = cb->next;
- if (cb->call_at_byte <= send_bytes) {
+ if (cb->call_at_byte <= s->flow_controlled_bytes_written) {
finish_write_cb(exec_ctx, t, s, cb, GRPC_ERROR_REF(error));
} else {
- cb->call_at_byte -= send_bytes;
add_to_write_list(list, cb);
}
cb = next;
@@ -236,8 +236,8 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_REF(error), "send_initial_metadata_finished");
}
if (s->sending_bytes != 0) {
- update_list(exec_ctx, t, s, s->sending_bytes, &s->on_write_finished_cbs,
- GRPC_ERROR_REF(error));
+ update_list(exec_ctx, t, s, (int64_t)s->sending_bytes,
+ &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
s->sending_bytes = 0;
}
if (s->sent_trailing_metadata) {