aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-09-26 10:16:14 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-09-26 10:16:14 -0700
commit1f8d1d5afde35c05bb20f024c4af932a6199e362 (patch)
tree3db2fda17e6dff4fd32c756d26f2cb64d4c91d75
parentb635c61e04843261729c121343cdca38fd03b3a5 (diff)
Fixes
-rw-r--r--src/core/lib/iomgr/buffer_pool.c42
-rw-r--r--src/core/lib/iomgr/buffer_pool.h11
-rw-r--r--src/core/lib/iomgr/tcp_posix.c25
3 files changed, 50 insertions, 28 deletions
diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c
index bd6559ad4b..837642f3bc 100644
--- a/src/core/lib/iomgr/buffer_pool.c
+++ b/src/core/lib/iomgr/buffer_pool.c
@@ -249,6 +249,7 @@ static void bu_slice_unref(void *p) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_buffer_user_free(&exec_ctx, rc->buffer_user, rc->size);
grpc_exec_ctx_finish(&exec_ctx);
+ gpr_free(rc);
}
}
@@ -332,16 +333,15 @@ static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
static void bu_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts,
grpc_error *error) {
- grpc_buffer_user_slice_allocator *alloc_temp_storage = ts;
+ grpc_buffer_user_slice_allocator *slice_allocator = ts;
if (error == GRPC_ERROR_NONE) {
- for (size_t i = 0; i < alloc_temp_storage->count; i++) {
- gpr_slice_buffer_add(alloc_temp_storage->dest,
- bu_slice_create(alloc_temp_storage->buffer_user,
- alloc_temp_storage->length));
+ for (size_t i = 0; i < slice_allocator->count; i++) {
+ gpr_slice_buffer_add_indexed(slice_allocator->dest,
+ bu_slice_create(slice_allocator->buffer_user,
+ slice_allocator->length));
}
}
- grpc_closure_run(exec_ctx, alloc_temp_storage->on_done,
- GRPC_ERROR_REF(error));
+ grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error));
}
typedef struct {
@@ -552,17 +552,21 @@ void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE, false);
}
+void grpc_buffer_user_slice_allocator_init(
+ grpc_buffer_user_slice_allocator *slice_allocator,
+ grpc_buffer_user *buffer_user, grpc_iomgr_cb_func cb, void *p) {
+ grpc_closure_init(&slice_allocator->on_allocated, bu_allocated_slices,
+ slice_allocator);
+ grpc_closure_init(&slice_allocator->on_done, cb, p);
+ slice_allocator->buffer_user = buffer_user;
+}
+
void grpc_buffer_user_alloc_slices(
- grpc_exec_ctx *exec_ctx, grpc_buffer_user *buffer_user,
- grpc_buffer_user_slice_allocator *alloc_temp_storage, size_t length,
- size_t count, gpr_slice_buffer *dest, grpc_closure *on_done) {
- grpc_closure_init(&alloc_temp_storage->on_allocated, bu_allocated_slices,
- alloc_temp_storage);
- alloc_temp_storage->on_done = on_done;
- alloc_temp_storage->length = length;
- alloc_temp_storage->count = count;
- alloc_temp_storage->dest = dest;
- alloc_temp_storage->buffer_user = buffer_user;
- grpc_buffer_user_alloc(exec_ctx, buffer_user, count * length,
- &alloc_temp_storage->on_allocated);
+ grpc_exec_ctx *exec_ctx, grpc_buffer_user_slice_allocator *slice_allocator,
+ size_t length, size_t count, gpr_slice_buffer *dest) {
+ slice_allocator->length = length;
+ slice_allocator->count = count;
+ slice_allocator->dest = dest;
+ grpc_buffer_user_alloc(exec_ctx, slice_allocator->buffer_user, count * length,
+ &slice_allocator->on_allocated);
}
diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/buffer_pool.h
index 1ac9b62906..968454ec94 100644
--- a/src/core/lib/iomgr/buffer_pool.h
+++ b/src/core/lib/iomgr/buffer_pool.h
@@ -100,16 +100,19 @@ void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx,
typedef struct grpc_buffer_user_slice_allocator {
grpc_closure on_allocated;
- grpc_closure *on_done;
+ grpc_closure on_done;
size_t length;
size_t count;
gpr_slice_buffer *dest;
grpc_buffer_user *buffer_user;
} grpc_buffer_user_slice_allocator;
+void grpc_buffer_user_slice_allocator_init(
+ grpc_buffer_user_slice_allocator *slice_allocator,
+ grpc_buffer_user *buffer_user, grpc_iomgr_cb_func cb, void *p);
+
void grpc_buffer_user_alloc_slices(
- grpc_exec_ctx *exec_ctx, grpc_buffer_user *buffer_user,
- grpc_buffer_user_slice_allocator *alloc_temp_storage, size_t length,
- size_t count, gpr_slice_buffer *dest, grpc_closure *on_done);
+ grpc_exec_ctx *exec_ctx, grpc_buffer_user_slice_allocator *slice_allocator,
+ size_t length, size_t count, gpr_slice_buffer *dest);
#endif /* GRPC_CORE_LIB_IOMGR_BUFFER_POOL_H */
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index b4c53ee070..27a7f83b4d 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -102,6 +102,7 @@ typedef struct {
char *peer_string;
grpc_buffer_user buffer_user;
+ grpc_buffer_user_slice_allocator slice_allocator;
} grpc_tcp;
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
@@ -189,7 +190,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
}
#define MAX_READ_IOVEC 4
-static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;
@@ -200,10 +201,6 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
GPR_TIMER_BEGIN("tcp_continue_read", 0);
- while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
- gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
- gpr_slice_malloc(tcp->slice_size));
- }
for (i = 0; i < tcp->incoming_buffer->count; i++) {
iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
@@ -260,6 +257,22 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
GPR_TIMER_END("tcp_continue_read", 0);
}
+static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcp,
+ grpc_error *error) {
+ tcp_do_read(exec_ctx, tcp);
+}
+
+static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
+ grpc_buffer_user_alloc_slices(
+ exec_ctx, &tcp->slice_allocator, tcp->slice_size,
+ (size_t)tcp->iov_size - tcp->incoming_buffer->count,
+ tcp->incoming_buffer);
+ } else {
+ tcp_do_read(exec_ctx, tcp);
+ }
+}
+
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)arg;
@@ -515,6 +528,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool,
tcp->write_closure.cb_arg = tcp;
gpr_slice_buffer_init(&tcp->last_read_buffer);
grpc_buffer_user_init(&tcp->buffer_user, buffer_pool);
+ grpc_buffer_user_slice_allocator_init(
+ &tcp->slice_allocator, &tcp->buffer_user, tcp_read_allocation_done, tcp);
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);