aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/cronet/client/secure/cronet_channel_create.c2
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_api_dummy.c30
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c140
3 files changed, 82 insertions, 90 deletions
diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
index df1acddcc0..477cf07f45 100644
--- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
+++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
@@ -60,7 +60,7 @@ GRPCAPI grpc_channel *grpc_cronet_secure_channel_create(
ct->host = gpr_malloc(strlen(target) + 1);
strcpy(ct->host, target);
gpr_log(GPR_DEBUG,
- "grpc_create_cronet_transport: cronet_engine = %p, target=%s", engine,
+ "grpc_create_cronet_transport: stream_engine = %p, target=%s", engine,
ct->host);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
index 687026c9fd..74327a4214 100644
--- a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
+++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
@@ -38,48 +38,46 @@ library, so we can build it in all environments */
#include <grpc/support/log.h>
-#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+#include "third_party/Cronet/bidirectional_stream_c.h"
#ifdef GRPC_COMPILE_WITH_CRONET
/* link with the real CRONET library in the build system */
#else
/* Dummy implementation of cronet API just to test for build-ability */
-cronet_bidirectional_stream* cronet_bidirectional_stream_create(
- cronet_engine* engine, void* annotation,
- cronet_bidirectional_stream_callback* callback) {
+bidirectional_stream* bidirectional_stream_create(
+ stream_engine* engine, void* annotation,
+ bidirectional_stream_callback* callback) {
GPR_ASSERT(0);
return NULL;
}
-int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream) {
+int bidirectional_stream_destroy(bidirectional_stream* stream) {
GPR_ASSERT(0);
return 0;
}
-int cronet_bidirectional_stream_start(
- cronet_bidirectional_stream* stream, const char* url, int priority,
- const char* method, const cronet_bidirectional_stream_header_array* headers,
- bool end_of_stream) {
+int bidirectional_stream_start(bidirectional_stream* stream, const char* url,
+ int priority, const char* method,
+ const bidirectional_stream_header_array* headers,
+ bool end_of_stream) {
GPR_ASSERT(0);
return 0;
}
-int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream,
- char* buffer, int capacity) {
+int bidirectional_stream_read(bidirectional_stream* stream, char* buffer,
+ int capacity) {
GPR_ASSERT(0);
return 0;
}
-int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream,
- const char* buffer, int count,
- bool end_of_stream) {
+int bidirectional_stream_write(bidirectional_stream* stream, const char* buffer,
+ int count, bool end_of_stream) {
GPR_ASSERT(0);
return 0;
}
-int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) {
+void bidirectional_stream_cancel(bidirectional_stream* stream) {
GPR_ASSERT(0);
- return 0;
}
#endif /* GRPC_COMPILE_WITH_CRONET */
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 296c406324..b080408565 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -49,7 +49,7 @@
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/transport_impl.h"
-#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+#include "third_party/Cronet/bidirectional_stream_c.h"
#define GRPC_HEADER_SIZE_IN_BYTES 5
@@ -86,19 +86,18 @@ enum e_op_id {
/* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
-static void on_request_headers_sent(cronet_bidirectional_stream *);
+static void on_request_headers_sent(bidirectional_stream *);
static void on_response_headers_received(
- cronet_bidirectional_stream *,
- const cronet_bidirectional_stream_header_array *, const char *);
-static void on_write_completed(cronet_bidirectional_stream *, const char *);
-static void on_read_completed(cronet_bidirectional_stream *, char *, int);
+ bidirectional_stream *, const bidirectional_stream_header_array *,
+ const char *);
+static void on_write_completed(bidirectional_stream *, const char *);
+static void on_read_completed(bidirectional_stream *, char *, int);
static void on_response_trailers_received(
- cronet_bidirectional_stream *,
- const cronet_bidirectional_stream_header_array *);
-static void on_succeeded(cronet_bidirectional_stream *);
-static void on_failed(cronet_bidirectional_stream *, int);
-static void on_canceled(cronet_bidirectional_stream *);
-static cronet_bidirectional_stream_callback cronet_callbacks = {
+ bidirectional_stream *, const bidirectional_stream_header_array *);
+static void on_succeeded(bidirectional_stream *);
+static void on_failed(bidirectional_stream *, int);
+static void on_canceled(bidirectional_stream *);
+static bidirectional_stream_callback cronet_callbacks = {
on_request_headers_sent,
on_response_headers_received,
on_read_completed,
@@ -111,7 +110,7 @@ static cronet_bidirectional_stream_callback cronet_callbacks = {
/* Cronet transport object */
struct grpc_cronet_transport {
grpc_transport base; /* must be first element in this structure */
- cronet_engine *engine;
+ stream_engine *engine;
char *host;
};
typedef struct grpc_cronet_transport grpc_cronet_transport;
@@ -176,8 +175,8 @@ struct stream_obj {
grpc_transport_stream_op *curr_op;
grpc_cronet_transport curr_ct;
grpc_stream *curr_gs;
- cronet_bidirectional_stream *cbs;
- cronet_bidirectional_stream_header_array header_array;
+ bidirectional_stream *cbs;
+ bidirectional_stream_header_array header_array;
/* Stream level state. Some state will be tracked both at stream and stream_op
* level */
@@ -344,11 +343,11 @@ static void execute_from_storage(stream_obj *s) {
/*
Cronet callback
*/
-static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
+static void on_failed(bidirectional_stream *stream, int net_error) {
CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
- cronet_bidirectional_stream_destroy(s->cbs);
+ bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_FAILED] = true;
s->cbs = NULL;
if (s->header_array.headers) {
@@ -367,11 +366,11 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
/*
Cronet callback
*/
-static void on_canceled(cronet_bidirectional_stream *stream) {
+static void on_canceled(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
- cronet_bidirectional_stream_destroy(s->cbs);
+ bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_CANCELED] = true;
s->cbs = NULL;
if (s->header_array.headers) {
@@ -390,11 +389,11 @@ static void on_canceled(cronet_bidirectional_stream *stream) {
/*
Cronet callback
*/
-static void on_succeeded(cronet_bidirectional_stream *stream) {
+static void on_succeeded(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
- cronet_bidirectional_stream_destroy(s->cbs);
+ bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_SUCCEEDED] = true;
s->cbs = NULL;
free_read_buffer(s);
@@ -405,7 +404,7 @@ static void on_succeeded(cronet_bidirectional_stream *stream) {
/*
Cronet callback
*/
-static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
+static void on_request_headers_sent(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
@@ -424,8 +423,8 @@ static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
Cronet callback
*/
static void on_response_headers_received(
- cronet_bidirectional_stream *stream,
- const cronet_bidirectional_stream_header_array *headers,
+ bidirectional_stream *stream,
+ const bidirectional_stream_header_array *headers,
const char *negotiated_protocol) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
@@ -451,9 +450,9 @@ static void on_response_headers_received(
s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
s->state.rs.received_bytes = 0;
s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
- cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
- s->state.rs.remaining_bytes);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
+ bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
+ s->state.rs.remaining_bytes);
}
gpr_mu_unlock(&s->mu);
grpc_exec_ctx_finish(&exec_ctx);
@@ -463,8 +462,7 @@ static void on_response_headers_received(
/*
Cronet callback
*/
-static void on_write_completed(cronet_bidirectional_stream *stream,
- const char *data) {
+static void on_write_completed(bidirectional_stream *stream, const char *data) {
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
gpr_mu_lock(&s->mu);
@@ -480,7 +478,7 @@ static void on_write_completed(cronet_bidirectional_stream *stream,
/*
Cronet callback
*/
-static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
+static void on_read_completed(bidirectional_stream *stream, char *data,
int count) {
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
@@ -488,16 +486,16 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
gpr_mu_lock(&s->mu);
s->state.state_callback_received[OP_RECV_MESSAGE] = true;
if (count > 0 && s->state.flush_read) {
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
- cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
+ bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096);
gpr_mu_unlock(&s->mu);
} else if (count > 0) {
s->state.rs.received_bytes += count;
s->state.rs.remaining_bytes -= count;
if (s->state.rs.remaining_bytes > 0) {
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
s->state.state_op_done[OP_READ_REQ_MADE] = true;
- cronet_bidirectional_stream_read(
+ bidirectional_stream_read(
s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
s->state.rs.remaining_bytes);
gpr_mu_unlock(&s->mu);
@@ -520,8 +518,8 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
Cronet callback
*/
static void on_response_trailers_received(
- cronet_bidirectional_stream *stream,
- const cronet_bidirectional_stream_header_array *trailers) {
+ bidirectional_stream *stream,
+ const bidirectional_stream_header_array *trailers) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
trailers);
@@ -551,9 +549,9 @@ static void on_response_trailers_received(
if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
!(s->state.state_op_done[OP_CANCEL_ERROR] ||
s->state.state_callback_received[OP_FAILED])) {
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
s->state.state_callback_received[OP_SEND_MESSAGE] = false;
- cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+ bidirectional_stream_write(s->cbs, "", 0, true);
s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
gpr_mu_unlock(&s->mu);
@@ -594,7 +592,7 @@ static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer,
*/
static void convert_metadata_to_cronet_headers(
grpc_linked_mdelem *head, const char *host, char **pp_url,
- cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers,
+ bidirectional_stream_header **pp_headers, size_t *p_num_headers,
const char **method) {
grpc_linked_mdelem *curr = head;
/* Walk the linked list and get number of header fields */
@@ -605,9 +603,9 @@ static void convert_metadata_to_cronet_headers(
}
/* Allocate enough memory. It is freed in the on_request_headers_sent callback
*/
- cronet_bidirectional_stream_header *headers =
- (cronet_bidirectional_stream_header *)gpr_malloc(
- sizeof(cronet_bidirectional_stream_header) * num_headers_available);
+ bidirectional_stream_header *headers =
+ (bidirectional_stream_header *)gpr_malloc(
+ sizeof(bidirectional_stream_header) * num_headers_available);
*pp_headers = headers;
/* Walk the linked list again, this time copying the header fields.
@@ -833,9 +831,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
* on_failed */
GPR_ASSERT(s->cbs == NULL);
GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
- s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
- &cronet_callbacks);
- CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs);
+ s->cbs = bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
+ &cronet_callbacks);
+ CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
char *url = NULL;
const char *method = "POST";
s->header_array.headers = NULL;
@@ -843,10 +841,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url,
&s->header_array.headers, &s->header_array.count, &method);
s->header_array.capacity = s->header_array.count;
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start(%p, %s)", s->cbs,
- url);
- cronet_bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array,
- false);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
+ bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
result = ACTION_TAKEN_WITH_CALLBACK;
} else if (stream_op->recv_initial_metadata &&
@@ -897,11 +893,11 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
size_t write_buffer_size;
create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
&write_buffer_size);
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)",
- s->cbs, stream_state->ws.write_buffer);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
+ stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
- cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
- (int)write_buffer_size, false);
+ bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
+ (int)write_buffer_size, false);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
result = NO_ACTION_POSSIBLE;
@@ -949,11 +945,11 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(stream_state->rs.read_buffer);
stream_state->rs.remaining_bytes = stream_state->rs.length_field;
stream_state->rs.received_bytes = 0;
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
stream_state->state_op_done[OP_READ_REQ_MADE] =
true; /* Indicates that at least one read request has been made */
- cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
- stream_state->rs.remaining_bytes);
+ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ stream_state->rs.remaining_bytes);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
stream_state->rs.remaining_bytes = 0;
@@ -974,11 +970,11 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.received_bytes = 0;
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
stream_state->state_op_done[OP_READ_REQ_MADE] =
true; /* Indicates that at least one read request has been made */
- cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
- stream_state->rs.remaining_bytes);
+ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ stream_state->rs.remaining_bytes);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
result = NO_ACTION_POSSIBLE;
@@ -1008,9 +1004,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.received_bytes = 0;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.length_field_received = false;
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
- cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
- stream_state->rs.remaining_bytes);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
+ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ stream_state->rs.remaining_bytes);
result = ACTION_TAKEN_NO_CALLBACK;
}
} else if (stream_op->recv_trailing_metadata &&
@@ -1033,10 +1029,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
result = NO_ACTION_POSSIBLE;
CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
} else {
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)",
- s->cbs);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
- cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+ bidirectional_stream_write(s->cbs, "", 0, true);
result = ACTION_TAKEN_WITH_CALLBACK;
}
stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
@@ -1044,9 +1039,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_CANCEL_ERROR)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
- CRONET_LOG(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs);
+ CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
if (s->cbs) {
- cronet_bidirectional_stream_cancel(s->cbs);
+ bidirectional_stream_cancel(s->cbs);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
result = ACTION_TAKEN_NO_CALLBACK;
@@ -1143,18 +1138,17 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
header_has_authority(op->send_initial_metadata->list.head)) {
/* Cronet does not support :authority header field. We cancel the call when
this field is present in metadata */
- cronet_bidirectional_stream_header_array header_array;
- cronet_bidirectional_stream_header *header;
- cronet_bidirectional_stream cbs;
+ bidirectional_stream_header_array header_array;
+ bidirectional_stream_header *header;
+ bidirectional_stream cbs;
CRONET_LOG(GPR_DEBUG,
":authority header is provided but not supported;"
" cancel operations");
/* Notify application that operation is cancelled by forging trailers */
header_array.count = 1;
header_array.capacity = 1;
- header_array.headers =
- gpr_malloc(sizeof(cronet_bidirectional_stream_header));
- header = (cronet_bidirectional_stream_header *)header_array.headers;
+ header_array.headers = gpr_malloc(sizeof(bidirectional_stream_header));
+ header = (bidirectional_stream_header *)header_array.headers;
header->key = "grpc-status";
header->value = "1"; /* Return status GRPC_STATUS_CANCELLED */
cbs.annotation = (void *)s;