diff options
Diffstat (limited to 'src/core/ext/transport/cronet')
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.c | 101 |
1 files changed, 62 insertions, 39 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 952690eba7..0b9189558f 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -172,7 +172,7 @@ struct op_state { }; struct op_and_state { - grpc_transport_stream_op op; + grpc_transport_stream_op_batch op; struct op_state state; bool done; struct stream_obj *s; /* Pointer back to the stream object */ @@ -187,7 +187,7 @@ struct op_storage { struct stream_obj { gpr_arena *arena; struct op_and_state *oas; - grpc_transport_stream_op *curr_op; + grpc_transport_stream_op_batch *curr_op; grpc_cronet_transport *curr_ct; grpc_stream *curr_gs; bidirectional_stream *cbs; @@ -298,12 +298,13 @@ static grpc_error *make_error_with_desc(int error_code, const char *desc) { /* Add a new stream op to op storage. */ -static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { +static void add_to_storage(struct stream_obj *s, + grpc_transport_stream_op_batch *op) { struct op_storage *storage = &s->storage; /* add new op at the beginning of the linked list. The memory is freed in remove_from_storage */ struct op_and_state *new_op = gpr_malloc(sizeof(struct op_and_state)); - memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op)); + memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op_batch)); memset(&new_op->state, 0, sizeof(new_op->state)); new_op->s = s; new_op->done = false; @@ -768,7 +769,7 @@ static bool header_has_authority(grpc_linked_mdelem *head) { Op Execution: Decide if one of the actions contained in the stream op can be executed. This is the heart of the state machine. */ -static bool op_can_be_run(grpc_transport_stream_op *curr_op, +static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, struct stream_obj *s, struct op_state *op_state, enum e_op_id op_id) { struct op_state *stream_state = &s->state; @@ -919,7 +920,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, */ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, struct op_and_state *oas) { - grpc_transport_stream_op *stream_op = &oas->op; + grpc_transport_stream_op_batch *stream_op = &oas->op; struct stream_obj *s = oas->s; grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; struct op_state *stream_state = &s->state; @@ -941,9 +942,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, char *url = NULL; const char *method = "POST"; s->header_array.headers = NULL; - convert_metadata_to_cronet_headers( - stream_op->send_initial_metadata->list.head, t->host, &url, - &s->header_array.headers, &s->header_array.count, &method); + convert_metadata_to_cronet_headers(stream_op->payload->send_initial_metadata + .send_initial_metadata->list.head, + t->host, &url, &s->header_array.headers, + &s->header_array.count, &method); s->header_array.capacity = s->header_array.count; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); @@ -971,8 +973,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_slice_buffer write_slice_buffer; grpc_slice slice; grpc_slice_buffer_init(&write_slice_buffer); - grpc_byte_stream_next(NULL, stream_op->send_message, &slice, - stream_op->send_message->length, NULL); + grpc_byte_stream_next( + NULL, stream_op->payload->send_message.send_message, &slice, + stream_op->payload->send_message.send_message->length, NULL); grpc_slice_buffer_add(&write_slice_buffer, slice); if (write_slice_buffer.count != 1) { /* Empty request not handled yet */ @@ -982,7 +985,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (write_slice_buffer.count > 0) { size_t write_buffer_size; create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, - &write_buffer_size, stream_op->send_message->flags); + &write_buffer_size, + stream_op->payload->send_message.send_message->flags); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, stream_state->ws.write_buffer); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; @@ -1029,20 +1033,28 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, OP_RECV_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, + stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, + stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, + stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); } else { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &oas->s->state.rs.initial_metadata, - stream_op->recv_initial_metadata); - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + stream_op->payload->recv_initial_metadata.recv_initial_metadata); + grpc_closure_sched( + exec_ctx, + stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_NONE); } stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -1051,27 +1063,31 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->state_callback_received[OP_FAILED]) { CRONET_LOG(GPR_DEBUG, "Stream failed."); - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.read_stream_closed == true) { /* No more data will be received */ CRONET_LOG(GPR_DEBUG, "read stream closed"); - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->flush_read) { CRONET_LOG(GPR_DEBUG, "flush read"); - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1110,8 +1126,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, - GRPC_ERROR_NONE); + grpc_closure_sched( + exec_ctx, stream_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1161,9 +1178,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (stream_state->rs.compressed) { stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS; } - *((grpc_byte_buffer **)stream_op->recv_message) = + *((grpc_byte_buffer **)stream_op->payload->recv_message.recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; - grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + grpc_closure_sched(exec_ctx, + stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; @@ -1187,12 +1205,12 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (oas->s->state.rs.trailing_metadata_valid) { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &oas->s->state.rs.trailing_metadata, - stream_op->recv_trailing_metadata); + stream_op->payload->recv_trailing_metadata.recv_trailing_metadata); stream_state->rs.trailing_metadata_valid = false; } stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; - } else if (stream_op->cancel_error && + } else if (stream_op->cancel_stream && op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); @@ -1204,7 +1222,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } stream_state->state_op_done[OP_CANCEL_ERROR] = true; if (!stream_state->cancel_error) { - stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error); + stream_state->cancel_error = + GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error); } } else if (stream_op->on_complete && op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) { @@ -1283,18 +1302,22 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set) {} static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_transport_stream_op *op) { + grpc_stream *gs, + grpc_transport_stream_op_batch *op) { CRONET_LOG(GPR_DEBUG, "perform_stream_op"); if (op->send_initial_metadata && - header_has_authority(op->send_initial_metadata->list.head)) { + header_has_authority(op->payload->send_initial_metadata + .send_initial_metadata->list.head)) { /* Cronet does not support :authority header field. We cancel the call when this field is present in metadata */ - if (op->recv_initial_metadata_ready) { - grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready, - GRPC_ERROR_CANCELLED); + if (op->recv_initial_metadata) { + grpc_closure_sched( + exec_ctx, + op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_CANCELLED); } - if (op->recv_message_ready) { - grpc_closure_sched(exec_ctx, op->recv_message_ready, + if (op->recv_message) { + grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready, GRPC_ERROR_CANCELLED); } grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); |