diff options
Diffstat (limited to 'src/core/ext/transport')
3 files changed, 82 insertions, 90 deletions
diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c index df1acddcc0..477cf07f45 100644 --- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c @@ -60,7 +60,7 @@ GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( ct->host = gpr_malloc(strlen(target) + 1); strcpy(ct->host, target); gpr_log(GPR_DEBUG, - "grpc_create_cronet_transport: cronet_engine = %p, target=%s", engine, + "grpc_create_cronet_transport: stream_engine = %p, target=%s", engine, ct->host); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c index 687026c9fd..74327a4214 100644 --- a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c +++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c @@ -38,48 +38,46 @@ library, so we can build it in all environments */ #include <grpc/support/log.h> -#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h" +#include "third_party/Cronet/bidirectional_stream_c.h" #ifdef GRPC_COMPILE_WITH_CRONET /* link with the real CRONET library in the build system */ #else /* Dummy implementation of cronet API just to test for build-ability */ -cronet_bidirectional_stream* cronet_bidirectional_stream_create( - cronet_engine* engine, void* annotation, - cronet_bidirectional_stream_callback* callback) { +bidirectional_stream* bidirectional_stream_create( + stream_engine* engine, void* annotation, + bidirectional_stream_callback* callback) { GPR_ASSERT(0); return NULL; } -int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream) { +int bidirectional_stream_destroy(bidirectional_stream* stream) { GPR_ASSERT(0); return 0; } -int cronet_bidirectional_stream_start( - cronet_bidirectional_stream* stream, const char* url, int priority, - const char* method, const cronet_bidirectional_stream_header_array* headers, - bool end_of_stream) { +int bidirectional_stream_start(bidirectional_stream* stream, const char* url, + int priority, const char* method, + const bidirectional_stream_header_array* headers, + bool end_of_stream) { GPR_ASSERT(0); return 0; } -int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream, - char* buffer, int capacity) { +int bidirectional_stream_read(bidirectional_stream* stream, char* buffer, + int capacity) { GPR_ASSERT(0); return 0; } -int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream, - const char* buffer, int count, - bool end_of_stream) { +int bidirectional_stream_write(bidirectional_stream* stream, const char* buffer, + int count, bool end_of_stream) { GPR_ASSERT(0); return 0; } -int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) { +void bidirectional_stream_cancel(bidirectional_stream* stream) { GPR_ASSERT(0); - return 0; } #endif /* GRPC_COMPILE_WITH_CRONET */ diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 296c406324..b080408565 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -49,7 +49,7 @@ #include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/transport_impl.h" -#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h" +#include "third_party/Cronet/bidirectional_stream_c.h" #define GRPC_HEADER_SIZE_IN_BYTES 5 @@ -86,19 +86,18 @@ enum e_op_id { /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */ -static void on_request_headers_sent(cronet_bidirectional_stream *); +static void on_request_headers_sent(bidirectional_stream *); static void on_response_headers_received( - cronet_bidirectional_stream *, - const cronet_bidirectional_stream_header_array *, const char *); -static void on_write_completed(cronet_bidirectional_stream *, const char *); -static void on_read_completed(cronet_bidirectional_stream *, char *, int); + bidirectional_stream *, const bidirectional_stream_header_array *, + const char *); +static void on_write_completed(bidirectional_stream *, const char *); +static void on_read_completed(bidirectional_stream *, char *, int); static void on_response_trailers_received( - cronet_bidirectional_stream *, - const cronet_bidirectional_stream_header_array *); -static void on_succeeded(cronet_bidirectional_stream *); -static void on_failed(cronet_bidirectional_stream *, int); -static void on_canceled(cronet_bidirectional_stream *); -static cronet_bidirectional_stream_callback cronet_callbacks = { + bidirectional_stream *, const bidirectional_stream_header_array *); +static void on_succeeded(bidirectional_stream *); +static void on_failed(bidirectional_stream *, int); +static void on_canceled(bidirectional_stream *); +static bidirectional_stream_callback cronet_callbacks = { on_request_headers_sent, on_response_headers_received, on_read_completed, @@ -111,7 +110,7 @@ static cronet_bidirectional_stream_callback cronet_callbacks = { /* Cronet transport object */ struct grpc_cronet_transport { grpc_transport base; /* must be first element in this structure */ - cronet_engine *engine; + stream_engine *engine; char *host; }; typedef struct grpc_cronet_transport grpc_cronet_transport; @@ -176,8 +175,8 @@ struct stream_obj { grpc_transport_stream_op *curr_op; grpc_cronet_transport curr_ct; grpc_stream *curr_gs; - cronet_bidirectional_stream *cbs; - cronet_bidirectional_stream_header_array header_array; + bidirectional_stream *cbs; + bidirectional_stream_header_array header_array; /* Stream level state. Some state will be tracked both at stream and stream_op * level */ @@ -344,11 +343,11 @@ static void execute_from_storage(stream_obj *s) { /* Cronet callback */ -static void on_failed(cronet_bidirectional_stream *stream, int net_error) { +static void on_failed(bidirectional_stream *stream, int net_error) { CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); - cronet_bidirectional_stream_destroy(s->cbs); + bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_FAILED] = true; s->cbs = NULL; if (s->header_array.headers) { @@ -367,11 +366,11 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) { /* Cronet callback */ -static void on_canceled(cronet_bidirectional_stream *stream) { +static void on_canceled(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); - cronet_bidirectional_stream_destroy(s->cbs); + bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_CANCELED] = true; s->cbs = NULL; if (s->header_array.headers) { @@ -390,11 +389,11 @@ static void on_canceled(cronet_bidirectional_stream *stream) { /* Cronet callback */ -static void on_succeeded(cronet_bidirectional_stream *stream) { +static void on_succeeded(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); - cronet_bidirectional_stream_destroy(s->cbs); + bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_SUCCEEDED] = true; s->cbs = NULL; free_read_buffer(s); @@ -405,7 +404,7 @@ static void on_succeeded(cronet_bidirectional_stream *stream) { /* Cronet callback */ -static void on_request_headers_sent(cronet_bidirectional_stream *stream) { +static void on_request_headers_sent(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); @@ -424,8 +423,8 @@ static void on_request_headers_sent(cronet_bidirectional_stream *stream) { Cronet callback */ static void on_response_headers_received( - cronet_bidirectional_stream *stream, - const cronet_bidirectional_stream_header_array *headers, + bidirectional_stream *stream, + const bidirectional_stream_header_array *headers, const char *negotiated_protocol) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, @@ -451,9 +450,9 @@ static void on_response_headers_received( s->state.rs.read_buffer = s->state.rs.grpc_header_bytes; s->state.rs.received_bytes = 0; s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); - cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, - s->state.rs.remaining_bytes); + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); + bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, + s->state.rs.remaining_bytes); } gpr_mu_unlock(&s->mu); grpc_exec_ctx_finish(&exec_ctx); @@ -463,8 +462,7 @@ static void on_response_headers_received( /* Cronet callback */ -static void on_write_completed(cronet_bidirectional_stream *stream, - const char *data) { +static void on_write_completed(bidirectional_stream *stream, const char *data) { stream_obj *s = (stream_obj *)stream->annotation; CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); gpr_mu_lock(&s->mu); @@ -480,7 +478,7 @@ static void on_write_completed(cronet_bidirectional_stream *stream, /* Cronet callback */ -static void on_read_completed(cronet_bidirectional_stream *stream, char *data, +static void on_read_completed(bidirectional_stream *stream, char *data, int count) { stream_obj *s = (stream_obj *)stream->annotation; CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, @@ -488,16 +486,16 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data, gpr_mu_lock(&s->mu); s->state.state_callback_received[OP_RECV_MESSAGE] = true; if (count > 0 && s->state.flush_read) { - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); - cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096); + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); + bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096); gpr_mu_unlock(&s->mu); } else if (count > 0) { s->state.rs.received_bytes += count; s->state.rs.remaining_bytes -= count; if (s->state.rs.remaining_bytes > 0) { - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); s->state.state_op_done[OP_READ_REQ_MADE] = true; - cronet_bidirectional_stream_read( + bidirectional_stream_read( s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes, s->state.rs.remaining_bytes); gpr_mu_unlock(&s->mu); @@ -520,8 +518,8 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data, Cronet callback */ static void on_response_trailers_received( - cronet_bidirectional_stream *stream, - const cronet_bidirectional_stream_header_array *trailers) { + bidirectional_stream *stream, + const bidirectional_stream_header_array *trailers) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, trailers); @@ -551,9 +549,9 @@ static void on_response_trailers_received( if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] && !(s->state.state_op_done[OP_CANCEL_ERROR] || s->state.state_callback_received[OP_FAILED])) { - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs); + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); s->state.state_callback_received[OP_SEND_MESSAGE] = false; - cronet_bidirectional_stream_write(s->cbs, "", 0, true); + bidirectional_stream_write(s->cbs, "", 0, true); s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; gpr_mu_unlock(&s->mu); @@ -594,7 +592,7 @@ static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, */ static void convert_metadata_to_cronet_headers( grpc_linked_mdelem *head, const char *host, char **pp_url, - cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers, + bidirectional_stream_header **pp_headers, size_t *p_num_headers, const char **method) { grpc_linked_mdelem *curr = head; /* Walk the linked list and get number of header fields */ @@ -605,9 +603,9 @@ static void convert_metadata_to_cronet_headers( } /* Allocate enough memory. It is freed in the on_request_headers_sent callback */ - cronet_bidirectional_stream_header *headers = - (cronet_bidirectional_stream_header *)gpr_malloc( - sizeof(cronet_bidirectional_stream_header) * num_headers_available); + bidirectional_stream_header *headers = + (bidirectional_stream_header *)gpr_malloc( + sizeof(bidirectional_stream_header) * num_headers_available); *pp_headers = headers; /* Walk the linked list again, this time copying the header fields. @@ -833,9 +831,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, * on_failed */ GPR_ASSERT(s->cbs == NULL); GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]); - s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, - &cronet_callbacks); - CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs); + s->cbs = bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, + &cronet_callbacks); + CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs); char *url = NULL; const char *method = "POST"; s->header_array.headers = NULL; @@ -843,10 +841,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url, &s->header_array.headers, &s->header_array.count, &method); s->header_array.capacity = s->header_array.count; - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start(%p, %s)", s->cbs, - url); - cronet_bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, - false); + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); + bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; result = ACTION_TAKEN_WITH_CALLBACK; } else if (stream_op->recv_initial_metadata && @@ -897,11 +893,11 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, size_t write_buffer_size; create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, &write_buffer_size); - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)", - s->cbs, stream_state->ws.write_buffer); + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, + stream_state->ws.write_buffer); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; - cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, - (int)write_buffer_size, false); + bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, + (int)write_buffer_size, false); result = ACTION_TAKEN_WITH_CALLBACK; } else { result = NO_ACTION_POSSIBLE; @@ -949,11 +945,11 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, GPR_ASSERT(stream_state->rs.read_buffer); stream_state->rs.remaining_bytes = stream_state->rs.length_field; stream_state->rs.received_bytes = 0; - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); stream_state->state_op_done[OP_READ_REQ_MADE] = true; /* Indicates that at least one read request has been made */ - cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, - stream_state->rs.remaining_bytes); + bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, + stream_state->rs.remaining_bytes); result = ACTION_TAKEN_WITH_CALLBACK; } else { stream_state->rs.remaining_bytes = 0; @@ -974,11 +970,11 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.received_bytes = 0; - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); stream_state->state_op_done[OP_READ_REQ_MADE] = true; /* Indicates that at least one read request has been made */ - cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, - stream_state->rs.remaining_bytes); + bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, + stream_state->rs.remaining_bytes); result = ACTION_TAKEN_WITH_CALLBACK; } else { result = NO_ACTION_POSSIBLE; @@ -1008,9 +1004,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.received_bytes = 0; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.length_field_received = false; - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); - cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, - stream_state->rs.remaining_bytes); + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); + bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, + stream_state->rs.remaining_bytes); result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_op->recv_trailing_metadata && @@ -1033,10 +1029,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, result = NO_ACTION_POSSIBLE; CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); } else { - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", - s->cbs); + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; - cronet_bidirectional_stream_write(s->cbs, "", 0, true); + bidirectional_stream_write(s->cbs, "", 0, true); result = ACTION_TAKEN_WITH_CALLBACK; } stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; @@ -1044,9 +1039,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, op_can_be_run(stream_op, stream_state, &oas->state, OP_CANCEL_ERROR)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); - CRONET_LOG(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs); + CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); if (s->cbs) { - cronet_bidirectional_stream_cancel(s->cbs); + bidirectional_stream_cancel(s->cbs); result = ACTION_TAKEN_WITH_CALLBACK; } else { result = ACTION_TAKEN_NO_CALLBACK; @@ -1143,18 +1138,17 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, header_has_authority(op->send_initial_metadata->list.head)) { /* Cronet does not support :authority header field. We cancel the call when this field is present in metadata */ - cronet_bidirectional_stream_header_array header_array; - cronet_bidirectional_stream_header *header; - cronet_bidirectional_stream cbs; + bidirectional_stream_header_array header_array; + bidirectional_stream_header *header; + bidirectional_stream cbs; CRONET_LOG(GPR_DEBUG, ":authority header is provided but not supported;" " cancel operations"); /* Notify application that operation is cancelled by forging trailers */ header_array.count = 1; header_array.capacity = 1; - header_array.headers = - gpr_malloc(sizeof(cronet_bidirectional_stream_header)); - header = (cronet_bidirectional_stream_header *)header_array.headers; + header_array.headers = gpr_malloc(sizeof(bidirectional_stream_header)); + header = (bidirectional_stream_header *)header_array.headers; header->key = "grpc-status"; header->value = "1"; /* Return status GRPC_STATUS_CANCELLED */ cbs.annotation = (void *)s; |