aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/cronet/transport/cronet_transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/cronet/transport/cronet_transport.c')
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c218
1 files changed, 172 insertions, 46 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 25ad40b935..afc59f4b12 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -34,14 +34,15 @@
#include <string.h>
#include <grpc/impl/codegen/port_platform.h>
+#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
-#include <grpc/support/slice_buffer.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
+#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
@@ -130,7 +131,7 @@ struct read_state {
/* vars for holding data destined for the application */
struct grpc_slice_buffer_stream sbs;
- gpr_slice_buffer read_slice_buffer;
+ grpc_slice_buffer read_slice_buffer;
/* vars for trailing metadata */
grpc_chttp2_incoming_metadata_buffer trailing_metadata;
@@ -148,6 +149,9 @@ struct write_state {
struct op_state {
bool state_op_done[OP_NUM_OPS];
bool state_callback_received[OP_NUM_OPS];
+ bool fail_state;
+ bool flush_read;
+ grpc_error *cancel_error;
/* data structure for storing data coming from server */
struct read_state rs;
/* data structure for storing data going to the server */
@@ -247,6 +251,12 @@ static void free_read_buffer(stream_obj *s) {
}
}
+static grpc_error *make_error_with_desc(int error_code, const char *desc) {
+ grpc_error *error = GRPC_ERROR_CREATE(desc);
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
+ return error;
+}
+
/*
Add a new stream op to op storage.
*/
@@ -432,6 +442,18 @@ static void on_response_headers_received(
grpc_mdstr_from_string(headers->headers[i].value)));
}
s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
+ if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
+ s->state.state_callback_received[OP_FAILED])) {
+ /* Do an extra read to trigger on_succeeded() callback in case connection
+ is closed */
+ GPR_ASSERT(s->state.rs.length_field_received == false);
+ s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
+ s->state.rs.received_bytes = 0;
+ s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
+ s->state.rs.remaining_bytes);
+ }
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -463,7 +485,11 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
count);
gpr_mu_lock(&s->mu);
s->state.state_callback_received[OP_RECV_MESSAGE] = true;
- if (count > 0) {
+ if (count > 0 && s->state.flush_read) {
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096);
+ gpr_mu_unlock(&s->mu);
+ } else if (count > 0) {
s->state.rs.received_bytes += count;
s->state.rs.remaining_bytes -= count;
if (s->state.rs.remaining_bytes > 0) {
@@ -478,6 +504,10 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
execute_from_storage(s);
}
} else {
+ if (s->state.flush_read) {
+ gpr_free(s->state.rs.read_buffer);
+ s->state.rs.read_buffer = NULL;
+ }
s->state.rs.read_stream_closed = true;
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
@@ -507,21 +537,38 @@ static void on_response_trailers_received(
grpc_mdstr_from_string(trailers->headers[i].key),
grpc_mdstr_from_string(trailers->headers[i].value)));
s->state.rs.trailing_metadata_valid = true;
+ if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
+ 0 != strcmp(trailers->headers[i].value, "0")) {
+ s->state.fail_state = true;
+ }
}
s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
- gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
+ * trigger on_succeeded */
+ if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
+ !(s->state.state_op_done[OP_CANCEL_ERROR] ||
+ s->state.state_callback_received[OP_FAILED])) {
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs);
+ s->state.state_callback_received[OP_SEND_MESSAGE] = false;
+ cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+ s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
+
+ gpr_mu_unlock(&s->mu);
+ } else {
+ gpr_mu_unlock(&s->mu);
+ execute_from_storage(s);
+ }
}
/*
Utility function that takes the data from s->write_slice_buffer and assembles
into a contiguous byte stream with 5 byte gRPC header prepended.
*/
-static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer,
+static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer,
char **pp_write_buffer,
size_t *p_write_buffer_size) {
- gpr_slice slice = gpr_slice_buffer_take_first(write_slice_buffer);
- size_t length = GPR_SLICE_LENGTH(slice);
+ grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
+ size_t length = GRPC_SLICE_LENGTH(slice);
*p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
/* This is freed in the on_write_completed callback */
char *write_buffer = gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES);
@@ -534,7 +581,7 @@ static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer,
*p++ = (uint8_t)(length >> 8);
*p++ = (uint8_t)(length);
/* append actual data */
- memcpy(p, GPR_SLICE_START_PTR(slice), length);
+ memcpy(p, GRPC_SLICE_START_PTR(slice), length);
}
/*
@@ -610,6 +657,16 @@ static int parse_grpc_header(const uint8_t *data) {
return length;
}
+static bool header_has_authority(grpc_linked_mdelem *head) {
+ while (head != NULL) {
+ if (head->md->key == GRPC_MDSTR_AUTHORITY) {
+ return true;
+ }
+ head = head->next;
+ }
+ return false;
+}
+
/*
Op Execution: Decide if one of the actions contained in the stream op can be
executed. This is the heart of the state machine.
@@ -621,9 +678,9 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
/* When call is canceled, every op can be run, except under following
conditions
*/
- bool is_canceled_of_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
+ bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
stream_state->state_callback_received[OP_FAILED];
- if (is_canceled_of_failed) {
+ if (is_canceled_or_failed) {
if (op_id == OP_SEND_INITIAL_METADATA) result = false;
if (op_id == OP_SEND_MESSAGE) result = false;
if (op_id == OP_SEND_TRAILING_METADATA) result = false;
@@ -767,16 +824,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_SEND_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
- /* This OP is the beginning. Reset various states */
- memset(&s->header_array, 0, sizeof(s->header_array));
- memset(&stream_state->rs, 0, sizeof(stream_state->rs));
- memset(&stream_state->ws, 0, sizeof(stream_state->ws));
- memset(stream_state->state_op_done, 0, sizeof(stream_state->state_op_done));
- memset(stream_state->state_callback_received, 0,
- sizeof(stream_state->state_callback_received));
/* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
* on_failed */
GPR_ASSERT(s->cbs == NULL);
+ GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
&cronet_callbacks);
CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs);
@@ -797,10 +848,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
- if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
- stream_state->state_callback_received[OP_FAILED]) {
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
GRPC_ERROR_CANCELLED, NULL);
+ } else if (stream_state->state_callback_received[OP_FAILED]) {
+ grpc_exec_ctx_sched(
+ exec_ctx, stream_op->recv_initial_metadata_ready,
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
@@ -817,9 +871,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
result = NO_ACTION_POSSIBLE;
CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
} else {
- gpr_slice_buffer write_slice_buffer;
- gpr_slice slice;
- gpr_slice_buffer_init(&write_slice_buffer);
+ grpc_slice_buffer write_slice_buffer;
+ grpc_slice slice;
+ grpc_slice_buffer_init(&write_slice_buffer);
grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
stream_op->send_message->length, NULL);
/* Check that compression flag is OFF. We don't support compression yet.
@@ -828,7 +882,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_ERROR, "Compression is not supported");
GPR_ASSERT(stream_op->send_message->flags == 0);
}
- gpr_slice_buffer_add(&write_slice_buffer, slice);
+ grpc_slice_buffer_add(&write_slice_buffer, slice);
if (write_slice_buffer.count != 1) {
/* Empty request not handled yet */
gpr_log(GPR_ERROR, "Empty request is not supported");
@@ -854,12 +908,19 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_RECV_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
- if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
- stream_state->state_callback_received[OP_FAILED]) {
- CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
GRPC_ERROR_CANCELLED, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
+ } else if (stream_state->state_callback_received[OP_FAILED]) {
+ CRONET_LOG(GPR_DEBUG, "Stream failed.");
+ grpc_exec_ctx_sched(
+ exec_ctx, stream_op->recv_message_ready,
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
+ stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
CRONET_LOG(GPR_DEBUG, "read stream closed");
@@ -867,6 +928,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.length_field_received == false) {
if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
stream_state->rs.remaining_bytes == 0) {
@@ -891,7 +953,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
} else {
stream_state->rs.remaining_bytes = 0;
CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
- gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
+ grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
&stream_state->rs.read_slice_buffer, 0);
*((grpc_byte_buffer **)stream_op->recv_message) =
@@ -918,15 +980,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
}
} else if (stream_state->rs.remaining_bytes == 0) {
CRONET_LOG(GPR_DEBUG, "read operation complete");
- gpr_slice read_data_slice =
- gpr_slice_malloc((uint32_t)stream_state->rs.length_field);
- uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
+ grpc_slice read_data_slice =
+ grpc_slice_malloc((uint32_t)stream_state->rs.length_field);
+ uint8_t *dst_p = GRPC_SLICE_START_PTR(read_data_slice);
memcpy(dst_p, stream_state->rs.read_buffer,
(size_t)stream_state->rs.length_field);
free_read_buffer(s);
- gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
- gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer,
- read_data_slice);
+ grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
+ grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
+ read_data_slice);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
&stream_state->rs.read_slice_buffer, 0);
*((grpc_byte_buffer **)stream_op->recv_message) =
@@ -935,10 +997,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
- /* Clear read state of the stream, so next read op (if it were to come)
- * will work */
- stream_state->rs.received_bytes = stream_state->rs.remaining_bytes =
- stream_state->rs.length_field_received = 0;
+ /* Do an extra read to trigger on_succeeded() callback in case connection
+ is closed */
+ stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
+ stream_state->rs.received_bytes = 0;
+ stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
+ stream_state->rs.length_field_received = false;
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ stream_state->rs.remaining_bytes);
result = ACTION_TAKEN_NO_CALLBACK;
}
} else if (stream_op->recv_trailing_metadata &&
@@ -975,17 +1042,32 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
CRONET_LOG(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs);
if (s->cbs) {
cronet_bidirectional_stream_cancel(s->cbs);
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else {
+ result = ACTION_TAKEN_NO_CALLBACK;
}
stream_state->state_op_done[OP_CANCEL_ERROR] = true;
- result = ACTION_TAKEN_WITH_CALLBACK;
+ if (!stream_state->cancel_error) {
+ stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error);
+ }
} else if (stream_op->on_complete &&
op_can_be_run(stream_op, stream_state, &oas->state,
OP_ON_COMPLETE)) {
- /* All actions in this stream_op are complete. Call the on_complete callback
- */
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
- grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE,
- NULL);
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete,
+ GRPC_ERROR_REF(stream_state->cancel_error), NULL);
+ } else if (stream_state->state_callback_received[OP_FAILED]) {
+ grpc_exec_ctx_sched(
+ exec_ctx, stream_op->on_complete,
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
+ } else {
+ /* All actions in this stream_op are complete. Call the on_complete
+ * callback
+ */
+ grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE,
+ NULL);
+ }
oas->state.state_op_done[OP_ON_COMPLETE] = true;
oas->done = true;
/* reset any send message state, only if this ON_COMPLETE is about a send.
@@ -999,6 +1081,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
make a note */
if (stream_op->recv_message)
stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
+ } else if (stream_state->fail_state && !stream_state->flush_read) {
+ CRONET_LOG(GPR_DEBUG, "running: %p flush read", oas);
+ if (stream_state->rs.read_buffer &&
+ stream_state->rs.read_buffer != stream_state->rs.grpc_header_bytes) {
+ gpr_free(stream_state->rs.read_buffer);
+ stream_state->rs.read_buffer = NULL;
+ }
+ stream_state->rs.read_buffer = gpr_malloc(4096);
+ stream_state->flush_read = true;
} else {
result = NO_ACTION_POSSIBLE;
}
@@ -1024,6 +1115,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done));
memset(s->state.state_callback_received, 0,
sizeof(s->state.state_callback_received));
+ s->state.fail_state = s->state.flush_read = false;
+ s->state.cancel_error = NULL;
gpr_mu_init(&s->mu);
return 0;
}
@@ -1042,11 +1135,38 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->curr_gs = gs;
memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport));
add_to_storage(s, op);
- execute_from_storage(s);
+ if (op->send_initial_metadata &&
+ header_has_authority(op->send_initial_metadata->list.head)) {
+ /* Cronet does not support :authority header field. We cancel the call when
+ this field is present in metadata */
+ cronet_bidirectional_stream_header_array header_array;
+ cronet_bidirectional_stream_header *header;
+ cronet_bidirectional_stream cbs;
+ CRONET_LOG(GPR_DEBUG,
+ ":authority header is provided but not supported;"
+ " cancel operations");
+ /* Notify application that operation is cancelled by forging trailers */
+ header_array.count = 1;
+ header_array.capacity = 1;
+ header_array.headers =
+ gpr_malloc(sizeof(cronet_bidirectional_stream_header));
+ header = (cronet_bidirectional_stream_header *)header_array.headers;
+ header->key = "grpc-status";
+ header->value = "1"; /* Return status GRPC_STATUS_CANCELLED */
+ cbs.annotation = (void *)s;
+ s->state.state_op_done[OP_CANCEL_ERROR] = true;
+ on_response_trailers_received(&cbs, &header_array);
+ gpr_free(header_array.headers);
+ } else {
+ execute_from_storage(s);
+ }
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, void *and_free_memory) {}
+ grpc_stream *gs, void *and_free_memory) {
+ stream_obj *s = (stream_obj *)gs;
+ GRPC_ERROR_UNREF(s->state.cancel_error);
+}
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {}
@@ -1054,6 +1174,11 @@ static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
return NULL;
}
+static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx,
+ grpc_transport *gt) {
+ return NULL;
+}
+
static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_transport_op *op) {}
@@ -1066,4 +1191,5 @@ const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj),
perform_op,
destroy_stream,
destroy_transport,
- get_peer};
+ get_peer,
+ get_endpoint};