aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar ctiller <ctiller@google.com>2015-01-12 11:23:09 -0800
committerGravatar Nicolas Noble <nnoble@google.com>2015-01-12 14:03:28 -0800
commit00297df63b5c2d660dafa5314b3c87a57cfaf0b8 (patch)
treea4c3b1e3f918708c9788df31a8ff5ad8b91f2809
parent45fc159eed8d7b9be2277f81e3f43de7e5daabc2 (diff)
Ensure flow control callbacks happen outside the transport lock.
Split encoding into two phases: a collection phase to decide on what is allowed (by flow control) to be sent, and a framing phase when the data is actually sent. Perform the second phase outside of the transport mutex (but serially, guarded by t->writing) and make flow control callbacks during that phase. This will allow us to make further transport level calls in response to flow control callbacks, and will be needed by the forthcoming async api for C++. Change on 2015/01/12 by ctiller <ctiller@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=83774409
-rw-r--r--src/core/transport/chttp2/stream_encoder.c130
-rw-r--r--src/core/transport/chttp2/stream_encoder.h15
-rw-r--r--src/core/transport/chttp2_transport.c246
-rw-r--r--test/core/transport/chttp2/stream_encoder_test.c24
4 files changed, 249 insertions, 166 deletions
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 8595a59879..92a36d0c16 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -68,8 +68,6 @@ typedef struct {
gpr_uint8 last_was_header;
/* output stream id */
gpr_uint32 stream_id;
- /* number of flow controlled bytes written */
- gpr_uint32 output_size;
gpr_slice_buffer *output;
} framer_state;
@@ -464,49 +462,31 @@ void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c) {
grpc_mdstr_unref(c->timeout_key_str);
}
-gpr_uint32 grpc_chttp2_encode_some(grpc_stream_op *ops, size_t *ops_count,
- int eof, gpr_slice_buffer *output,
- gpr_uint32 max_bytes, gpr_uint32 stream_id,
- grpc_chttp2_hpack_compressor *compressor) {
- framer_state st;
+gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
+ gpr_uint32 max_flow_controlled_bytes,
+ grpc_stream_op_buffer *outops) {
gpr_slice slice;
grpc_stream_op *op;
gpr_uint32 max_take_size;
+ gpr_uint32 flow_controlled_bytes_taken = 0;
gpr_uint32 curop = 0;
- gpr_uint32 nops = *ops_count;
gpr_uint8 *p;
- GPR_ASSERT(stream_id != 0);
-
- st.cur_frame_type = NONE;
- st.last_was_header = 0;
- st.stream_id = stream_id;
- st.output = output;
- st.output_size = 0;
-
- while (curop < nops) {
- GPR_ASSERT(st.output_size <= max_bytes);
- op = &ops[curop];
+ while (curop < *inops_count) {
+ GPR_ASSERT(flow_controlled_bytes_taken <= max_flow_controlled_bytes);
+ op = &inops[curop];
switch (op->type) {
case GRPC_NO_OP:
+ /* skip */
curop++;
break;
case GRPC_OP_FLOW_CTL_CB:
- op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK);
- curop++;
- break;
- case GRPC_OP_METADATA:
- hpack_enc(compressor, op->data.metadata, &st);
- curop++;
- break;
case GRPC_OP_DEADLINE:
- deadline_enc(compressor, op->data.deadline, &st);
- curop++;
- break;
+ case GRPC_OP_METADATA:
case GRPC_OP_METADATA_BOUNDARY:
- ensure_frame_type(&st, HEADER, 0);
- finish_frame(&st, 1, 0);
- st.last_was_header = 0; /* force a new header frame */
+ /* these just get copied as they don't impact the number of flow
+ controlled bytes */
+ grpc_sopb_append(outops, op, 1);
curop++;
break;
case GRPC_OP_BEGIN_MESSAGE:
@@ -525,42 +505,100 @@ gpr_uint32 grpc_chttp2_encode_some(grpc_stream_op *ops, size_t *ops_count,
case GRPC_OP_SLICE:
slice = op->data.slice;
if (!GPR_SLICE_LENGTH(slice)) {
+ /* skip zero length slices */
+ gpr_slice_unref(slice);
curop++;
break;
}
- if (st.output_size == max_bytes) {
+ max_take_size = max_flow_controlled_bytes - flow_controlled_bytes_taken;
+ if (max_take_size == 0) {
goto exit_loop;
}
+ if (GPR_SLICE_LENGTH(slice) > max_take_size) {
+ slice = gpr_slice_split_head(&op->data.slice, max_take_size);
+ grpc_sopb_add_slice(outops, slice);
+ } else {
+ /* consume this op immediately */
+ grpc_sopb_append(outops, op, 1);
+ curop++;
+ }
+ flow_controlled_bytes_taken += GPR_SLICE_LENGTH(slice);
+ break;
+ }
+ }
+exit_loop:
+ *inops_count -= curop;
+ memmove(inops, inops + curop, *inops_count * sizeof(grpc_stream_op));
+
+ return flow_controlled_bytes_taken;
+}
+
+void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
+ gpr_uint32 stream_id,
+ grpc_chttp2_hpack_compressor *compressor,
+ gpr_slice_buffer *output) {
+ framer_state st;
+ gpr_slice slice;
+ grpc_stream_op *op;
+ gpr_uint32 max_take_size;
+ gpr_uint32 curop = 0;
+
+ GPR_ASSERT(stream_id != 0);
+
+ st.cur_frame_type = NONE;
+ st.last_was_header = 0;
+ st.stream_id = stream_id;
+ st.output = output;
+
+ while (curop < ops_count) {
+ op = &ops[curop];
+ switch (op->type) {
+ case GRPC_NO_OP:
+ case GRPC_OP_BEGIN_MESSAGE:
+ gpr_log(
+ GPR_ERROR,
+ "These stream ops should be filtered out by grpc_chttp2_preencode");
+ abort();
+ case GRPC_OP_FLOW_CTL_CB:
+ op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK);
+ curop++;
+ break;
+ case GRPC_OP_METADATA:
+ hpack_enc(compressor, op->data.metadata, &st);
+ curop++;
+ break;
+ case GRPC_OP_DEADLINE:
+ deadline_enc(compressor, op->data.deadline, &st);
+ curop++;
+ break;
+ case GRPC_OP_METADATA_BOUNDARY:
+ ensure_frame_type(&st, HEADER, 0);
+ finish_frame(&st, 1, 0);
+ st.last_was_header = 0; /* force a new header frame */
+ curop++;
+ break;
+ case GRPC_OP_SLICE:
+ slice = op->data.slice;
if (st.cur_frame_type == DATA &&
st.output->length - st.output_length_at_start_of_frame ==
GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) {
finish_frame(&st, 0, 0);
}
ensure_frame_type(&st, DATA, 1);
- max_take_size =
- GPR_MIN(max_bytes - st.output_size,
- GRPC_CHTTP2_MAX_PAYLOAD_LENGTH +
- st.output_length_at_start_of_frame - st.output->length);
+ max_take_size = GRPC_CHTTP2_MAX_PAYLOAD_LENGTH +
+ st.output_length_at_start_of_frame - st.output->length;
if (GPR_SLICE_LENGTH(slice) > max_take_size) {
slice = gpr_slice_split_head(&op->data.slice, max_take_size);
} else {
/* consume this op immediately */
curop++;
}
- st.output_size += GPR_SLICE_LENGTH(slice);
gpr_slice_buffer_add(output, slice);
break;
}
}
-exit_loop:
if (eof && st.cur_frame_type == NONE) {
begin_frame(&st, DATA);
}
- finish_frame(&st, 1, eof && curop == nops);
-
- nops -= curop;
- *ops_count = nops;
- memmove(ops, ops + curop, nops * sizeof(grpc_stream_op));
-
- return st.output_size;
+ finish_frame(&st, 1, eof);
}
diff --git a/src/core/transport/chttp2/stream_encoder.h b/src/core/transport/chttp2/stream_encoder.h
index dad64697a5..4b093e8495 100644
--- a/src/core/transport/chttp2/stream_encoder.h
+++ b/src/core/transport/chttp2/stream_encoder.h
@@ -78,9 +78,16 @@ void grpc_chttp2_hpack_compressor_init(grpc_chttp2_hpack_compressor *c,
grpc_mdctx *mdctx);
void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c);
-gpr_uint32 grpc_chttp2_encode_some(grpc_stream_op *ops, size_t *ops_count,
- int eof, gpr_slice_buffer *output,
- gpr_uint32 max_bytes, gpr_uint32 stream_id,
- grpc_chttp2_hpack_compressor *compressor);
+/* select stream ops to be encoded, moving them from inops to outops, and
+ moving subsequent ops in inops forward in the queue */
+gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
+ gpr_uint32 max_flow_controlled_bytes,
+ grpc_stream_op_buffer *outops);
+
+/* encode stream ops to output */
+void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
+ gpr_uint32 stream_id,
+ grpc_chttp2_hpack_compressor *compressor,
+ gpr_slice_buffer *output);
#endif /* __GRPC_INTERNAL_TRANSPORT_CHTTP2_STREAM_ENCODER_H__ */
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 5bf763e76f..1b90d4715b 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -71,6 +71,13 @@ typedef struct stream stream;
typedef enum {
/* streams that have pending writes */
WRITABLE = 0,
+ /* streams that have been selected to be written */
+ WRITING,
+ /* streams that have just been written, and included a close */
+ WRITTEN_CLOSED,
+ /* streams that have been cancelled and have some pending state updates
+ to perform */
+ CANCELLED,
/* streams that want to send window updates */
WINDOW_UPDATE,
/* streams that are waiting to start because there are too many concurrent
@@ -258,7 +265,12 @@ struct stream {
gpr_uint32 outgoing_window;
gpr_uint32 incoming_window;
- gpr_uint8 write_closed;
+ /* when the application requests writes be closed, the write_closed is
+ 'queued'; when the close is flow controlled into the send path, we are
+ 'sending' it; when the write has been performed it is 'sent' */
+ gpr_uint8 queued_write_closed;
+ gpr_uint8 sending_write_closed;
+ gpr_uint8 sent_write_closed;
gpr_uint8 read_closed;
gpr_uint8 cancelled;
gpr_uint8 allow_window_updates;
@@ -267,7 +279,10 @@ struct stream {
stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT];
+ /* sops from application */
grpc_stream_op_buffer outgoing_sopb;
+ /* sops that have passed flow control to be written */
+ grpc_stream_op_buffer writing_sopb;
grpc_chttp2_data_parser parser;
@@ -284,7 +299,7 @@ static int prepare_callbacks(transport *t);
static void run_callbacks(transport *t);
static int prepare_write(transport *t);
-static void finish_write(void *t, grpc_endpoint_cb_status status);
+static void perform_write(transport *t, grpc_endpoint *ep);
static void lock(transport *t);
static void unlock(transport *t);
@@ -303,6 +318,7 @@ static void cancel_stream_id(transport *t, gpr_uint32 id,
static void cancel_stream(transport *t, stream *s,
grpc_status_code local_status,
grpc_chttp2_error_code error_code, int send_rst);
+static void finalize_cancellations(transport *t);
static stream *lookup_stream(transport *t, gpr_uint32 id);
static void remove_from_stream_map(transport *t, stream *s);
static void maybe_start_some_streams(transport *t);
@@ -518,7 +534,9 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->incoming_window =
t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->write_closed = 0;
+ s->queued_write_closed = 0;
+ s->sending_write_closed = 0;
+ s->sent_write_closed = 0;
s->read_closed = 0;
s->cancelled = 0;
s->allow_window_updates = 0;
@@ -526,8 +544,9 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
memset(&s->links, 0, sizeof(s->links));
memset(&s->included, 0, sizeof(s->included));
grpc_sopb_init(&s->outgoing_sopb);
- grpc_chttp2_data_parser_init(&s->parser);
+ grpc_sopb_init(&s->writing_sopb);
grpc_sopb_init(&s->callback_sopb);
+ grpc_chttp2_data_parser_init(&s->parser);
if (!server_data) {
unlock(t);
@@ -565,8 +584,9 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_unlock(&t->mu);
grpc_sopb_destroy(&s->outgoing_sopb);
- grpc_chttp2_data_parser_destroy(&s->parser);
+ grpc_sopb_destroy(&s->writing_sopb);
grpc_sopb_destroy(&s->callback_sopb);
+ grpc_chttp2_data_parser_destroy(&s->parser);
unref_transport(t);
}
@@ -575,6 +595,10 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
* LIST MANAGEMENT
*/
+static int stream_list_empty(transport *t, stream_list_id id) {
+ return t->lists[id].head == NULL;
+}
+
static stream *stream_list_remove_head(transport *t, stream_list_id id) {
stream *s = t->lists[id].head;
if (s) {
@@ -666,6 +690,10 @@ static void unlock(transport *t) {
}
}
+ if (!t->writing) {
+ finalize_cancellations(t);
+ }
+
/* gather any callbacks that need to be made */
if (!t->calling_back && t->cb) {
perform_callbacks = prepare_callbacks(t);
@@ -709,53 +737,9 @@ static void unlock(transport *t) {
}
/* write some bytes if necessary */
- while (start_write) {
- switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count,
- finish_write, t)) {
- case GRPC_ENDPOINT_WRITE_DONE:
- /* grab the lock directly without wrappers since we just want to
- continue writes if we loop: no need to check read callbacks again */
- gpr_mu_lock(&t->mu);
- t->outbuf.count = 0;
- t->outbuf.length = 0;
- t->writing = start_write = prepare_write(t);
- if (!start_write) {
- if (!t->reading) {
- grpc_endpoint_destroy(t->ep);
- t->ep = NULL;
- gpr_cv_broadcast(&t->cv);
- /* endpoint ref: safe because we'll still have the ref for write */
- unref_transport(t);
- }
- }
- gpr_mu_unlock(&t->mu);
- if (!start_write) {
- unref_transport(t);
- }
- break;
- case GRPC_ENDPOINT_WRITE_ERROR:
- start_write = 0;
- /* use the wrapper lock/unlock here as we drop_connection, causing
- read callbacks to be queued (which will be cleared during unlock) */
- lock(t);
- t->outbuf.count = 0;
- t->outbuf.length = 0;
- t->writing = 0;
- drop_connection(t);
- if (!t->reading) {
- grpc_endpoint_destroy(t->ep);
- t->ep = NULL;
- gpr_cv_broadcast(&t->cv);
- /* endpoint ref: safe because we'll still have the ref for write */
- unref_transport(t);
- }
- unlock(t);
- unref_transport(t);
- break;
- case GRPC_ENDPOINT_WRITE_PENDING:
- start_write = 0;
- break;
- }
+ if (start_write) {
+ /* ultimately calls unref_transport(t); and clears t->writing */
+ perform_write(t, ep);
}
if (perform_callbacks || call_closed || num_goaways) {
@@ -788,32 +772,10 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id,
}
}
-static void finish_write(void *tp, grpc_endpoint_cb_status error) {
- transport *t = tp;
-
- lock(t);
- if (error != GRPC_ENDPOINT_CB_OK) {
- drop_connection(t);
- }
- t->outbuf.count = 0;
- t->outbuf.length = 0;
- /* leave the writing flag up on shutdown to prevent further writes in unlock()
- from starting */
- t->writing = 0;
- if (!t->reading) {
- grpc_endpoint_destroy(t->ep);
- t->ep = NULL;
- gpr_cv_broadcast(&t->cv);
- unref_transport(t); /* safe because we'll still have the ref for write */
- }
- unlock(t);
-
- unref_transport(t);
-}
-
static int prepare_write(transport *t) {
stream *s;
gpr_slice_buffer tempbuf;
+ gpr_uint32 window_delta;
/* simple writes are queued to qbuf, and flushed here */
tempbuf = t->qbuf;
@@ -834,17 +796,16 @@ static int prepare_write(transport *t) {
/* for each stream that's become writable, frame it's data (according to
available window sizes) and add to the output buffer */
while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE))) {
- gpr_uint32 written = grpc_chttp2_encode_some(
- s->outgoing_sopb.ops, &s->outgoing_sopb.nops, s->write_closed,
- &t->outbuf, GPR_MIN(t->outgoing_window, s->outgoing_window), s->id,
- &t->hpack_compressor);
- t->outgoing_window -= written;
- s->outgoing_window -= written;
-
- /* if there are no more writes to do and writes are closed, we need to
- queue a callback to let the application know */
- if (s->write_closed && s->outgoing_sopb.nops == 0) {
- stream_list_join(t, s, PENDING_CALLBACKS);
+ window_delta = grpc_chttp2_preencode(
+ s->outgoing_sopb.ops, &s->outgoing_sopb.nops,
+ GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
+ t->outgoing_window -= window_delta;
+ s->outgoing_window -= window_delta;
+
+ s->sending_write_closed =
+ s->queued_write_closed && s->outgoing_sopb.nops == 0;
+ if (s->writing_sopb.nops > 0 || s->sending_write_closed) {
+ stream_list_join(t, s, WRITING);
}
/* if there are still writes to do and the stream still has window
@@ -857,25 +818,89 @@ static int prepare_write(transport *t) {
/* for each stream that wants to update its window, add that window here */
while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
- gpr_uint32 window_add =
+ window_delta =
t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
s->incoming_window;
- if (!s->read_closed && window_add) {
- gpr_slice_buffer_add(&t->outbuf,
- grpc_chttp2_window_update_create(s->id, window_add));
- s->incoming_window += window_add;
+ if (!s->read_closed && window_delta) {
+ gpr_slice_buffer_add(
+ &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
+ s->incoming_window += window_delta;
}
}
/* if the transport is ready to send a window update, do so here also */
if (t->incoming_window < t->connection_window_target * 3 / 4) {
- gpr_uint32 window_add = t->connection_window_target - t->incoming_window;
+ window_delta = t->connection_window_target - t->incoming_window;
gpr_slice_buffer_add(&t->outbuf,
- grpc_chttp2_window_update_create(0, window_add));
- t->incoming_window += window_add;
+ grpc_chttp2_window_update_create(0, window_delta));
+ t->incoming_window += window_delta;
}
- return t->outbuf.length > 0;
+ return t->outbuf.length > 0 || !stream_list_empty(t, WRITING);
+}
+
+static void finalize_outbuf(transport *t) {
+ stream *s;
+
+ while ((s = stream_list_remove_head(t, WRITING))) {
+ grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
+ s->sending_write_closed, s->id, &t->hpack_compressor,
+ &t->outbuf);
+ s->writing_sopb.nops = 0;
+ if (s->sending_write_closed) {
+ stream_list_join(t, s, WRITTEN_CLOSED);
+ }
+ }
+}
+
+static void finish_write_common(transport *t, int success) {
+ stream *s;
+
+ lock(t);
+ if (!success) {
+ drop_connection(t);
+ }
+ while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
+ s->sent_write_closed = 1;
+ stream_list_join(t, s, PENDING_CALLBACKS);
+ }
+ t->outbuf.count = 0;
+ t->outbuf.length = 0;
+ /* leave the writing flag up on shutdown to prevent further writes in unlock()
+ from starting */
+ t->writing = 0;
+ if (!t->reading) {
+ grpc_endpoint_destroy(t->ep);
+ t->ep = NULL;
+ gpr_cv_broadcast(&t->cv);
+ unref_transport(t); /* safe because we'll still have the ref for write */
+ }
+ unlock(t);
+
+ unref_transport(t);
+}
+
+static void finish_write(void *tp, grpc_endpoint_cb_status error) {
+ transport *t = tp;
+ finish_write_common(t, error == GRPC_ENDPOINT_CB_OK);
+}
+
+static void perform_write(transport *t, grpc_endpoint *ep) {
+ finalize_outbuf(t);
+
+ GPR_ASSERT(t->outbuf.count > 0);
+
+ switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count,
+ finish_write, t)) {
+ case GRPC_ENDPOINT_WRITE_DONE:
+ finish_write_common(t, 1);
+ break;
+ case GRPC_ENDPOINT_WRITE_ERROR:
+ finish_write_common(t, 0);
+ break;
+ case GRPC_ENDPOINT_WRITE_PENDING:
+ break;
+ }
}
static void maybe_start_some_streams(transport *t) {
@@ -901,19 +926,14 @@ static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
lock(t);
if (is_last) {
- s->write_closed = 1;
+ s->queued_write_closed = 1;
}
if (!s->cancelled) {
grpc_sopb_append(&s->outgoing_sopb, ops, ops_count);
- if (is_last && s->outgoing_sopb.nops == 0) {
- if (s->id != 0) {
- gpr_slice_buffer_add(&t->qbuf,
- grpc_chttp2_data_frame_create_empty_close(s->id));
- }
- } else if (s->id == 0) {
+ if (s->id == 0) {
stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
maybe_start_some_streams(t);
- } else if (s->outgoing_window) {
+ } else {
stream_list_join(t, s, WRITABLE);
}
} else {
@@ -967,12 +987,22 @@ static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
* INPUT PROCESSING
*/
+static void finalize_cancellations(transport *t) {
+ stream *s;
+
+ while ((s = stream_list_remove_head(t, CANCELLED))) {
+ s->read_closed = 1;
+ s->sent_write_closed = 1;
+ stream_list_join(t, s, PENDING_CALLBACKS);
+ }
+}
+
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
int send_rst) {
- char buffer[32];
int had_outgoing;
+ char buffer[32];
if (s) {
/* clear out any unreported input & output: nobody cares anymore */
@@ -981,10 +1011,9 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
grpc_sopb_reset(&s->outgoing_sopb);
if (s->cancelled) {
send_rst = 0;
- } else if (!s->read_closed || !s->write_closed || had_outgoing) {
+ } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) {
s->cancelled = 1;
- s->read_closed = 1;
- s->write_closed = 1;
+ stream_list_join(t, s, CANCELLED);
sprintf(buffer, "%d", local_status);
grpc_sopb_add_metadata(
@@ -1667,8 +1696,7 @@ static int prepare_callbacks(transport *t) {
s->parser.incoming_sopb = s->callback_sopb;
s->callback_sopb = temp_sopb;
- s->callback_state = compute_state(
- s->write_closed && s->outgoing_sopb.nops == 0, s->read_closed);
+ s->callback_state = compute_state(s->sent_write_closed, s->read_closed);
if (s->callback_state == GRPC_STREAM_CLOSED) {
remove_from_stream_map(t, s);
if (s->published_close) {
diff --git a/test/core/transport/chttp2/stream_encoder_test.c b/test/core/transport/chttp2/stream_encoder_test.c
index 3ee11d94e8..ba552786db 100644
--- a/test/core/transport/chttp2/stream_encoder_test.c
+++ b/test/core/transport/chttp2/stream_encoder_test.c
@@ -64,15 +64,20 @@ static gpr_slice create_test_slice(size_t length) {
static void verify_sopb(size_t window_available, int eof,
size_t expect_window_used, const char *expected) {
gpr_slice_buffer output;
+ grpc_stream_op_buffer encops;
gpr_slice merged;
gpr_slice expect = parse_hexstring(expected);
gpr_slice_buffer_init(&output);
+ grpc_sopb_init(&encops);
GPR_ASSERT(expect_window_used ==
- grpc_chttp2_encode_some(g_sopb.ops, &g_sopb.nops, eof, &output,
- window_available, 0xdeadbeef,
- &g_compressor));
+ grpc_chttp2_preencode(g_sopb.ops, &g_sopb.nops, window_available,
+ &encops));
+ grpc_chttp2_encode(encops.ops, encops.nops, eof, 0xdeadbeef, &g_compressor,
+ &output);
+ encops.nops = 0;
merged = grpc_slice_merge(output.slices, output.count);
gpr_slice_buffer_destroy(&output);
+ grpc_sopb_destroy(&encops);
if (0 != gpr_slice_cmp(merged, expect)) {
char *expect_str =
@@ -240,21 +245,25 @@ static void test_decode_random_headers_inner(int max_len) {
test_decode_random_header_state st;
gpr_slice_buffer output;
gpr_slice merged;
+ grpc_stream_op_buffer encops;
grpc_chttp2_hpack_parser parser;
grpc_chttp2_hpack_parser_init(&parser, g_mdctx);
+ grpc_sopb_init(&encops);
gpr_log(GPR_INFO, "max_len = %d", max_len);
- for (i = 0; i < 100000; i++) {
+ for (i = 0; i < 10000; i++) {
randstr(st.key, max_len);
randstr(st.value, max_len);
add_sopb_header(st.key, st.value);
gpr_slice_buffer_init(&output);
- GPR_ASSERT(0 == grpc_chttp2_encode_some(g_sopb.ops, &g_sopb.nops, 0,
- &output, 0, 0xdeadbeef,
- &g_compressor));
+ GPR_ASSERT(0 ==
+ grpc_chttp2_preencode(g_sopb.ops, &g_sopb.nops, 0, &encops));
+ grpc_chttp2_encode(encops.ops, encops.nops, 0, 0xdeadbeef, &g_compressor,
+ &output);
+ encops.nops = 0;
merged = grpc_slice_merge(output.slices, output.count);
gpr_slice_buffer_destroy(&output);
@@ -269,6 +278,7 @@ static void test_decode_random_headers_inner(int max_len) {
}
grpc_chttp2_hpack_parser_destroy(&parser);
+ grpc_sopb_destroy(&encops);
}
#define DECL_TEST_DECODE_RANDOM_HEADERS(n) \