aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Noah Eisen <ncteisen@gmail.com>2017-11-20 10:08:07 -0800
committerGravatar GitHub <noreply@github.com>2017-11-20 10:08:07 -0800
commitad671ff561d34b30e6a857842add23b27e7ae7de (patch)
tree41931dbf06278b569ab3c6020f02e1368abea880
parent9addd2eef21a96e52135dc9369566d8452412763 (diff)
parentabbdbf93742ed46c7f7481816f0915c069357040 (diff)
Merge pull request #13391 from ncteisen/more-eager-free
Eagerly Free Slices in Case of Partial Write
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc43
-rw-r--r--src/core/lib/slice/slice_internal.h3
-rw-r--r--test/core/end2end/fixtures/http_proxy_fixture.cc16
3 files changed, 44 insertions, 18 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index 6d9e044fa2..d09cfca9af 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -81,9 +81,7 @@ typedef struct {
grpc_slice_buffer* incoming_buffer;
grpc_slice_buffer* outgoing_buffer;
- /** slice within outgoing_buffer to write next */
- size_t outgoing_slice_idx;
- /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
+ /** byte within outgoing_buffer->slices[0] to write next */
size_t outgoing_byte_idx;
grpc_closure* read_cb;
@@ -532,23 +530,26 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
size_t unwind_slice_idx;
size_t unwind_byte_idx;
+ // We always start at zero, because we eagerly unref and trim the slice
+ // buffer as we write
+ size_t outgoing_slice_idx = 0;
+
for (;;) {
sending_length = 0;
- unwind_slice_idx = tcp->outgoing_slice_idx;
+ unwind_slice_idx = outgoing_slice_idx;
unwind_byte_idx = tcp->outgoing_byte_idx;
- for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
+ for (iov_size = 0; outgoing_slice_idx != tcp->outgoing_buffer->count &&
iov_size != MAX_WRITE_IOVEC;
iov_size++) {
iov[iov_size].iov_base =
GRPC_SLICE_START_PTR(
- tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
+ tcp->outgoing_buffer->slices[outgoing_slice_idx]) +
tcp->outgoing_byte_idx;
iov[iov_size].iov_len =
- GRPC_SLICE_LENGTH(
- tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
+ GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]) -
tcp->outgoing_byte_idx;
sending_length += iov[iov_size].iov_len;
- tcp->outgoing_slice_idx++;
+ outgoing_slice_idx++;
tcp->outgoing_byte_idx = 0;
}
GPR_ASSERT(iov_size > 0);
@@ -574,16 +575,25 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
if (sent_length < 0) {
if (errno == EAGAIN) {
- tcp->outgoing_slice_idx = unwind_slice_idx;
tcp->outgoing_byte_idx = unwind_byte_idx;
+ // unref all and forget about all slices that have been written to this
+ // point
+ for (size_t idx = 0; idx < unwind_slice_idx; ++idx) {
+ grpc_slice_unref_internal(
+ exec_ctx, grpc_slice_buffer_take_first(tcp->outgoing_buffer));
+ }
return false;
} else if (errno == EPIPE) {
*error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ tcp->outgoing_buffer);
return true;
} else {
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ tcp->outgoing_buffer);
return true;
}
}
@@ -593,9 +603,9 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
while (trailing > 0) {
size_t slice_length;
- tcp->outgoing_slice_idx--;
- slice_length = GRPC_SLICE_LENGTH(
- tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
+ outgoing_slice_idx--;
+ slice_length =
+ GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]);
if (slice_length > trailing) {
tcp->outgoing_byte_idx = slice_length - trailing;
break;
@@ -604,11 +614,13 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
}
}
- if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
+ if (outgoing_slice_idx == tcp->outgoing_buffer->count) {
*error = GRPC_ERROR_NONE;
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ tcp->outgoing_buffer);
return true;
}
- };
+ }
}
static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */,
@@ -672,7 +684,6 @@ static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
return;
}
tcp->outgoing_buffer = buf;
- tcp->outgoing_slice_idx = 0;
tcp->outgoing_byte_idx = 0;
if (!tcp_flush(exec_ctx, tcp, &error)) {
diff --git a/src/core/lib/slice/slice_internal.h b/src/core/lib/slice/slice_internal.h
index 2439fc0826..10527dcdeb 100644
--- a/src/core/lib/slice/slice_internal.h
+++ b/src/core/lib/slice/slice_internal.h
@@ -32,6 +32,9 @@ grpc_slice grpc_slice_ref_internal(grpc_slice slice);
void grpc_slice_unref_internal(grpc_exec_ctx* exec_ctx, grpc_slice slice);
void grpc_slice_buffer_reset_and_unref_internal(grpc_exec_ctx* exec_ctx,
grpc_slice_buffer* sb);
+void grpc_slice_buffer_partial_unref_internal(grpc_exec_ctx* exec_ctx,
+ grpc_slice_buffer* sb,
+ size_t idx);
void grpc_slice_buffer_destroy_internal(grpc_exec_ctx* exec_ctx,
grpc_slice_buffer* sb);
diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc
index 0bdd15c57d..ac0c953a79 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.cc
+++ b/test/core/end2end/fixtures/http_proxy_fixture.cc
@@ -88,9 +88,11 @@ typedef struct proxy_connection {
grpc_slice_buffer client_read_buffer;
grpc_slice_buffer client_deferred_write_buffer;
+ bool client_is_writing;
grpc_slice_buffer client_write_buffer;
grpc_slice_buffer server_read_buffer;
grpc_slice_buffer server_deferred_write_buffer;
+ bool server_is_writing;
grpc_slice_buffer server_write_buffer;
grpc_http_parser http_parser;
@@ -148,6 +150,7 @@ static void proxy_connection_failed(grpc_exec_ctx* exec_ctx,
static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
+ conn->client_is_writing = false;
if (error != GRPC_ERROR_NONE) {
proxy_connection_failed(exec_ctx, conn, true /* is_client */,
"HTTP proxy client write", error);
@@ -160,6 +163,7 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg,
if (conn->client_deferred_write_buffer.length > 0) {
grpc_slice_buffer_move_into(&conn->client_deferred_write_buffer,
&conn->client_write_buffer);
+ conn->client_is_writing = true;
grpc_endpoint_write(exec_ctx, conn->client_endpoint,
&conn->client_write_buffer,
&conn->on_client_write_done);
@@ -173,6 +177,7 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg,
static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
+ conn->server_is_writing = false;
if (error != GRPC_ERROR_NONE) {
proxy_connection_failed(exec_ctx, conn, false /* is_client */,
"HTTP proxy server write", error);
@@ -185,6 +190,7 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg,
if (conn->server_deferred_write_buffer.length > 0) {
grpc_slice_buffer_move_into(&conn->server_deferred_write_buffer,
&conn->server_write_buffer);
+ conn->server_is_writing = true;
grpc_endpoint_write(exec_ctx, conn->server_endpoint,
&conn->server_write_buffer,
&conn->on_server_write_done);
@@ -210,13 +216,14 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg,
// the current write is finished.
//
// Otherwise, move the read data into the write buffer and write it.
- if (conn->server_write_buffer.length > 0) {
+ if (conn->server_is_writing) {
grpc_slice_buffer_move_into(&conn->client_read_buffer,
&conn->server_deferred_write_buffer);
} else {
grpc_slice_buffer_move_into(&conn->client_read_buffer,
&conn->server_write_buffer);
proxy_connection_ref(conn, "client_read");
+ conn->server_is_writing = true;
grpc_endpoint_write(exec_ctx, conn->server_endpoint,
&conn->server_write_buffer,
&conn->on_server_write_done);
@@ -242,13 +249,14 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg,
// the current write is finished.
//
// Otherwise, move the read data into the write buffer and write it.
- if (conn->client_write_buffer.length > 0) {
+ if (conn->client_is_writing) {
grpc_slice_buffer_move_into(&conn->server_read_buffer,
&conn->client_deferred_write_buffer);
} else {
grpc_slice_buffer_move_into(&conn->server_read_buffer,
&conn->client_write_buffer);
proxy_connection_ref(conn, "server_read");
+ conn->client_is_writing = true;
grpc_endpoint_write(exec_ctx, conn->client_endpoint,
&conn->client_write_buffer,
&conn->on_client_write_done);
@@ -262,6 +270,7 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg,
static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
proxy_connection* conn = (proxy_connection*)arg;
+ conn->client_is_writing = false;
if (error != GRPC_ERROR_NONE) {
proxy_connection_failed(exec_ctx, conn, true /* is_client */,
"HTTP proxy write response", error);
@@ -302,6 +311,7 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_slice slice =
grpc_slice_from_copied_string("HTTP/1.0 200 connected\r\n\r\n");
grpc_slice_buffer_add(&conn->client_write_buffer, slice);
+ conn->client_is_writing = true;
grpc_endpoint_write(exec_ctx, conn->client_endpoint,
&conn->client_write_buffer,
&conn->on_write_response_done);
@@ -450,9 +460,11 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg,
grpc_combiner_scheduler(conn->proxy->combiner));
grpc_slice_buffer_init(&conn->client_read_buffer);
grpc_slice_buffer_init(&conn->client_deferred_write_buffer);
+ conn->client_is_writing = false;
grpc_slice_buffer_init(&conn->client_write_buffer);
grpc_slice_buffer_init(&conn->server_read_buffer);
grpc_slice_buffer_init(&conn->server_deferred_write_buffer);
+ conn->server_is_writing = false;
grpc_slice_buffer_init(&conn->server_write_buffer);
grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST,
&conn->http_request);