aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2016-10-21 13:59:14 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-01-03 21:17:52 -0800
commit740ae63a8a816369b18b8ac9602a9acc7cc0b30d (patch)
tree2c7e62ebbd3d637208c8a2d5e8ead6dc6e7ae5b4 /src
parent6ff14349ff1c7c16eff295aab21e9c370eb52af7 (diff)
Packet coalescing transport layer and end2end test changes
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_api_dummy.c3
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c146
-rw-r--r--src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m4
-rw-r--r--src/objective-c/tests/Podfile1
-rw-r--r--src/objective-c/tests/Tests.xcodeproj/project.pbxproj1
5 files changed, 112 insertions, 43 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
index 687026c9fd..38755604b9 100644
--- a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
+++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
@@ -77,9 +77,8 @@ int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream,
return 0;
}
-int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) {
+void cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) {
GPR_ASSERT(0);
- return 0;
}
#endif /* GRPC_COMPILE_WITH_CRONET */
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index afc59f4b12..4063dcaefc 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -86,7 +86,7 @@ enum e_op_id {
/* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
-static void on_request_headers_sent(cronet_bidirectional_stream *);
+static void on_stream_ready(cronet_bidirectional_stream *);
static void on_response_headers_received(
cronet_bidirectional_stream *,
const cronet_bidirectional_stream_header_array *, const char *);
@@ -99,7 +99,7 @@ static void on_succeeded(cronet_bidirectional_stream *);
static void on_failed(cronet_bidirectional_stream *, int);
static void on_canceled(cronet_bidirectional_stream *);
static cronet_bidirectional_stream_callback cronet_callbacks = {
- on_request_headers_sent,
+ on_stream_ready,
on_response_headers_received,
on_read_completed,
on_write_completed,
@@ -151,6 +151,11 @@ struct op_state {
bool state_callback_received[OP_NUM_OPS];
bool fail_state;
bool flush_read;
+#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
+ bool flush_cronet_when_ready;
+ bool pending_write_for_trailer;
+#endif
+ bool unprocessed_send_message;
grpc_error *cancel_error;
/* data structure for storing data coming from server */
struct read_state rs;
@@ -273,6 +278,9 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
new_op->next = storage->head;
storage->head = new_op;
storage->num_pending_ops++;
+ if (op->send_message) {
+ s->state.unprocessed_send_message = true;
+ }
CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
storage->num_pending_ops);
gpr_mu_unlock(&s->mu);
@@ -405,8 +413,8 @@ static void on_succeeded(cronet_bidirectional_stream *stream) {
/*
Cronet callback
*/
-static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
- CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream);
+static void on_stream_ready(cronet_bidirectional_stream *stream) {
+ CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
@@ -416,6 +424,14 @@ static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
gpr_free(s->header_array.headers);
s->header_array.headers = NULL;
}
+/* Send the initial metadata on wire if there is no SEND_MESSAGE or
+ * SEND_TRAILING_METADATA ops pending */
+#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
+ if (s->state.flush_cronet_when_ready) {
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
+ cronet_bidirectional_stream_flush(stream);
+ }
+#endif
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -551,6 +567,10 @@ static void on_response_trailers_received(
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs);
s->state.state_callback_received[OP_SEND_MESSAGE] = false;
cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
+ cronet_bidirectional_stream_flush(s->cbs);
+#endif
s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
gpr_mu_unlock(&s->mu);
@@ -598,7 +618,7 @@ static void convert_metadata_to_cronet_headers(
curr = curr->next;
num_headers_available++;
}
- /* Allocate enough memory. It is freed in the on_request_headers_sent callback
+ /* Allocate enough memory. It is freed in the on_stream_ready callback
*/
cronet_bidirectional_stream_header *headers =
(cronet_bidirectional_stream_header *)gpr_malloc(
@@ -740,12 +760,16 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
result = false;
/* we haven't sent message yet */
- else if (curr_op->send_message &&
+ else if (stream_state->unprocessed_send_message &&
!stream_state->state_op_done[OP_SEND_MESSAGE])
result = false;
/* we haven't got on_write_completed for the send yet */
else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
- !stream_state->state_callback_received[OP_SEND_MESSAGE])
+ !stream_state->state_callback_received[OP_SEND_MESSAGE]
+#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
+ && !stream_state->pending_write_for_trailer
+#endif
+ )
result = false;
} else if (op_id == OP_CANCEL_ERROR) {
/* already executed */
@@ -831,6 +855,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
&cronet_callbacks);
CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs);
+#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
+ cronet_bidirectional_stream_disable_auto_flush(s->cbs, true);
+ cronet_bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
+#endif
char *url = NULL;
const char *method = "POST";
s->header_array.headers = NULL;
@@ -843,30 +871,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
cronet_bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array,
false);
stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
- result = ACTION_TAKEN_WITH_CALLBACK;
- } else if (stream_op->recv_initial_metadata &&
- op_can_be_run(stream_op, stream_state, &oas->state,
- OP_RECV_INITIAL_METADATA)) {
- CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
- if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_CANCELLED, NULL);
- } else if (stream_state->state_callback_received[OP_FAILED]) {
- grpc_exec_ctx_sched(
- exec_ctx, stream_op->recv_initial_metadata_ready,
- make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
- } else {
- grpc_chttp2_incoming_metadata_buffer_publish(
- &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_NONE, NULL);
+#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
+ if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
+ s->state.flush_cronet_when_ready = true;
}
- stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
- result = ACTION_TAKEN_NO_CALLBACK;
+#endif
+ result = ACTION_TAKEN_WITH_CALLBACK;
} else if (stream_op->send_message &&
op_can_be_run(stream_op, stream_state, &oas->state,
OP_SEND_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
+ stream_state->unprocessed_send_message = false;
if (stream_state->state_callback_received[OP_FAILED]) {
result = NO_ACTION_POSSIBLE;
CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
@@ -897,13 +912,63 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
(int)write_buffer_size, false);
+#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
+ if (!stream_op->send_trailing_metadata) {
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)",
+ s->cbs);
+ cronet_bidirectional_stream_flush(s->cbs);
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else {
+ stream_state->pending_write_for_trailer = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
+ }
+#else
result = ACTION_TAKEN_WITH_CALLBACK;
+#endif
} else {
result = NO_ACTION_POSSIBLE;
}
}
stream_state->state_op_done[OP_SEND_MESSAGE] = true;
oas->state.state_op_done[OP_SEND_MESSAGE] = true;
+ } else if (stream_op->send_trailing_metadata &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_SEND_TRAILING_METADATA)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
+ if (stream_state->state_callback_received[OP_FAILED]) {
+ result = NO_ACTION_POSSIBLE;
+ CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
+ } else {
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)",
+ s->cbs);
+ stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
+ cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
+ cronet_bidirectional_stream_flush(s->cbs);
+#endif
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ }
+ stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
+ } else if (stream_op->recv_initial_metadata &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_RECV_INITIAL_METADATA)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_CANCELLED, NULL);
+ } else if (stream_state->state_callback_received[OP_FAILED]) {
+ grpc_exec_ctx_sched(
+ exec_ctx, stream_op->recv_initial_metadata_ready,
+ make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL);
+ } else {
+ grpc_chttp2_incoming_metadata_buffer_publish(
+ &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_NONE, NULL);
+ }
+ stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_op->recv_message &&
op_can_be_run(stream_op, stream_state, &oas->state,
OP_RECV_MESSAGE)) {
@@ -962,6 +1027,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
+
+ /* Extra read to trigger on_succeed */
+ stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
+ stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
+ stream_state->rs.received_bytes = 0;
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ stream_state->state_op_done[OP_READ_REQ_MADE] =
+ true; /* Indicates that at least one read request has been made */
+ cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ stream_state->rs.remaining_bytes);
result = ACTION_TAKEN_NO_CALLBACK;
}
} else if (stream_state->rs.remaining_bytes == 0) {
@@ -1020,21 +1095,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
}
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
- } else if (stream_op->send_trailing_metadata &&
- op_can_be_run(stream_op, stream_state, &oas->state,
- OP_SEND_TRAILING_METADATA)) {
- CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
- if (stream_state->state_callback_received[OP_FAILED]) {
- result = NO_ACTION_POSSIBLE;
- CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
- } else {
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)",
- s->cbs);
- stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
- cronet_bidirectional_stream_write(s->cbs, "", 0, true);
- result = ACTION_TAKEN_WITH_CALLBACK;
- }
- stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
} else if (stream_op->cancel_error &&
op_can_be_run(stream_op, stream_state, &oas->state,
OP_CANCEL_ERROR)) {
@@ -1117,6 +1177,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
sizeof(s->state.state_callback_received));
s->state.fail_state = s->state.flush_read = false;
s->state.cancel_error = NULL;
+#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
+ s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false;
+#endif
+ s->state.unprocessed_send_message = false;
gpr_mu_init(&s->mu);
return 0;
}
diff --git a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m
index 4ba7badd86..fe9a7c2247 100644
--- a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m
+++ b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m
@@ -346,6 +346,10 @@ static char *roots_filename;
[self testIndividualCase:"no_op"];
}
+- (void)testPacketCoalescing {
+ [self testIndividualCase:"packet_coalescing"];
+}
+
- (void)testPayload {
[self testIndividualCase:"payload"];
}
diff --git a/src/objective-c/tests/Podfile b/src/objective-c/tests/Podfile
index 5785b976f2..d1ef0886fe 100644
--- a/src/objective-c/tests/Podfile
+++ b/src/objective-c/tests/Podfile
@@ -92,6 +92,7 @@ post_install do |installer|
# GPR_UNREACHABLE_CODE causes "Control may reach end of non-void
# function" warning
config.build_settings['GCC_WARN_ABOUT_RETURN_TYPE'] = 'NO'
+ config.build_settings['GCC_PREPROCESSOR_DEFINITIONS'] = '$(inherited) COCOAPODS=1 GRPC_CRONET_WITH_PACKET_COALESCING=1'
end
end
diff --git a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj
index c4a6567ae0..8455e71b02 100644
--- a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj
+++ b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj
@@ -1296,6 +1296,7 @@
"$(inherited)",
"GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS=1",
"GRPC_COMPILE_WITH_CRONET=1",
+ "GRPC_CRONET_WITH_PACKET_COALESCING=1",
);
INFOPLIST_FILE = InteropTestsRemoteWithCronet/Info.plist;
IPHONEOS_DEPLOYMENT_TARGET = 9.3;