diff options
author | Muxi Yan <muxi@users.noreply.github.com> | 2017-03-21 23:56:03 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-03-21 23:56:03 -0700 |
commit | 1855710c714f2bee978f82a7d97f51a5945c3814 (patch) | |
tree | 633416fe9cd36f2706f8b21f19e9177744f062f8 | |
parent | 8e2713ec31aeb48adfc6d02bc110e712b82629c0 (diff) | |
parent | 12b44ae5ecb865aedcf9a6b86268a85cd43c5555 (diff) |
Merge pull request #10210 from muxi/enable-cronet-compression
Enable cronet compression
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.c | 65 | ||||
-rw-r--r-- | src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m | 3 |
2 files changed, 46 insertions, 22 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 450d9ab23a..36bb67869c 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -128,6 +128,7 @@ struct read_state { int received_bytes; int remaining_bytes; int length_field; + bool compressed; char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES]; char *payload_field; bool read_stream_closed; @@ -484,6 +485,16 @@ static void on_response_headers_received( CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, headers, negotiated_protocol); stream_obj *s = (stream_obj *)stream->annotation; + + /* Identify if this is a header or a trailer (in a trailer-only response case) + */ + for (size_t i = 0; i < headers->count; i++) { + if (0 == strcmp("grpc-status", headers->headers[i].key)) { + on_response_trailers_received(stream, headers); + return; + } + } + gpr_mu_lock(&s->mu); memset(&s->state.rs.initial_metadata, 0, sizeof(s->state.rs.initial_metadata)); @@ -507,6 +518,7 @@ static void on_response_headers_received( is closed */ GPR_ASSERT(s->state.rs.length_field_received == false); s->state.rs.read_buffer = s->state.rs.grpc_header_bytes; + s->state.rs.compressed = false; s->state.rs.received_bytes = 0; s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); @@ -641,7 +653,7 @@ static void on_response_trailers_received( */ static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, char **pp_write_buffer, - size_t *p_write_buffer_size) { + size_t *p_write_buffer_size, uint32_t flags) { grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer); size_t length = GRPC_SLICE_LENGTH(slice); *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES; @@ -650,7 +662,9 @@ static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer, *pp_write_buffer = write_buffer; uint8_t *p = (uint8_t *)write_buffer; /* Append 5 byte header */ - *p++ = 0; + /* Compressed flag */ + *p++ = (flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0; + /* Message length */ *p++ = (uint8_t)(length >> 24); *p++ = (uint8_t)(length >> 16); *p++ = (uint8_t)(length >> 8); @@ -728,14 +742,16 @@ static void convert_metadata_to_cronet_headers( *p_num_headers = (size_t)num_headers; } -static int parse_grpc_header(const uint8_t *data) { +static void parse_grpc_header(const uint8_t *data, int *length, + bool *compressed) { + const uint8_t c = *data; const uint8_t *p = data + 1; - int length = 0; - length |= ((uint8_t)*p++) << 24; - length |= ((uint8_t)*p++) << 16; - length |= ((uint8_t)*p++) << 8; - length |= ((uint8_t)*p++); - return length; + *compressed = ((c & 0x01) == 0x01); + *length = 0; + *length |= ((uint8_t)*p++) << 24; + *length |= ((uint8_t)*p++) << 16; + *length |= ((uint8_t)*p++) << 8; + *length |= ((uint8_t)*p++); } static bool header_has_authority(grpc_linked_mdelem *head) { @@ -788,7 +804,8 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false; /* we haven't received headers yet. */ - else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA]) + else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] && + !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false; } else if (op_id == OP_SEND_MESSAGE) { /* already executed (note we're checking op specific state, not stream @@ -801,7 +818,8 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, /* already executed */ if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false; /* we haven't received headers yet. */ - else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA]) + else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] && + !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false; } else if (op_id == OP_RECV_TRAILING_METADATA) { /* already executed */ @@ -955,12 +973,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_init(&write_slice_buffer); grpc_byte_stream_next(NULL, stream_op->send_message, &slice, stream_op->send_message->length, NULL); - /* Check that compression flag is OFF. We don't support compression yet. - */ - if (stream_op->send_message->flags != 0) { - gpr_log(GPR_ERROR, "Compression is not supported"); - GPR_ASSERT(stream_op->send_message->flags == 0); - } grpc_slice_buffer_add(&write_slice_buffer, slice); if (write_slice_buffer.count != 1) { /* Empty request not handled yet */ @@ -970,7 +982,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, if (write_slice_buffer.count > 0) { size_t write_buffer_size; create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, - &write_buffer_size); + &write_buffer_size, stream_op->send_message->flags); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, stream_state->ws.write_buffer); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; @@ -1022,6 +1034,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } else if (stream_state->state_callback_received[OP_FAILED]) { grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_NONE); + } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); } else { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &oas->s->state.rs.initial_metadata, @@ -1066,8 +1081,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.remaining_bytes == 0) { /* Start a read operation for data */ stream_state->rs.length_field_received = true; - stream_state->rs.length_field = stream_state->rs.remaining_bytes = - parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer); + parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer, + &stream_state->rs.length_field, + &stream_state->rs.compressed); CRONET_LOG(GPR_DEBUG, "length field = %d", stream_state->rs.length_field); if (stream_state->rs.length_field > 0) { @@ -1089,6 +1105,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0); + if (stream_state->rs.compressed) { + stream_state->rs.sbs.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; + } *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, @@ -1100,6 +1119,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.received_bytes = 0; + stream_state->rs.compressed = false; stream_state->rs.length_field_received = false; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); stream_state->state_op_done[OP_READ_REQ_MADE] = @@ -1114,6 +1134,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.received_bytes = 0; + stream_state->rs.compressed = false; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); stream_state->state_op_done[OP_READ_REQ_MADE] = true; /* Indicates that at least one read request has been made */ @@ -1137,6 +1158,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, read_data_slice); grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0); + if (stream_state->rs.compressed) { + stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS; + } *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs; grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, @@ -1146,6 +1170,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, /* Do an extra read to trigger on_succeeded() callback in case connection is closed */ stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; + stream_state->rs.compressed = false; stream_state->rs.received_bytes = 0; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.length_field_received = false; diff --git a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m index 1e0c8024ca..3b442645e8 100644 --- a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m +++ b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m @@ -273,8 +273,7 @@ static char *roots_filename; } - (void)testCompressedPayload { - // NOT SUPPORTED - // [self testIndividualCase:"compressed_payload"]; + [self testIndividualCase:"compressed_payload"]; } - (void)testConnectivity { |