aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Makarand Dharmapurikar <makarandd@google.com>2016-08-10 13:44:13 -0700
committerGravatar Makarand Dharmapurikar <makarandd@google.com>2016-08-10 13:44:13 -0700
commit198f8b01946ac107700958ac6c2ea11ed523f2a5 (patch)
tree7aa1dcca0f8ad7c1de2e019108c574ae899284ee /src
parentf07506438c012f1f466670f05284594ca6808a26 (diff)
more fixes
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c42
1 files changed, 25 insertions, 17 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index e8746b4e6e..dc6a780eda 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -75,6 +75,7 @@ enum OP_ID {
OP_CANCEL_ERROR,
OP_ON_COMPLETE,
OP_FAILED,
+ OP_SUCCEEDED,
OP_CANCELED,
OP_RECV_MESSAGE_AND_ON_COMPLETE,
OP_READ_REQ_MADE,
@@ -91,6 +92,7 @@ const char *op_id_string[] = {
"OP_CANCEL_ERROR",
"OP_ON_COMPLETE",
"OP_FAILED",
+ "OP_SUCCEEDED",
"OP_CANCELED",
"OP_RECV_MESSAGE_AND_ON_COMPLETE",
"OP_READ_REQ_MADE",
@@ -189,6 +191,8 @@ struct stream_obj {
grpc_stream *curr_gs;
cronet_bidirectional_stream *cbs;
+ // Used for executing callbacks for ops
+ grpc_exec_ctx exec_ctx;
// This holds the state that is at stream level (response and req metadata)
struct op_state state;
@@ -227,7 +231,10 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
static void execute_from_storage(stream_obj *s) {
// Cycle through ops and try to take next action. Break when either
// an action with callback is taken, or no action is possible.
- gpr_mu_lock(&s->mu);
+ // This can be executed from the Cronet network thread via cronet callback
+ // or on the application supplied thread via the perform_stream_op function.
+ if (1) {//gpr_mu_lock(&s->mu) == 0) {
+ gpr_mu_lock(&s->mu);
for (int i = 0; i < s->storage.wrptr; ) {
CRONET_LOG(GPR_DEBUG, "calling execute_stream_op[%d]. done = %d", i, s->storage.pending_ops[i].done);
if (s->storage.pending_ops[i].done) {
@@ -242,7 +249,9 @@ static void execute_from_storage(stream_obj *s) {
break;
}
}
- gpr_mu_unlock(&s->mu);
+ gpr_mu_unlock(&s->mu);
+ }
+ grpc_exec_ctx_finish(&s->exec_ctx);
}
@@ -271,7 +280,9 @@ static void on_succeeded(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
cronet_bidirectional_stream_destroy(s->cbs);
+ s->state.state_callback_received[OP_FAILED] = true;
s->cbs = NULL;
+ execute_from_storage(s);
}
static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
@@ -380,13 +391,6 @@ static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer,
memcpy(p, GPR_SLICE_START_PTR(slice), length);
}
-static void enqueue_callback(grpc_closure *callback, grpc_error *error) {
- GPR_ASSERT(callback);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_exec_ctx_sched(&exec_ctx, callback, error, NULL);
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
static void convert_metadata_to_cronet_headers(
grpc_linked_mdelem *head,
const char *host,
@@ -498,9 +502,10 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, struct op_state *st
// we haven't sent initial metadata yet
else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false;
// we haven't sent message yet
+ // TODO: Streaming Write case is a problem. What if there is an outstanding write (2nd, 3rd,..) present.
else if (curr_op->send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false;
// we haven't got on_write_completed for the send yet
- else if (curr_op->send_message && !stream_state->state_callback_received[OP_SEND_MESSAGE]) result = false;
+ else if (stream_state->state_op_done[OP_SEND_MESSAGE] && !stream_state->state_callback_received[OP_SEND_MESSAGE]) result = false;
} else if (op_id == OP_CANCEL_ERROR) {
// already executed
if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
@@ -510,10 +515,12 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, struct op_state *st
// Check if every op that was asked for is done.
else if (curr_op->send_initial_metadata && !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false;
else if (curr_op->send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false;
+ else if (curr_op->send_message && !stream_state->state_callback_received[OP_SEND_MESSAGE]) result = false;
else if (curr_op->send_trailing_metadata && !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
else if (curr_op->recv_initial_metadata && !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
else if (curr_op->recv_message && !stream_state->state_op_done[OP_RECV_MESSAGE]) result = false;
else if (curr_op->recv_trailing_metadata) {
+ //if (!stream_state->state_op_done[OP_SUCCEEDED]) result = false; gpr_log(GPR_DEBUG, "HACK!!");
// We aren't done with trailing metadata yet
if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
// We've asked for actual message in an earlier op, and it hasn't been delivered yet.
@@ -521,7 +528,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, struct op_state *st
else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
// If this op is not the one asking for read, (which means some earlier op has asked), and the
// read hasn't been delivered.
- if(!curr_op->recv_message && !stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE]) result = false;
+ if(!curr_op->recv_message && !stream_state->state_op_done[OP_SUCCEEDED]) result = false;
}
}
// We should see at least one on_write_completed for the trailers that we sent
@@ -563,9 +570,9 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
if (!stream_state->state_op_done[OP_CANCEL_ERROR]) {
grpc_chttp2_incoming_metadata_buffer_publish(&oas->s->state.rs.initial_metadata,
stream_op->recv_initial_metadata);
- enqueue_callback(stream_op->recv_initial_metadata_ready, GRPC_ERROR_NONE);
+ grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_NONE, NULL);
} else {
- enqueue_callback(stream_op->recv_initial_metadata_ready, GRPC_ERROR_CANCELLED);
+ grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_CANCELLED, NULL);
}
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
@@ -595,7 +602,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
} else if (stream_op->recv_message && op_can_be_run(stream_op, stream_state, &oas->state, OP_RECV_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
- enqueue_callback(stream_op->recv_message_ready, GRPC_ERROR_CANCELLED);
+ grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_CANCELLED, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
} else if (stream_state->rs.length_field_received == false) {
if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES && stream_state->rs.remaining_bytes == 0) {
@@ -620,7 +627,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0);
*((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs;
- enqueue_callback(stream_op->recv_message_ready, GRPC_ERROR_NONE);
+ grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true; // Also set per op state.
result = ACTION_TAKEN_NO_CALLBACK;
@@ -645,7 +652,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0);
*((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs;
- enqueue_callback(stream_op->recv_message_ready, GRPC_ERROR_NONE);
+ grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true; // Also set per op state.
// Clear read state of the stream, so next read op (if it were to come) will work
@@ -682,7 +689,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
// All ops are complete. Call the on_complete callback
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
//CRONET_LOG(GPR_DEBUG, "calling on_complete");
- enqueue_callback(stream_op->on_complete, GRPC_ERROR_NONE);
+ grpc_exec_ctx_sched(&s->exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE, NULL);
// Instead of setting stream state, use the op state as on_complete is on per op basis
oas->state.state_op_done[OP_ON_COMPLETE] = true;
oas->done = true; // Mark this op as completed
@@ -714,6 +721,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done));
memset(s->state.state_callback_received, 0, sizeof(s->state.state_callback_received));
gpr_mu_init(&s->mu);
+ s->exec_ctx = *exec_ctx;
return 0;
}