diff options
author | Craig Tiller <ctiller@google.com> | 2016-09-26 10:16:14 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-09-26 10:16:14 -0700 |
commit | 1f8d1d5afde35c05bb20f024c4af932a6199e362 (patch) | |
tree | 3db2fda17e6dff4fd32c756d26f2cb64d4c91d75 | |
parent | b635c61e04843261729c121343cdca38fd03b3a5 (diff) |
Fixes
-rw-r--r-- | src/core/lib/iomgr/buffer_pool.c | 42 | ||||
-rw-r--r-- | src/core/lib/iomgr/buffer_pool.h | 11 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 25 |
3 files changed, 50 insertions, 28 deletions
diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c index bd6559ad4b..837642f3bc 100644 --- a/src/core/lib/iomgr/buffer_pool.c +++ b/src/core/lib/iomgr/buffer_pool.c @@ -249,6 +249,7 @@ static void bu_slice_unref(void *p) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_buffer_user_free(&exec_ctx, rc->buffer_user, rc->size); grpc_exec_ctx_finish(&exec_ctx); + gpr_free(rc); } } @@ -332,16 +333,15 @@ static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { static void bu_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts, grpc_error *error) { - grpc_buffer_user_slice_allocator *alloc_temp_storage = ts; + grpc_buffer_user_slice_allocator *slice_allocator = ts; if (error == GRPC_ERROR_NONE) { - for (size_t i = 0; i < alloc_temp_storage->count; i++) { - gpr_slice_buffer_add(alloc_temp_storage->dest, - bu_slice_create(alloc_temp_storage->buffer_user, - alloc_temp_storage->length)); + for (size_t i = 0; i < slice_allocator->count; i++) { + gpr_slice_buffer_add_indexed(slice_allocator->dest, + bu_slice_create(slice_allocator->buffer_user, + slice_allocator->length)); } } - grpc_closure_run(exec_ctx, alloc_temp_storage->on_done, - GRPC_ERROR_REF(error)); + grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error)); } typedef struct { @@ -552,17 +552,21 @@ void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE, false); } +void grpc_buffer_user_slice_allocator_init( + grpc_buffer_user_slice_allocator *slice_allocator, + grpc_buffer_user *buffer_user, grpc_iomgr_cb_func cb, void *p) { + grpc_closure_init(&slice_allocator->on_allocated, bu_allocated_slices, + slice_allocator); + grpc_closure_init(&slice_allocator->on_done, cb, p); + slice_allocator->buffer_user = buffer_user; +} + void grpc_buffer_user_alloc_slices( - grpc_exec_ctx *exec_ctx, grpc_buffer_user *buffer_user, - grpc_buffer_user_slice_allocator *alloc_temp_storage, size_t length, - size_t count, gpr_slice_buffer *dest, grpc_closure *on_done) { - grpc_closure_init(&alloc_temp_storage->on_allocated, bu_allocated_slices, - alloc_temp_storage); - alloc_temp_storage->on_done = on_done; - alloc_temp_storage->length = length; - alloc_temp_storage->count = count; - alloc_temp_storage->dest = dest; - alloc_temp_storage->buffer_user = buffer_user; - grpc_buffer_user_alloc(exec_ctx, buffer_user, count * length, - &alloc_temp_storage->on_allocated); + grpc_exec_ctx *exec_ctx, grpc_buffer_user_slice_allocator *slice_allocator, + size_t length, size_t count, gpr_slice_buffer *dest) { + slice_allocator->length = length; + slice_allocator->count = count; + slice_allocator->dest = dest; + grpc_buffer_user_alloc(exec_ctx, slice_allocator->buffer_user, count * length, + &slice_allocator->on_allocated); } diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/buffer_pool.h index 1ac9b62906..968454ec94 100644 --- a/src/core/lib/iomgr/buffer_pool.h +++ b/src/core/lib/iomgr/buffer_pool.h @@ -100,16 +100,19 @@ void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx, typedef struct grpc_buffer_user_slice_allocator { grpc_closure on_allocated; - grpc_closure *on_done; + grpc_closure on_done; size_t length; size_t count; gpr_slice_buffer *dest; grpc_buffer_user *buffer_user; } grpc_buffer_user_slice_allocator; +void grpc_buffer_user_slice_allocator_init( + grpc_buffer_user_slice_allocator *slice_allocator, + grpc_buffer_user *buffer_user, grpc_iomgr_cb_func cb, void *p); + void grpc_buffer_user_alloc_slices( - grpc_exec_ctx *exec_ctx, grpc_buffer_user *buffer_user, - grpc_buffer_user_slice_allocator *alloc_temp_storage, size_t length, - size_t count, gpr_slice_buffer *dest, grpc_closure *on_done); + grpc_exec_ctx *exec_ctx, grpc_buffer_user_slice_allocator *slice_allocator, + size_t length, size_t count, gpr_slice_buffer *dest); #endif /* GRPC_CORE_LIB_IOMGR_BUFFER_POOL_H */ diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index b4c53ee070..27a7f83b4d 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -102,6 +102,7 @@ typedef struct { char *peer_string; grpc_buffer_user buffer_user; + grpc_buffer_user_slice_allocator slice_allocator; } grpc_tcp; static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, @@ -189,7 +190,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, } #define MAX_READ_IOVEC 4 -static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { +static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; ssize_t read_bytes; @@ -200,10 +201,6 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC); GPR_TIMER_BEGIN("tcp_continue_read", 0); - while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { - gpr_slice_buffer_add_indexed(tcp->incoming_buffer, - gpr_slice_malloc(tcp->slice_size)); - } for (i = 0; i < tcp->incoming_buffer->count; i++) { iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]); iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]); @@ -260,6 +257,22 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { GPR_TIMER_END("tcp_continue_read", 0); } +static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcp, + grpc_error *error) { + tcp_do_read(exec_ctx, tcp); +} + +static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { + grpc_buffer_user_alloc_slices( + exec_ctx, &tcp->slice_allocator, tcp->slice_size, + (size_t)tcp->iov_size - tcp->incoming_buffer->count, + tcp->incoming_buffer); + } else { + tcp_do_read(exec_ctx, tcp); + } +} + static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)arg; @@ -515,6 +528,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool, tcp->write_closure.cb_arg = tcp; gpr_slice_buffer_init(&tcp->last_read_buffer); grpc_buffer_user_init(&tcp->buffer_user, buffer_pool); + grpc_buffer_user_slice_allocator_init( + &tcp->slice_allocator, &tcp->buffer_user, tcp_read_allocation_done, tcp); /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); |