aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.c10
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.c16
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.h3
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c2
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c81
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h11
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.c5
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c2
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c130
10 files changed, 172 insertions, 94 deletions
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c
index 213395c20f..568b114d64 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.c
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c
@@ -56,6 +56,7 @@ typedef struct {
gpr_refcount refs;
bool shutdown;
+ bool connecting;
char *server_name;
grpc_chttp2_create_handshakers_func create_handshakers;
@@ -103,7 +104,9 @@ static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx,
}
// If handshaking is not yet in progress, shutdown the endpoint.
// Otherwise, the handshaker will do this for us.
- if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint);
+ if (!c->connecting && c->endpoint != NULL) {
+ grpc_endpoint_shutdown(exec_ctx, c->endpoint);
+ }
gpr_mu_unlock(&c->mu);
}
@@ -192,6 +195,8 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
chttp2_connector *c = arg;
gpr_mu_lock(&c->mu);
+ GPR_ASSERT(c->connecting);
+ c->connecting = false;
if (error != GRPC_ERROR_NONE || c->shutdown) {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE("connector shutdown");
@@ -202,6 +207,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint);
gpr_mu_unlock(&c->mu);
chttp2_connector_unref(exec_ctx, arg);
} else {
@@ -235,6 +241,8 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(c->endpoint == NULL);
chttp2_connector_ref(con); // Ref taken for callback.
grpc_closure_init(&c->connected, connected, c);
+ GPR_ASSERT(!c->connecting);
+ c->connecting = true;
grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint,
args->interested_parties, args->channel_args,
args->addr, args->deadline);
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c
index 7795606e73..8ee7e29316 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.c
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.c
@@ -58,8 +58,8 @@ void grpc_chttp2_server_handshaker_factory_create_handshakers(
grpc_chttp2_server_handshaker_factory *handshaker_factory,
grpc_handshake_manager *handshake_mgr) {
if (handshaker_factory != NULL) {
- handshaker_factory->vtable->create_handshakers(
- exec_ctx, handshaker_factory, handshake_mgr);
+ handshaker_factory->vtable->create_handshakers(exec_ctx, handshaker_factory,
+ handshake_mgr);
}
}
@@ -71,7 +71,6 @@ void grpc_chttp2_server_handshaker_factory_destroy(
}
}
-
typedef struct pending_handshake_manager_node {
grpc_handshake_manager *handshake_mgr;
struct pending_handshake_manager_node *next;
@@ -196,9 +195,9 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
// args instead of hard-coding it.
const gpr_timespec deadline = gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN));
- grpc_handshake_manager_do_handshake(
- exec_ctx, connection_state->handshake_mgr, tcp, state->args, deadline,
- acceptor, on_handshake_done, connection_state);
+ grpc_handshake_manager_do_handshake(exec_ctx, connection_state->handshake_mgr,
+ tcp, state->args, deadline, acceptor,
+ on_handshake_done, connection_state);
}
/* Server callback: start listening on our ports */
@@ -275,9 +274,8 @@ grpc_error *grpc_chttp2_server_add_port(
memset(state, 0, sizeof(*state));
grpc_closure_init(&state->tcp_server_shutdown_complete,
tcp_server_shutdown_complete, state);
- err =
- grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete,
- args, &tcp_server);
+ err = grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete,
+ args, &tcp_server);
if (err != GRPC_ERROR_NONE) {
goto error;
}
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h
index b1ff04bcbb..3073399267 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.h
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.h
@@ -73,7 +73,6 @@ void grpc_chttp2_server_handshaker_factory_destroy(
grpc_error *grpc_chttp2_server_add_port(
grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr,
grpc_channel_args *args,
- grpc_chttp2_server_handshaker_factory *handshaker_factory,
- int *port_num);
+ grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H */
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index 366312bd72..7e286d4e46 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -45,7 +45,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
int port_num = 0;
GRPC_API_TRACE("grpc_server_add_insecure_http2_port(server=%p, addr=%s)", 2,
(server, addr));
- grpc_error* err = grpc_chttp2_server_add_port(
+ grpc_error *err = grpc_chttp2_server_add_port(
&exec_ctx, server, addr,
grpc_channel_args_copy(grpc_server_get_channel_args(server)),
NULL /* handshaker_factory */, &port_num);
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index 5f41728132..85c21f0ca2 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -64,7 +64,7 @@ static void server_security_handshaker_factory_create_handshakers(
}
static void server_security_handshaker_factory_destroy(
- grpc_exec_ctx* exec_ctx, grpc_chttp2_server_handshaker_factory *hf) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf) {
server_security_handshaker_factory *handshaker_factory =
(server_security_handshaker_factory *)hf;
GRPC_SECURITY_CONNECTOR_UNREF(&handshaker_factory->security_connector->base,
@@ -106,8 +106,8 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
goto done;
}
// Create handshaker factory.
- server_security_handshaker_factory* handshaker_factory =
- gpr_malloc(sizeof(*handshaker_factory));
+ server_security_handshaker_factory *handshaker_factory =
+ gpr_malloc(sizeof(*handshaker_factory));
memset(handshaker_factory, 0, sizeof(*handshaker_factory));
handshaker_factory->base.vtable = &server_security_handshaker_factory_vtable;
handshaker_factory->security_connector = sc;
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 3e7c078d3c..6bc054866b 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -425,7 +425,6 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
- grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
}
end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
@@ -521,10 +520,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
}
}
- if (s->fail_pending_writes_on_writes_finished_error != NULL) {
- GRPC_ERROR_UNREF(s->fail_pending_writes_on_writes_finished_error);
- }
-
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
GPR_ASSERT(s->fetching_send_message == NULL);
GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
@@ -604,11 +599,13 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
write_state_name(t->write_state),
write_state_name(st), reason));
t->write_state = st;
- if (st == GRPC_CHTTP2_WRITE_STATE_IDLE &&
- t->close_transport_on_writes_finished != NULL) {
- grpc_error *err = t->close_transport_on_writes_finished;
- t->close_transport_on_writes_finished = NULL;
- close_transport_locked(exec_ctx, t, err);
+ if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &t->run_after_write, NULL);
+ if (t->close_transport_on_writes_finished != NULL) {
+ grpc_error *err = t->close_transport_on_writes_finished;
+ t->close_transport_on_writes_finished = NULL;
+ close_transport_locked(exec_ctx, t, err);
+ }
}
}
@@ -825,7 +822,14 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
}
}
+/* Flag that this closure barrier wants stats to be updated before finishing */
#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
+/* Flag that this closure barrier may be covering a write in a pollset, and so
+ we should not complete this closure until we can prove that the write got
+ scheduled */
+#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1)
+/* First bit of the reference count, stored in the high order bits (with the low
+ bits being used for flags defined above) */
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
static grpc_closure *add_closure_barrier(grpc_closure *closure) {
@@ -852,6 +856,16 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
return;
}
closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
+ if (grpc_http_trace) {
+ const char *errstr = grpc_error_string(error);
+ gpr_log(GPR_DEBUG,
+ "complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s",
+ closure,
+ (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT),
+ (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT),
+ desc, errstr);
+ grpc_error_free_string(errstr);
+ }
if (error != GRPC_ERROR_NONE) {
if (closure->error_data.error == GRPC_ERROR_NONE) {
closure->error_data.error =
@@ -868,7 +882,13 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
grpc_transport_move_stats(&s->stats, s->collecting_stats);
s->collecting_stats = NULL;
}
- grpc_closure_run(exec_ctx, closure, closure->error_data.error);
+ if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
+ !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
+ grpc_closure_run(exec_ctx, closure, closure->error_data.error);
+ } else {
+ grpc_closure_list_append(&t->run_after_write, closure,
+ closure->error_data.error);
+ }
}
}
@@ -1013,6 +1033,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (op->send_initial_metadata != NULL) {
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_initial_metadata_finished = add_closure_barrier(on_complete);
s->send_initial_metadata = op->send_initial_metadata;
const size_t metadata_size =
@@ -1066,6 +1087,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (op->send_message != NULL) {
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
grpc_chttp2_complete_closure_step(
@@ -1103,6 +1125,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (op->send_trailing_metadata != NULL) {
GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
s->send_trailing_metadata = op->send_trailing_metadata;
const size_t metadata_size =
@@ -1406,7 +1429,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
if (grpc_chttp2_list_remove_writable_stream(t, s)) {
- grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream");
}
@@ -1537,41 +1559,9 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
return error;
}
-void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s) {
- if (s->need_fail_pending_writes_on_writes_finished) {
- grpc_error *error = s->fail_pending_writes_on_writes_finished_error;
- s->fail_pending_writes_on_writes_finished_error = NULL;
- s->need_fail_pending_writes_on_writes_finished = false;
- grpc_chttp2_fail_pending_writes(exec_ctx, t, s, error);
- }
-}
-
void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error) {
- if (s->need_fail_pending_writes_on_writes_finished ||
- (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE &&
- (s->included[GRPC_CHTTP2_LIST_WRITABLE] ||
- s->included[GRPC_CHTTP2_LIST_WRITING]))) {
- /* If a write is in progress, and it involves this stream, wait for the
- * write to complete before cancelling things out. If we don't do this, then
- * our combiner lock might think that some operation on its queue might be
- * covering a completion even though there is none, in which case we might
- * offload to another thread, which isn't guarateed to exist */
- if (error != GRPC_ERROR_NONE) {
- if (s->fail_pending_writes_on_writes_finished_error == GRPC_ERROR_NONE) {
- s->fail_pending_writes_on_writes_finished_error = GRPC_ERROR_CREATE(
- "Post-poned fail writes due to in-progress write");
- }
- s->fail_pending_writes_on_writes_finished_error = grpc_error_add_child(
- s->fail_pending_writes_on_writes_finished_error, error);
- }
- s->need_fail_pending_writes_on_writes_finished = true;
- return; /* early out */
- }
-
error =
removal_error(error, s, "Pending writes failed due to stream closure");
s->send_initial_metadata = NULL;
@@ -1632,6 +1622,9 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
if (s->id != 0) {
remove_stream(exec_ctx, t, s->id,
removal_error(GRPC_ERROR_REF(error), s, "Stream removed"));
+ } else {
+ /* Purge streams waiting on concurrency still waiting for id assignment */
+ grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2");
}
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 6cba1e7fd2..b727965d43 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -327,6 +327,9 @@ struct grpc_chttp2_transport {
*/
grpc_error *close_transport_on_writes_finished;
+ /* a list of closures to run after writes are finished */
+ grpc_closure_list run_after_write;
+
/* buffer pool state */
/** have we scheduled a benign cleanup? */
bool benign_reclaimer_registered;
@@ -409,9 +412,6 @@ struct grpc_chttp2_stream {
grpc_error *read_closed_error;
/** the error that resulted in this stream being write-closed */
grpc_error *write_closed_error;
- /** should any writes be cleared once this stream becomes non-writable */
- bool need_fail_pending_writes_on_writes_finished;
- grpc_error *fail_pending_writes_on_writes_finished_error;
grpc_published_metadata_method published_metadata[2];
bool final_metadata_requested;
@@ -496,6 +496,8 @@ void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
grpc_chttp2_stream **s);
+void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s);
void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
@@ -692,9 +694,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s);
void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error);
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index 6d25b3ae57..a60264cc51 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -158,6 +158,11 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
}
+void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
+}
+
void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 769b229a0d..139e7387c4 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -208,7 +208,6 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing");
}
} else {
- grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:no_write");
}
}
@@ -253,7 +252,6 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1,
GRPC_ERROR_REF(error));
}
- grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end");
}
grpc_slice_buffer_reset_and_unref(&t->outbuf);
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index a4c110101e..afc59f4b12 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -149,6 +149,9 @@ struct write_state {
struct op_state {
bool state_op_done[OP_NUM_OPS];
bool state_callback_received[OP_NUM_OPS];
+ bool fail_state;
+ bool flush_read;
+ grpc_error *cancel_error;
/* data structure for storing data coming from server */
struct read_state rs;
/* data structure for storing data going to the server */
@@ -248,6 +251,12 @@ static void free_read_buffer(stream_obj *s) {
}
}
+static grpc_error *make_error_with_desc(int error_code, const char *desc) {
+ grpc_error *error = GRPC_ERROR_CREATE(desc);
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
+ return error;
+}
+
/*
Add a new stream op to op storage.
*/
@@ -433,6 +442,18 @@ static void on_response_headers_received(
grpc_mdstr_from_string(headers->headers[i].value)));
}
s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
+ if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
+ s->state.state_callback_received[OP_FAILED])) {
+ /* Do an extra read to trigger on_succeeded() callback in case connection
+ is closed */
+ GPR_ASSERT(s->state.rs.length_field_received == false);
+ s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
+ s->state.rs.received_bytes = 0;
+ s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
+ s->state.rs.remaining_bytes);
+ }
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -464,7 +485,11 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
count);
gpr_mu_lock(&s->mu);
s->state.state_callback_received[OP_RECV_MESSAGE] = true;
- if (count > 0) {
+ if (count > 0 && s->state.flush_read) {
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096);
+ gpr_mu_unlock(&s->mu);
+ } else if (count > 0) {
s->state.rs.received_bytes += count;
s->state.rs.remaining_bytes -= count;
if (s->state.rs.remaining_bytes > 0) {
@@ -479,6 +504,10 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
execute_from_storage(s);
}
} else {
+ if (s->state.flush_read) {
+ gpr_free(s->state.rs.read_buffer);
+ s->state.rs.read_buffer = NULL;
+ }
s->state.rs.read_stream_closed = true;
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
@@ -508,10 +537,27 @@ static void on_response_trailers_received(
grpc_mdstr_from_string(trailers->headers[i].key),
grpc_mdstr_from_string(trailers->headers[i].value)));
s->state.rs.trailing_metadata_valid = true;
+ if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
+ 0 != strcmp(trailers->headers[i].value, "0")) {
+ s->state.fail_state = true;
+ }
}
s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
- gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
+ * trigger on_succeeded */
+ if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
+ !(s->state.state_op_done[OP_CANCEL_ERROR] ||
+ s->state.state_callback_received[OP_FAILED])) {
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs);
+ s->state.state_callback_received[OP_SEND_MESSAGE] = false;
+ cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+ s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
+
+ gpr_mu_unlock(&s->mu);
+ } else {
+ gpr_mu_unlock(&s->mu);
+ execute_from_storage(s);
+ }
}
/*
@@ -632,9 +678,9 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
/* When call is canceled, every op can be run, except under following
conditions
*/
- bool is_canceled_of_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
+ bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
stream_state->state_callback_received[OP_FAILED];
- if (is_canceled_of_failed) {
+ if (is_canceled_or_failed) {
if (op_id == OP_SEND_INITIAL_METADATA) result = false;
if (op_id == OP_SEND_MESSAGE) result = false;
if (op_id == OP_SEND_TRAILING_METADATA) result = false;
@@ -778,16 +824,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_SEND_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
- /* This OP is the beginning. Reset various states */
- memset(&s->header_array, 0, sizeof(s->header_array));
- memset(&stream_state->rs, 0, sizeof(stream_state->rs));
- memset(&stream_state->ws, 0, sizeof(stream_state->ws));
- memset(stream_state->state_op_done, 0, sizeof(stream_state->state_op_done));
- memset(stream_state->state_callback_received, 0,
- sizeof(stream_state->state_callback_received));
/* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
* on_failed */
GPR_ASSERT(s->cbs == NULL);
+ GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
&cronet_callbacks);
CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs);
@@ -808,10 +848,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
- if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
- stream_state->state_callback_received[OP_FAILED]) {
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
GRPC_ERROR_CANCELLED, NULL);
+ } else if (stream_state->state_callback_received[OP_FAILED]) {
+ grpc_exec_ctx_sched(
+ exec_ctx, stream_op->recv_initial_metadata_ready,
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
@@ -865,12 +908,19 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_RECV_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
- if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
- stream_state->state_callback_received[OP_FAILED]) {
- CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
GRPC_ERROR_CANCELLED, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
+ } else if (stream_state->state_callback_received[OP_FAILED]) {
+ CRONET_LOG(GPR_DEBUG, "Stream failed.");
+ grpc_exec_ctx_sched(
+ exec_ctx, stream_op->recv_message_ready,
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
+ stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
CRONET_LOG(GPR_DEBUG, "read stream closed");
@@ -878,6 +928,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.length_field_received == false) {
if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
stream_state->rs.remaining_bytes == 0) {
@@ -946,10 +997,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
- /* Clear read state of the stream, so next read op (if it were to come)
- * will work */
- stream_state->rs.received_bytes = stream_state->rs.remaining_bytes =
- stream_state->rs.length_field_received = 0;
+ /* Do an extra read to trigger on_succeeded() callback in case connection
+ is closed */
+ stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
+ stream_state->rs.received_bytes = 0;
+ stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
+ stream_state->rs.length_field_received = false;
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ stream_state->rs.remaining_bytes);
result = ACTION_TAKEN_NO_CALLBACK;
}
} else if (stream_op->recv_trailing_metadata &&
@@ -986,17 +1042,25 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs);
if (s->cbs) {
cronet_bidirectional_stream_cancel(s->cbs);
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else {
+ result = ACTION_TAKEN_NO_CALLBACK;
}
stream_state->state_op_done[OP_CANCEL_ERROR] = true;
- result = ACTION_TAKEN_WITH_CALLBACK;
+ if (!stream_state->cancel_error) {
+ stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error);
+ }
} else if (stream_op->on_complete &&
op_can_be_run(stream_op, stream_state, &oas->state,
OP_ON_COMPLETE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
- if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
- stream_state->state_callback_received[OP_FAILED]) {
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete,
- GRPC_ERROR_CANCELLED, NULL);
+ GRPC_ERROR_REF(stream_state->cancel_error), NULL);
+ } else if (stream_state->state_callback_received[OP_FAILED]) {
+ grpc_exec_ctx_sched(
+ exec_ctx, stream_op->on_complete,
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
} else {
/* All actions in this stream_op are complete. Call the on_complete
* callback
@@ -1017,6 +1081,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
make a note */
if (stream_op->recv_message)
stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
+ } else if (stream_state->fail_state && !stream_state->flush_read) {
+ CRONET_LOG(GPR_DEBUG, "running: %p flush read", oas);
+ if (stream_state->rs.read_buffer &&
+ stream_state->rs.read_buffer != stream_state->rs.grpc_header_bytes) {
+ gpr_free(stream_state->rs.read_buffer);
+ stream_state->rs.read_buffer = NULL;
+ }
+ stream_state->rs.read_buffer = gpr_malloc(4096);
+ stream_state->flush_read = true;
} else {
result = NO_ACTION_POSSIBLE;
}
@@ -1042,6 +1115,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done));
memset(s->state.state_callback_received, 0,
sizeof(s->state.state_callback_received));
+ s->state.fail_state = s->state.flush_read = false;
+ s->state.cancel_error = NULL;
gpr_mu_init(&s->mu);
return 0;
}
@@ -1088,7 +1163,10 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, void *and_free_memory) {}
+ grpc_stream *gs, void *and_free_memory) {
+ stream_obj *s = (stream_obj *)gs;
+ GRPC_ERROR_UNREF(s->state.cancel_error);
+}
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {}