aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-24 11:44:53 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-24 11:44:53 -0700
commitc1f7560ac27b6db4106115e5308f1a9124378a60 (patch)
treee35947864ae18806e9e190fffdd1aa3a31e5a4e0 /src
parenta940752deefd463fd07eb529af715813d7fdcd8d (diff)
Stream mapping fixes
Diffstat (limited to 'src')
-rw-r--r--src/core/surface/call.c9
-rw-r--r--src/core/transport/chttp2_transport.c12
2 files changed, 16 insertions, 5 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index dbf78f2cfe..2f514465fc 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -687,7 +687,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
static void call_on_done_recv(void *pc, int success) {
grpc_call *call = pc;
size_t i;
- int unref = 0;
+ int unref_due_to_connection_close = 0;
lock(call);
call->receiving = 0;
if (success) {
@@ -714,7 +714,7 @@ static void call_on_done_recv(void *pc, int success) {
if (call->recv_state == GRPC_STREAM_CLOSED) {
GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
call->read_state = READ_STATE_STREAM_CLOSED;
- unref = 1;
+ unref_due_to_connection_close = 1;
}
finish_read_ops(call);
} else {
@@ -725,9 +725,11 @@ static void call_on_done_recv(void *pc, int success) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_ERROR);
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_ERROR);
}
+ call->recv_ops.nops = 0;
unlock(call);
- if (unref) {
+ grpc_call_internal_unref(call, 0);
+ if (unref_due_to_connection_close) {
grpc_call_internal_unref(call, 0);
}
}
@@ -798,6 +800,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
op->bind_pollset = grpc_cq_pollset(call->cq);
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
call->write_state = WRITE_STATE_STARTED;
+ call->send_initial_metadata_count = 0;
/* fall through intended */
case WRITE_STATE_STARTED:
if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index a02fb93677..237def41aa 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -306,7 +306,6 @@ struct stream {
gpr_uint8 send_closed;
gpr_uint8 read_closed;
gpr_uint8 cancelled;
- gpr_uint8 published_close;
op_closure send_done_closure;
op_closure recv_done_closure;
@@ -731,6 +730,8 @@ static void stream_list_join(transport *t, stream *s, stream_list_id id) {
static void remove_from_stream_map(transport *t, stream *s) {
if (s->id == 0) return;
+ IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d", t->is_client? "CLI" : "SVR",
+ s->id));
if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
maybe_start_some_streams(t);
}
@@ -999,6 +1000,8 @@ static void maybe_start_some_streams(transport *t) {
stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
if (!s) break;
+ IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d", t->is_client? "CLI" : "SVR", s, t->next_stream_id));
+
GPR_ASSERT(s->id == 0);
s->id = t->next_stream_id;
t->next_stream_id += 2;
@@ -1018,6 +1021,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
s->write_state = WRITE_STATE_QUEUED_CLOSE;
}
if (s->id == 0) {
+ IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: New stream %p waiting for concurrency", t->is_client? "CLI" : "SVR", s));
stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
maybe_start_some_streams(t);
} else if (s->outgoing_window > 0) {
@@ -1300,7 +1304,8 @@ static void on_header(void *tp, grpc_mdelem *md) {
GPR_ASSERT(s);
- IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id,
+ IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id,
+ t->is_client? "CLI" : "SVR",
grpc_mdstr_as_c_string(md->key),
grpc_mdstr_as_c_string(md->value)));
@@ -1872,6 +1877,9 @@ static void finish_reads(transport *t) {
if (*s->publish_state != s->published_state) {
s->published_state = *s->publish_state;
publish = 1;
+ if (s->published_state == GRPC_STREAM_CLOSED) {
+ remove_from_stream_map(t, s);
+ }
}
if (s->parser.incoming_sopb.nops > 0) {
grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);