diff options
author | Makarand Dharmapurikar <makarandd@google.com> | 2016-08-10 13:44:13 -0700 |
---|---|---|
committer | Makarand Dharmapurikar <makarandd@google.com> | 2016-08-10 13:44:13 -0700 |
commit | 198f8b01946ac107700958ac6c2ea11ed523f2a5 (patch) | |
tree | 7aa1dcca0f8ad7c1de2e019108c574ae899284ee /src | |
parent | f07506438c012f1f466670f05284594ca6808a26 (diff) |
more fixes
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.c | 42 |
1 files changed, 25 insertions, 17 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index e8746b4e6e..dc6a780eda 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -75,6 +75,7 @@ enum OP_ID { OP_CANCEL_ERROR, OP_ON_COMPLETE, OP_FAILED, + OP_SUCCEEDED, OP_CANCELED, OP_RECV_MESSAGE_AND_ON_COMPLETE, OP_READ_REQ_MADE, @@ -91,6 +92,7 @@ const char *op_id_string[] = { "OP_CANCEL_ERROR", "OP_ON_COMPLETE", "OP_FAILED", + "OP_SUCCEEDED", "OP_CANCELED", "OP_RECV_MESSAGE_AND_ON_COMPLETE", "OP_READ_REQ_MADE", @@ -189,6 +191,8 @@ struct stream_obj { grpc_stream *curr_gs; cronet_bidirectional_stream *cbs; + // Used for executing callbacks for ops + grpc_exec_ctx exec_ctx; // This holds the state that is at stream level (response and req metadata) struct op_state state; @@ -227,7 +231,10 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { static void execute_from_storage(stream_obj *s) { // Cycle through ops and try to take next action. Break when either // an action with callback is taken, or no action is possible. - gpr_mu_lock(&s->mu); + // This can be executed from the Cronet network thread via cronet callback + // or on the application supplied thread via the perform_stream_op function. + if (1) {//gpr_mu_lock(&s->mu) == 0) { + gpr_mu_lock(&s->mu); for (int i = 0; i < s->storage.wrptr; ) { CRONET_LOG(GPR_DEBUG, "calling execute_stream_op[%d]. done = %d", i, s->storage.pending_ops[i].done); if (s->storage.pending_ops[i].done) { @@ -242,7 +249,9 @@ static void execute_from_storage(stream_obj *s) { break; } } - gpr_mu_unlock(&s->mu); + gpr_mu_unlock(&s->mu); + } + grpc_exec_ctx_finish(&s->exec_ctx); } @@ -271,7 +280,9 @@ static void on_succeeded(cronet_bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; cronet_bidirectional_stream_destroy(s->cbs); + s->state.state_callback_received[OP_FAILED] = true; s->cbs = NULL; + execute_from_storage(s); } static void on_request_headers_sent(cronet_bidirectional_stream *stream) { @@ -380,13 +391,6 @@ static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer, memcpy(p, GPR_SLICE_START_PTR(slice), length); } -static void enqueue_callback(grpc_closure *callback, grpc_error *error) { - GPR_ASSERT(callback); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_exec_ctx_sched(&exec_ctx, callback, error, NULL); - grpc_exec_ctx_finish(&exec_ctx); -} - static void convert_metadata_to_cronet_headers( grpc_linked_mdelem *head, const char *host, @@ -498,9 +502,10 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, struct op_state *st // we haven't sent initial metadata yet else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false; // we haven't sent message yet + // TODO: Streaming Write case is a problem. What if there is an outstanding write (2nd, 3rd,..) present. else if (curr_op->send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false; // we haven't got on_write_completed for the send yet - else if (curr_op->send_message && !stream_state->state_callback_received[OP_SEND_MESSAGE]) result = false; + else if (stream_state->state_op_done[OP_SEND_MESSAGE] && !stream_state->state_callback_received[OP_SEND_MESSAGE]) result = false; } else if (op_id == OP_CANCEL_ERROR) { // already executed if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false; @@ -510,10 +515,12 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, struct op_state *st // Check if every op that was asked for is done. else if (curr_op->send_initial_metadata && !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false; else if (curr_op->send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false; + else if (curr_op->send_message && !stream_state->state_callback_received[OP_SEND_MESSAGE]) result = false; else if (curr_op->send_trailing_metadata && !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false; else if (curr_op->recv_initial_metadata && !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false; else if (curr_op->recv_message && !stream_state->state_op_done[OP_RECV_MESSAGE]) result = false; else if (curr_op->recv_trailing_metadata) { + //if (!stream_state->state_op_done[OP_SUCCEEDED]) result = false; gpr_log(GPR_DEBUG, "HACK!!"); // We aren't done with trailing metadata yet if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false; // We've asked for actual message in an earlier op, and it hasn't been delivered yet. @@ -521,7 +528,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, struct op_state *st else if (stream_state->state_op_done[OP_READ_REQ_MADE]) { // If this op is not the one asking for read, (which means some earlier op has asked), and the // read hasn't been delivered. - if(!curr_op->recv_message && !stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE]) result = false; + if(!curr_op->recv_message && !stream_state->state_op_done[OP_SUCCEEDED]) result = false; } } // We should see at least one on_write_completed for the trailers that we sent @@ -563,9 +570,9 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) { if (!stream_state->state_op_done[OP_CANCEL_ERROR]) { grpc_chttp2_incoming_metadata_buffer_publish(&oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata); - enqueue_callback(stream_op->recv_initial_metadata_ready, GRPC_ERROR_NONE); + grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_NONE, NULL); } else { - enqueue_callback(stream_op->recv_initial_metadata_ready, GRPC_ERROR_CANCELLED); + grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_CANCELLED, NULL); } stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -595,7 +602,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) { } else if (stream_op->recv_message && op_can_be_run(stream_op, stream_state, &oas->state, OP_RECV_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - enqueue_callback(stream_op->recv_message_ready, GRPC_ERROR_CANCELLED); + grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_CANCELLED, NULL); stream_state->state_op_done[OP_RECV_MESSAGE] = true; } else if (stream_state->rs.length_field_received == false) { if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES && stream_state->rs.remaining_bytes == 0) { @@ -620,7 +627,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) { gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0); *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - enqueue_callback(stream_op->recv_message_ready, GRPC_ERROR_NONE); + grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_NONE, NULL); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; // Also set per op state. result = ACTION_TAKEN_NO_CALLBACK; @@ -645,7 +652,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) { gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice); grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0); *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - enqueue_callback(stream_op->recv_message_ready, GRPC_ERROR_NONE); + grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_NONE, NULL); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; // Also set per op state. // Clear read state of the stream, so next read op (if it were to come) will work @@ -682,7 +689,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) { // All ops are complete. Call the on_complete callback CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); //CRONET_LOG(GPR_DEBUG, "calling on_complete"); - enqueue_callback(stream_op->on_complete, GRPC_ERROR_NONE); + grpc_exec_ctx_sched(&s->exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE, NULL); // Instead of setting stream state, use the op state as on_complete is on per op basis oas->state.state_op_done[OP_ON_COMPLETE] = true; oas->done = true; // Mark this op as completed @@ -714,6 +721,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done)); memset(s->state.state_callback_received, 0, sizeof(s->state.state_callback_received)); gpr_mu_init(&s->mu); + s->exec_ctx = *exec_ctx; return 0; } |