aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.c37
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c59
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c17
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.c65
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.h18
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c18
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c113
8 files changed, 178 insertions, 151 deletions
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c
index 68e9d7e392..2b226c1bf7 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.c
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c
@@ -63,8 +63,6 @@ typedef struct {
grpc_closure *notify;
grpc_connect_in_args args;
grpc_connect_out_args *result;
- grpc_closure initial_string_sent;
- grpc_slice_buffer initial_string_buffer;
grpc_endpoint *endpoint; // Non-NULL until handshaking starts.
@@ -82,7 +80,6 @@ static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx,
grpc_connector *con) {
chttp2_connector *c = (chttp2_connector *)con;
if (gpr_unref(&c->refs)) {
- /* c->initial_string_buffer does not need to be destroyed */
gpr_mu_destroy(&c->mu);
// If handshaking is not yet in progress, destroy the endpoint.
// Otherwise, the handshaker will do this for us.
@@ -160,28 +157,6 @@ static void start_handshake_locked(grpc_exec_ctx *exec_ctx,
c->endpoint = NULL; // Endpoint handed off to handshake manager.
}
-static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- chttp2_connector *c = arg;
- gpr_mu_lock(&c->mu);
- if (error != GRPC_ERROR_NONE || c->shutdown) {
- if (error == GRPC_ERROR_NONE) {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connector shutdown");
- } else {
- error = GRPC_ERROR_REF(error);
- }
- memset(c->result, 0, sizeof(*c->result));
- grpc_closure *notify = c->notify;
- c->notify = NULL;
- grpc_closure_sched(exec_ctx, notify, error);
- gpr_mu_unlock(&c->mu);
- chttp2_connector_unref(exec_ctx, arg);
- } else {
- start_handshake_locked(exec_ctx, c);
- gpr_mu_unlock(&c->mu);
- }
-}
-
static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
chttp2_connector *c = arg;
gpr_mu_lock(&c->mu);
@@ -204,17 +179,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
chttp2_connector_unref(exec_ctx, arg);
} else {
GPR_ASSERT(c->endpoint != NULL);
- if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
- grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
- c, grpc_schedule_on_exec_ctx);
- grpc_slice_buffer_init(&c->initial_string_buffer);
- grpc_slice_buffer_add(&c->initial_string_buffer,
- c->args.initial_connect_string);
- grpc_endpoint_write(exec_ctx, c->endpoint, &c->initial_string_buffer,
- &c->initial_string_sent);
- } else {
- start_handshake_locked(exec_ctx, c);
- }
+ start_handshake_locked(exec_ctx, c);
gpr_mu_unlock(&c->mu);
}
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 17b99384ab..88d96b1a15 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -578,7 +578,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) {
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
- const void *server_data) {
+ const void *server_data, gpr_arena *arena) {
GPR_TIMER_BEGIN("init_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
@@ -591,8 +591,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_ref_init(&s->active_streams, 1);
GRPC_CHTTP2_STREAM_REF(s, "chttp2");
- grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]);
- grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]);
+ grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
+ grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena);
grpc_chttp2_data_parser_init(&s->data_parser);
grpc_slice_buffer_init(&s->flow_controlled_buffer);
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
@@ -668,16 +668,17 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_TIMER_END("destroy_stream", 0);
- gpr_free(s->destroy_stream_arg);
+ grpc_closure_sched(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, void *and_free_memory) {
+ grpc_stream *gs,
+ grpc_closure *then_schedule_closure) {
GPR_TIMER_BEGIN("destroy_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- s->destroy_stream_arg = and_free_memory;
+ s->destroy_stream_arg = then_schedule_closure;
grpc_closure_sched(
exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s,
grpc_combiner_scheduler(t->combiner, false)),
@@ -1634,14 +1635,20 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
s->recv_trailing_metadata_finished != NULL) {
char status_string[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(status, status_string);
- grpc_chttp2_incoming_metadata_buffer_replace_or_add(
- exec_ctx, &s->metadata_buffer[1],
- grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_STATUS,
- grpc_slice_from_copied_string(status_string)));
- grpc_chttp2_incoming_metadata_buffer_replace_or_add(
- exec_ctx, &s->metadata_buffer[1],
- grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
- grpc_slice_ref_internal(slice)));
+ GRPC_LOG_IF_ERROR("add_status",
+ grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ exec_ctx, &s->metadata_buffer[1],
+ grpc_mdelem_from_slices(
+ exec_ctx, GRPC_MDSTR_GRPC_STATUS,
+ grpc_slice_from_copied_string(status_string))));
+ if (!GRPC_SLICE_IS_EMPTY(slice)) {
+ GRPC_LOG_IF_ERROR(
+ "add_status_message",
+ grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ exec_ctx, &s->metadata_buffer[1],
+ grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
+ grpc_slice_ref_internal(slice))));
+ }
s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
@@ -1764,6 +1771,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error) {
grpc_slice hdr;
grpc_slice status_hdr;
+ grpc_slice http_status_hdr;
grpc_slice message_pfx;
uint8_t *p;
uint32_t len = 0;
@@ -1779,6 +1787,26 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
It's complicated by the fact that our send machinery would be dead by
the time we got around to sending this, so instead we ignore HPACK
compression and just write the uncompressed bytes onto the wire. */
+ if (!s->sent_initial_metadata) {
+ http_status_hdr = grpc_slice_malloc(13);
+ p = GRPC_SLICE_START_PTR(http_status_hdr);
+ *p++ = 0x00;
+ *p++ = 7;
+ *p++ = ':';
+ *p++ = 's';
+ *p++ = 't';
+ *p++ = 'a';
+ *p++ = 't';
+ *p++ = 'u';
+ *p++ = 's';
+ *p++ = 3;
+ *p++ = '2';
+ *p++ = '0';
+ *p++ = '0';
+ GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
+ len += (uint32_t)GRPC_SLICE_LENGTH(http_status_hdr);
+ }
+
status_hdr = grpc_slice_malloc(15 + (grpc_status >= 10));
p = GRPC_SLICE_START_PTR(status_hdr);
*p++ = 0x00; /* literal header, not indexed */
@@ -1844,6 +1872,9 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
grpc_slice_buffer_add(&t->qbuf, hdr);
+ if (!s->sent_initial_metadata) {
+ grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
+ }
grpc_slice_buffer_add(&t->qbuf, status_hdr);
grpc_slice_buffer_add(&t->qbuf, message_pfx);
grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice));
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index a99ed0e7f3..5099d736bf 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -1627,13 +1627,18 @@ void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_error *grpc_chttp2_hpack_parser_parse(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
grpc_slice slice) {
- /* TODO(ctiller): limit the distance of end from beg, and perform multiple
- steps in the event of a large chunk of data to limit
- stack space usage when no tail call optimization is
- available */
+/* max number of bytes to parse at a time... limits call stack depth on
+ * compilers without TCO */
+#define MAX_PARSE_LENGTH 1024
p->current_slice_refcount = slice.refcount;
- grpc_error *error = p->state(exec_ctx, p, GRPC_SLICE_START_PTR(slice),
- GRPC_SLICE_END_PTR(slice));
+ uint8_t *start = GRPC_SLICE_START_PTR(slice);
+ uint8_t *end = GRPC_SLICE_END_PTR(slice);
+ grpc_error *error = GRPC_ERROR_NONE;
+ while (start != end && error == GRPC_ERROR_NONE) {
+ uint8_t *target = start + GPR_MIN(MAX_PARSE_LENGTH, end - start);
+ error = p->state(exec_ctx, p, start, target);
+ start = target;
+ }
p->current_slice_refcount = NULL;
return error;
}
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
index c91b019aa0..da0a34d32a 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
@@ -41,69 +41,48 @@
#include <grpc/support/log.h>
void grpc_chttp2_incoming_metadata_buffer_init(
- grpc_chttp2_incoming_metadata_buffer *buffer) {
- buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) {
+ buffer->arena = arena;
+ grpc_metadata_batch_init(&buffer->batch);
+ buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) {
- size_t i;
- if (!buffer->published) {
- for (i = 0; i < buffer->count; i++) {
- GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
- }
- }
- gpr_free(buffer->elems);
+ grpc_metadata_batch_destroy(exec_ctx, &buffer->batch);
}
-void grpc_chttp2_incoming_metadata_buffer_add(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) {
- GPR_ASSERT(!buffer->published);
- if (buffer->capacity == buffer->count) {
- buffer->capacity = GPR_MAX(8, 2 * buffer->capacity);
- buffer->elems =
- gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity);
- }
- buffer->elems[buffer->count++].md = elem;
+grpc_error *grpc_chttp2_incoming_metadata_buffer_add(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
+ grpc_mdelem elem) {
buffer->size += GRPC_MDELEM_LENGTH(elem);
+ return grpc_metadata_batch_add_tail(
+ exec_ctx, &buffer->batch,
+ gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem);
}
-void grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_mdelem elem) {
- for (size_t i = 0; i < buffer->count; i++) {
- if (grpc_slice_eq(GRPC_MDKEY(buffer->elems[i].md), GRPC_MDKEY(elem))) {
- GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
- buffer->elems[i].md = elem;
- return;
+ for (grpc_linked_mdelem *l = buffer->batch.list.head; l != NULL;
+ l = l->next) {
+ if (grpc_slice_eq(GRPC_MDKEY(l->md), GRPC_MDKEY(elem))) {
+ GRPC_MDELEM_UNREF(exec_ctx, l->md);
+ l->md = elem;
+ return GRPC_ERROR_NONE;
}
}
- grpc_chttp2_incoming_metadata_buffer_add(buffer, elem);
+ return grpc_chttp2_incoming_metadata_buffer_add(exec_ctx, buffer, elem);
}
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) {
- GPR_ASSERT(!buffer->published);
- buffer->deadline = deadline;
+ buffer->batch.deadline = deadline;
}
void grpc_chttp2_incoming_metadata_buffer_publish(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_metadata_batch *batch) {
- GPR_ASSERT(!buffer->published);
- buffer->published = 1;
- if (buffer->count > 0) {
- size_t i;
- for (i = 0; i < buffer->count; i++) {
- /* TODO(ctiller): do something better here */
- if (!GRPC_LOG_IF_ERROR("grpc_chttp2_incoming_metadata_buffer_publish",
- grpc_metadata_batch_link_tail(
- exec_ctx, batch, &buffer->elems[i]))) {
- GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
- }
- }
- } else {
- batch->list.head = batch->list.tail = NULL;
- }
- batch->deadline = buffer->deadline;
+ *batch = buffer->batch;
+ grpc_metadata_batch_init(&buffer->batch);
}
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
index 1eac6fc150..288c917e65 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
@@ -37,28 +37,26 @@
#include "src/core/lib/transport/transport.h"
typedef struct {
- grpc_linked_mdelem *elems;
- size_t count;
- size_t capacity;
- gpr_timespec deadline;
- int published;
+ gpr_arena *arena;
+ grpc_metadata_batch batch;
size_t size; // total size of metadata
} grpc_chttp2_incoming_metadata_buffer;
/** assumes everything initially zeroed */
void grpc_chttp2_incoming_metadata_buffer_init(
- grpc_chttp2_incoming_metadata_buffer *buffer);
+ grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena);
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_publish(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_metadata_batch *batch);
-void grpc_chttp2_incoming_metadata_buffer_add(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem);
-void grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+grpc_error *grpc_chttp2_incoming_metadata_buffer_add(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
- grpc_mdelem elem);
+ grpc_mdelem elem) GRPC_MUST_USE_RESULT;
+grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
+ grpc_mdelem elem) GRPC_MUST_USE_RESULT;
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index d26812ad6b..3c56c21599 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -425,7 +425,7 @@ struct grpc_chttp2_stream {
grpc_stream_refcount *refcount;
grpc_closure destroy_stream;
- void *destroy_stream_arg;
+ grpc_closure *destroy_stream_arg;
grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
uint8_t included[STREAM_LIST_COUNT];
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 65cb6133b2..7e457ced27 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -550,7 +550,14 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
s->seen_error = true;
GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
- grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md);
+ grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add(
+ exec_ctx, &s->metadata_buffer[0], md);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
+ grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
+ s->seen_error = true;
+ GRPC_MDELEM_UNREF(exec_ctx, md);
+ }
}
}
@@ -601,7 +608,14 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp,
s->seen_error = true;
GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
- grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md);
+ grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add(
+ exec_ctx, &s->metadata_buffer[1], md);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
+ grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
+ s->seen_error = true;
+ GRPC_MDELEM_UNREF(exec_ctx, md);
+ }
}
GPR_TIMER_END("on_trailing_header", 0);
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index fe98f48bbb..952690eba7 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -128,6 +128,7 @@ struct read_state {
int received_bytes;
int remaining_bytes;
int length_field;
+ bool compressed;
char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES];
char *payload_field;
bool read_stream_closed;
@@ -184,6 +185,7 @@ struct op_storage {
};
struct stream_obj {
+ gpr_arena *arena;
struct op_and_state *oas;
grpc_transport_stream_op *curr_op;
grpc_cronet_transport *curr_ct;
@@ -272,7 +274,7 @@ static void maybe_flush_read(stream_obj *s) {
/* Whenever the evaluation of any of the two condition is changed, we check
* whether we should enter the flush read state. */
if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
- if (!s->state.flush_read) {
+ if (!s->state.flush_read && !s->state.rs.read_stream_closed) {
CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
s->state.flush_read = true;
null_and_maybe_free_read_buffer(s);
@@ -483,18 +485,31 @@ static void on_response_headers_received(
CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
headers, negotiated_protocol);
stream_obj *s = (stream_obj *)stream->annotation;
+
+ /* Identify if this is a header or a trailer (in a trailer-only response case)
+ */
+ for (size_t i = 0; i < headers->count; i++) {
+ if (0 == strcmp("grpc-status", headers->headers[i].key)) {
+ on_response_trailers_received(stream, headers);
+ return;
+ }
+ }
+
gpr_mu_lock(&s->mu);
memset(&s->state.rs.initial_metadata, 0,
sizeof(s->state.rs.initial_metadata));
- grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata);
+ grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata,
+ s->arena);
for (size_t i = 0; i < headers->count; i++) {
- grpc_chttp2_incoming_metadata_buffer_add(
- &s->state.rs.initial_metadata,
- grpc_mdelem_from_slices(
- &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
- headers->headers[i].key)),
- grpc_slice_intern(
- grpc_slice_from_static_string(headers->headers[i].value))));
+ GRPC_LOG_IF_ERROR(
+ "on_response_headers_received",
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &exec_ctx, &s->state.rs.initial_metadata,
+ grpc_mdelem_from_slices(
+ &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
+ headers->headers[i].key)),
+ grpc_slice_intern(grpc_slice_from_static_string(
+ headers->headers[i].value)))));
}
s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
@@ -503,6 +518,7 @@ static void on_response_headers_received(
is closed */
GPR_ASSERT(s->state.rs.length_field_received == false);
s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
+ s->state.rs.compressed = false;
s->state.rs.received_bytes = 0;
s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
@@ -586,17 +602,20 @@ static void on_response_trailers_received(
memset(&s->state.rs.trailing_metadata, 0,
sizeof(s->state.rs.trailing_metadata));
s->state.rs.trailing_metadata_valid = false;
- grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata);
+ grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata,
+ s->arena);
for (size_t i = 0; i < trailers->count; i++) {
CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key,
trailers->headers[i].value);
- grpc_chttp2_incoming_metadata_buffer_add(
- &s->state.rs.trailing_metadata,
- grpc_mdelem_from_slices(
- &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
- trailers->headers[i].key)),
- grpc_slice_intern(
- grpc_slice_from_static_string(trailers->headers[i].value))));
+ GRPC_LOG_IF_ERROR(
+ "on_response_trailers_received",
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &exec_ctx, &s->state.rs.trailing_metadata,
+ grpc_mdelem_from_slices(
+ &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string(
+ trailers->headers[i].key)),
+ grpc_slice_intern(grpc_slice_from_static_string(
+ trailers->headers[i].value)))));
s->state.rs.trailing_metadata_valid = true;
if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
0 != strcmp(trailers->headers[i].value, "0")) {
@@ -634,7 +653,7 @@ static void on_response_trailers_received(
*/
static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer,
char **pp_write_buffer,
- size_t *p_write_buffer_size) {
+ size_t *p_write_buffer_size, uint32_t flags) {
grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
size_t length = GRPC_SLICE_LENGTH(slice);
*p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
@@ -643,7 +662,9 @@ static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer,
*pp_write_buffer = write_buffer;
uint8_t *p = (uint8_t *)write_buffer;
/* Append 5 byte header */
- *p++ = 0;
+ /* Compressed flag */
+ *p++ = (flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0;
+ /* Message length */
*p++ = (uint8_t)(length >> 24);
*p++ = (uint8_t)(length >> 16);
*p++ = (uint8_t)(length >> 8);
@@ -721,14 +742,16 @@ static void convert_metadata_to_cronet_headers(
*p_num_headers = (size_t)num_headers;
}
-static int parse_grpc_header(const uint8_t *data) {
+static void parse_grpc_header(const uint8_t *data, int *length,
+ bool *compressed) {
+ const uint8_t c = *data;
const uint8_t *p = data + 1;
- int length = 0;
- length |= ((uint8_t)*p++) << 24;
- length |= ((uint8_t)*p++) << 16;
- length |= ((uint8_t)*p++) << 8;
- length |= ((uint8_t)*p++);
- return length;
+ *compressed = ((c & 0x01) == 0x01);
+ *length = 0;
+ *length |= ((uint8_t)*p++) << 24;
+ *length |= ((uint8_t)*p++) << 16;
+ *length |= ((uint8_t)*p++) << 8;
+ *length |= ((uint8_t)*p++);
}
static bool header_has_authority(grpc_linked_mdelem *head) {
@@ -781,7 +804,8 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
result = false;
/* we haven't received headers yet. */
- else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA])
+ else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
+ !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
result = false;
} else if (op_id == OP_SEND_MESSAGE) {
/* already executed (note we're checking op specific state, not stream
@@ -794,7 +818,8 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
/* already executed */
if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
/* we haven't received headers yet. */
- else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA])
+ else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
+ !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
result = false;
} else if (op_id == OP_RECV_TRAILING_METADATA) {
/* already executed */
@@ -948,12 +973,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_init(&write_slice_buffer);
grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
stream_op->send_message->length, NULL);
- /* Check that compression flag is OFF. We don't support compression yet.
- */
- if (stream_op->send_message->flags != 0) {
- gpr_log(GPR_ERROR, "Compression is not supported");
- GPR_ASSERT(stream_op->send_message->flags == 0);
- }
grpc_slice_buffer_add(&write_slice_buffer, slice);
if (write_slice_buffer.count != 1) {
/* Empty request not handled yet */
@@ -963,7 +982,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
if (write_slice_buffer.count > 0) {
size_t write_buffer_size;
create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
- &write_buffer_size);
+ &write_buffer_size, stream_op->send_message->flags);
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
@@ -1015,6 +1034,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
} else if (stream_state->state_callback_received[OP_FAILED]) {
grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
GRPC_ERROR_NONE);
+ } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
+ grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_NONE);
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &oas->s->state.rs.initial_metadata,
@@ -1059,8 +1081,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.remaining_bytes == 0) {
/* Start a read operation for data */
stream_state->rs.length_field_received = true;
- stream_state->rs.length_field = stream_state->rs.remaining_bytes =
- parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer);
+ parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer,
+ &stream_state->rs.length_field,
+ &stream_state->rs.compressed);
CRONET_LOG(GPR_DEBUG, "length field = %d",
stream_state->rs.length_field);
if (stream_state->rs.length_field > 0) {
@@ -1082,6 +1105,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
&stream_state->rs.read_slice_buffer, 0);
+ if (stream_state->rs.compressed) {
+ stream_state->rs.sbs.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ }
*((grpc_byte_buffer **)stream_op->recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
@@ -1093,6 +1119,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.received_bytes = 0;
+ stream_state->rs.compressed = false;
stream_state->rs.length_field_received = false;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
stream_state->state_op_done[OP_READ_REQ_MADE] =
@@ -1107,6 +1134,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.received_bytes = 0;
+ stream_state->rs.compressed = false;
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
stream_state->state_op_done[OP_READ_REQ_MADE] =
true; /* Indicates that at least one read request has been made */
@@ -1130,6 +1158,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
read_data_slice);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
&stream_state->rs.read_slice_buffer, 0);
+ if (stream_state->rs.compressed) {
+ stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS;
+ }
*((grpc_byte_buffer **)stream_op->recv_message) =
(grpc_byte_buffer *)&stream_state->rs.sbs;
grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
@@ -1139,6 +1170,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
/* Do an extra read to trigger on_succeeded() callback in case connection
is closed */
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
+ stream_state->rs.compressed = false;
stream_state->rs.received_bytes = 0;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.length_field_received = false;
@@ -1215,7 +1247,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
- const void *server_data) {
+ const void *server_data, gpr_arena *arena) {
stream_obj *s = (stream_obj *)gs;
memset(&s->storage, 0, sizeof(s->storage));
s->storage.head = NULL;
@@ -1237,6 +1269,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->curr_gs = gs;
s->curr_ct = (grpc_cronet_transport *)gt;
+ s->arena = arena;
gpr_mu_init(&s->mu);
return 0;
@@ -1273,10 +1306,12 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, void *and_free_memory) {
+ grpc_stream *gs,
+ grpc_closure *then_schedule_closure) {
stream_obj *s = (stream_obj *)gs;
null_and_maybe_free_read_buffer(s);
GRPC_ERROR_UNREF(s->state.cancel_error);
+ grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {}