diff options
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/buffer_pool.c | 684 | ||||
-rw-r--r-- | src/core/lib/iomgr/buffer_pool.h | 128 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint_pair.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint_pair_posix.c | 12 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client.h | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.c | 51 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.h | 45 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 78 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server.h | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_posix.c | 23 |
13 files changed, 1013 insertions, 30 deletions
diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c new file mode 100644 index 0000000000..8fbf75cbe4 --- /dev/null +++ b/src/core/lib/iomgr/buffer_pool.c @@ -0,0 +1,684 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/buffer_pool.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/iomgr/combiner.h" + +int grpc_buffer_pool_trace = 0; + +typedef bool (*bpstate_func)(grpc_exec_ctx *exec_ctx, + grpc_buffer_pool *buffer_pool); + +typedef struct { + grpc_buffer_user *head; + grpc_buffer_user *tail; +} grpc_buffer_user_list; + +struct grpc_buffer_pool { + gpr_refcount refs; + + grpc_combiner *combiner; + int64_t size; + int64_t free_pool; + + bool step_scheduled; + bool reclaiming; + grpc_closure bpstep_closure; + grpc_closure bpreclaimation_done_closure; + + grpc_buffer_user *roots[GRPC_BULIST_COUNT]; + + char *name; +}; + +/******************************************************************************* + * list management + */ + +static void bulist_add_tail(grpc_buffer_user *buffer_user, grpc_bulist list) { + grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool; + grpc_buffer_user **root = &buffer_pool->roots[list]; + if (*root == NULL) { + *root = buffer_user; + buffer_user->links[list].next = buffer_user->links[list].prev = buffer_user; + } else { + buffer_user->links[list].next = *root; + buffer_user->links[list].prev = (*root)->links[list].prev; + buffer_user->links[list].next->links[list].prev = + buffer_user->links[list].prev->links[list].next = buffer_user; + } +} + +static void bulist_add_head(grpc_buffer_user *buffer_user, grpc_bulist list) { + grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool; + grpc_buffer_user **root = &buffer_pool->roots[list]; + if (*root == NULL) { + *root = buffer_user; + buffer_user->links[list].next = buffer_user->links[list].prev = buffer_user; + } else { + buffer_user->links[list].next = (*root)->links[list].next; + buffer_user->links[list].prev = *root; + buffer_user->links[list].next->links[list].prev = + buffer_user->links[list].prev->links[list].next = buffer_user; + *root = buffer_user; + } +} + +static bool bulist_empty(grpc_buffer_pool *buffer_pool, grpc_bulist list) { + return buffer_pool->roots[list] == NULL; +} + +static grpc_buffer_user *bulist_pop(grpc_buffer_pool *buffer_pool, + grpc_bulist list) { + grpc_buffer_user **root = &buffer_pool->roots[list]; + grpc_buffer_user *buffer_user = *root; + if (buffer_user == NULL) { + return NULL; + } + if (buffer_user->links[list].next == buffer_user) { + *root = NULL; + } else { + buffer_user->links[list].next->links[list].prev = + buffer_user->links[list].prev; + buffer_user->links[list].prev->links[list].next = + buffer_user->links[list].next; + *root = buffer_user->links[list].next; + } + buffer_user->links[list].next = buffer_user->links[list].prev = NULL; + return buffer_user; +} + +static void bulist_remove(grpc_buffer_user *buffer_user, grpc_bulist list) { + if (buffer_user->links[list].next == NULL) return; + grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool; + if (buffer_pool->roots[list] == buffer_user) { + buffer_pool->roots[list] = buffer_user->links[list].next; + if (buffer_pool->roots[list] == buffer_user) { + buffer_pool->roots[list] = NULL; + } + } + buffer_user->links[list].next->links[list].prev = + buffer_user->links[list].prev; + buffer_user->links[list].prev->links[list].next = + buffer_user->links[list].next; +} + +/******************************************************************************* + * buffer pool state machine + */ + +static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool); +static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool); +static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool, + bool destructive); + +static void bpstep(grpc_exec_ctx *exec_ctx, void *bp, grpc_error *error) { + grpc_buffer_pool *buffer_pool = bp; + buffer_pool->step_scheduled = false; + do { + if (bpalloc(exec_ctx, buffer_pool)) goto done; + } while (bpscavenge(exec_ctx, buffer_pool)); + bpreclaim(exec_ctx, buffer_pool, false) || + bpreclaim(exec_ctx, buffer_pool, true); +done: + grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); +} + +static void bpstep_sched(grpc_exec_ctx *exec_ctx, + grpc_buffer_pool *buffer_pool) { + if (buffer_pool->step_scheduled) return; + buffer_pool->step_scheduled = true; + grpc_buffer_pool_internal_ref(buffer_pool); + grpc_combiner_execute_finally(exec_ctx, buffer_pool->combiner, + &buffer_pool->bpstep_closure, GRPC_ERROR_NONE, + false); +} + +/* returns true if all allocations are completed */ +static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) { + grpc_buffer_user *buffer_user; + while ((buffer_user = + bulist_pop(buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION))) { + gpr_mu_lock(&buffer_user->mu); + if (buffer_user->free_pool < 0 && + -buffer_user->free_pool <= buffer_pool->free_pool) { + int64_t amt = -buffer_user->free_pool; + buffer_user->free_pool = 0; + buffer_pool->free_pool -= amt; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64 + " bytes; bp_free_pool -> %" PRId64, + buffer_pool->name, buffer_user->name, amt, + buffer_pool->free_pool); + } + } else if (grpc_buffer_pool_trace && buffer_user->free_pool >= 0) { + gpr_log(GPR_DEBUG, "BP %s %s: discard already satisfied alloc request", + buffer_pool->name, buffer_user->name); + } + if (buffer_user->free_pool >= 0) { + buffer_user->allocating = false; + grpc_exec_ctx_enqueue_list(exec_ctx, &buffer_user->on_allocated, NULL); + gpr_mu_unlock(&buffer_user->mu); + } else { + bulist_add_head(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION); + gpr_mu_unlock(&buffer_user->mu); + return false; + } + } + return true; +} + +/* returns true if any memory could be reclaimed from buffers */ +static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) { + grpc_buffer_user *buffer_user; + while ((buffer_user = + bulist_pop(buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL))) { + gpr_mu_lock(&buffer_user->mu); + if (buffer_user->free_pool > 0) { + int64_t amt = buffer_user->free_pool; + buffer_user->free_pool = 0; + buffer_pool->free_pool += amt; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64 + " bytes; bp_free_pool -> %" PRId64, + buffer_pool->name, buffer_user->name, amt, + buffer_pool->free_pool); + } + gpr_mu_unlock(&buffer_user->mu); + return true; + } else { + gpr_mu_unlock(&buffer_user->mu); + } + } + return false; +} + +/* returns true if reclaimation is proceeding */ +static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool, + bool destructive) { + if (buffer_pool->reclaiming) return true; + grpc_bulist list = destructive ? GRPC_BULIST_RECLAIMER_DESTRUCTIVE + : GRPC_BULIST_RECLAIMER_BENIGN; + grpc_buffer_user *buffer_user = bulist_pop(buffer_pool, list); + if (buffer_user == NULL) return false; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclaimation", buffer_pool->name, + buffer_user->name, destructive ? "destructive" : "benign"); + } + buffer_pool->reclaiming = true; + grpc_buffer_pool_internal_ref(buffer_pool); + grpc_closure *c = buffer_user->reclaimers[destructive]; + buffer_user->reclaimers[destructive] = NULL; + grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE); + return true; +} + +/******************************************************************************* + * bu_slice: a slice implementation that is backed by a grpc_buffer_user + */ + +typedef struct { + gpr_slice_refcount base; + gpr_refcount refs; + grpc_buffer_user *buffer_user; + size_t size; +} bu_slice_refcount; + +static void bu_slice_ref(void *p) { + bu_slice_refcount *rc = p; + gpr_ref(&rc->refs); +} + +static void bu_slice_unref(void *p) { + bu_slice_refcount *rc = p; + if (gpr_unref(&rc->refs)) { + /* TODO(ctiller): this is dangerous, but I think safe for now: + we have no guarantee here that we're at a safe point for creating an + execution context, but we have no way of writing this code otherwise. + In the future: consider lifting gpr_slice to grpc, and offering an + internal_{ref,unref} pair that is execution context aware. Alternatively, + make exec_ctx be thread local and 'do the right thing' (whatever that is) + if NULL */ + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_buffer_user_free(&exec_ctx, rc->buffer_user, rc->size); + grpc_exec_ctx_finish(&exec_ctx); + gpr_free(rc); + } +} + +static gpr_slice bu_slice_create(grpc_buffer_user *buffer_user, size_t size) { + bu_slice_refcount *rc = gpr_malloc(sizeof(bu_slice_refcount) + size); + rc->base.ref = bu_slice_ref; + rc->base.unref = bu_slice_unref; + gpr_ref_init(&rc->refs, 1); + rc->buffer_user = buffer_user; + rc->size = size; + gpr_slice slice; + slice.refcount = &rc->base; + slice.data.refcounted.bytes = (uint8_t *)(rc + 1); + slice.data.refcounted.length = size; + return slice; +} + +/******************************************************************************* + * grpc_buffer_pool internal implementation + */ + +static void bu_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { + grpc_buffer_user *buffer_user = bu; + if (bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION)) { + bpstep_sched(exec_ctx, buffer_user->buffer_pool); + } + bulist_add_tail(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION); +} + +static void bu_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu, + grpc_error *error) { + grpc_buffer_user *buffer_user = bu; + if (!bulist_empty(buffer_user->buffer_pool, + GRPC_BULIST_AWAITING_ALLOCATION) && + bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL)) { + bpstep_sched(exec_ctx, buffer_user->buffer_pool); + } + bulist_add_tail(buffer_user, GRPC_BULIST_NON_EMPTY_FREE_POOL); +} + +static void bu_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, + grpc_error *error) { + grpc_buffer_user *buffer_user = bu; + if (!bulist_empty(buffer_user->buffer_pool, + GRPC_BULIST_AWAITING_ALLOCATION) && + bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL) && + bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_RECLAIMER_BENIGN)) { + bpstep_sched(exec_ctx, buffer_user->buffer_pool); + } + bulist_add_tail(buffer_user, GRPC_BULIST_RECLAIMER_BENIGN); +} + +static void bu_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu, + grpc_error *error) { + grpc_buffer_user *buffer_user = bu; + if (!bulist_empty(buffer_user->buffer_pool, + GRPC_BULIST_AWAITING_ALLOCATION) && + bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL) && + bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_RECLAIMER_BENIGN) && + bulist_empty(buffer_user->buffer_pool, + GRPC_BULIST_RECLAIMER_DESTRUCTIVE)) { + bpstep_sched(exec_ctx, buffer_user->buffer_pool); + } + bulist_add_tail(buffer_user, GRPC_BULIST_RECLAIMER_DESTRUCTIVE); +} + +static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) { + grpc_buffer_user *buffer_user = bu; + GPR_ASSERT(buffer_user->allocated == 0); + for (int i = 0; i < GRPC_BULIST_COUNT; i++) { + bulist_remove(buffer_user, (grpc_bulist)i); + } + grpc_exec_ctx_sched(exec_ctx, buffer_user->reclaimers[0], + GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, buffer_user->reclaimers[1], + GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, (grpc_closure *)gpr_atm_no_barrier_load( + &buffer_user->on_done_destroy_closure), + GRPC_ERROR_NONE, NULL); + if (buffer_user->free_pool != 0) { + buffer_user->buffer_pool->free_pool += buffer_user->free_pool; + bpstep_sched(exec_ctx, buffer_user->buffer_pool); + } +} + +static void bu_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts, + grpc_error *error) { + grpc_buffer_user_slice_allocator *slice_allocator = ts; + if (error == GRPC_ERROR_NONE) { + for (size_t i = 0; i < slice_allocator->count; i++) { + gpr_slice_buffer_add_indexed(slice_allocator->dest, + bu_slice_create(slice_allocator->buffer_user, + slice_allocator->length)); + } + } + grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error)); +} + +typedef struct { + int64_t size; + grpc_buffer_pool *buffer_pool; + grpc_closure closure; +} bp_resize_args; + +static void bp_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { + bp_resize_args *a = args; + int64_t delta = a->size - a->buffer_pool->size; + a->buffer_pool->size += delta; + a->buffer_pool->free_pool += delta; + if (delta < 0 && a->buffer_pool->free_pool < 0) { + bpstep_sched(exec_ctx, a->buffer_pool); + } else if (delta > 0 && + !bulist_empty(a->buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION)) { + bpstep_sched(exec_ctx, a->buffer_pool); + } + grpc_buffer_pool_internal_unref(exec_ctx, a->buffer_pool); + gpr_free(a); +} + +static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp, + grpc_error *error) { + grpc_buffer_pool *buffer_pool = bp; + buffer_pool->reclaiming = false; + bpstep_sched(exec_ctx, buffer_pool); + grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); +} + +/******************************************************************************* + * grpc_buffer_pool api + */ + +grpc_buffer_pool *grpc_buffer_pool_create(const char *name) { + grpc_buffer_pool *buffer_pool = gpr_malloc(sizeof(*buffer_pool)); + gpr_ref_init(&buffer_pool->refs, 1); + buffer_pool->combiner = grpc_combiner_create(NULL); + buffer_pool->free_pool = INT64_MAX; + buffer_pool->size = INT64_MAX; + buffer_pool->step_scheduled = false; + buffer_pool->reclaiming = false; + if (name != NULL) { + buffer_pool->name = gpr_strdup(name); + } else { + gpr_asprintf(&buffer_pool->name, "anonymous_pool_%" PRIxPTR, + (intptr_t)buffer_pool); + } + grpc_closure_init(&buffer_pool->bpstep_closure, bpstep, buffer_pool); + grpc_closure_init(&buffer_pool->bpreclaimation_done_closure, + bp_reclaimation_done, buffer_pool); + for (int i = 0; i < GRPC_BULIST_COUNT; i++) { + buffer_pool->roots[i] = NULL; + } + return buffer_pool; +} + +void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx, + grpc_buffer_pool *buffer_pool) { + if (gpr_unref(&buffer_pool->refs)) { + grpc_combiner_destroy(exec_ctx, buffer_pool->combiner); + gpr_free(buffer_pool->name); + gpr_free(buffer_pool); + } +} + +void grpc_buffer_pool_unref(grpc_buffer_pool *buffer_pool) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool); + grpc_exec_ctx_finish(&exec_ctx); +} + +grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool) { + gpr_ref(&buffer_pool->refs); + return buffer_pool; +} + +void grpc_buffer_pool_ref(grpc_buffer_pool *buffer_pool) { + grpc_buffer_pool_internal_ref(buffer_pool); +} + +void grpc_buffer_pool_resize(grpc_buffer_pool *buffer_pool, size_t size) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + bp_resize_args *a = gpr_malloc(sizeof(*a)); + a->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool); + a->size = (int64_t)size; + grpc_closure_init(&a->closure, bp_resize, a); + grpc_combiner_execute(&exec_ctx, buffer_pool->combiner, &a->closure, + GRPC_ERROR_NONE, false); + grpc_exec_ctx_finish(&exec_ctx); +} + +/******************************************************************************* + * grpc_buffer_user channel args api + */ + +grpc_buffer_pool *grpc_buffer_pool_from_channel_args( + const grpc_channel_args *channel_args) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) { + if (channel_args->args[i].type == GRPC_ARG_POINTER) { + return grpc_buffer_pool_internal_ref( + channel_args->args[i].value.pointer.p); + } else { + gpr_log(GPR_DEBUG, GRPC_ARG_BUFFER_POOL " should be a pointer"); + } + } + } + return grpc_buffer_pool_create(NULL); +} + +static void *bp_copy(void *bp) { + grpc_buffer_pool_ref(bp); + return bp; +} + +static void bp_destroy(void *bp) { grpc_buffer_pool_unref(bp); } + +static int bp_cmp(void *a, void *b) { return GPR_ICMP(a, b); } + +const grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void) { + static const grpc_arg_pointer_vtable vtable = {bp_copy, bp_destroy, bp_cmp}; + return &vtable; +} + +/******************************************************************************* + * grpc_buffer_user api + */ + +void grpc_buffer_user_init(grpc_buffer_user *buffer_user, + grpc_buffer_pool *buffer_pool, const char *name) { + buffer_user->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool); + grpc_closure_init(&buffer_user->allocate_closure, &bu_allocate, buffer_user); + grpc_closure_init(&buffer_user->add_to_free_pool_closure, + &bu_add_to_free_pool, buffer_user); + grpc_closure_init(&buffer_user->post_reclaimer_closure[0], + &bu_post_benign_reclaimer, buffer_user); + grpc_closure_init(&buffer_user->post_reclaimer_closure[1], + &bu_post_destructive_reclaimer, buffer_user); + grpc_closure_init(&buffer_user->destroy_closure, &bu_destroy, buffer_user); + gpr_mu_init(&buffer_user->mu); + buffer_user->allocated = 0; + buffer_user->free_pool = 0; + grpc_closure_list_init(&buffer_user->on_allocated); + buffer_user->allocating = false; + buffer_user->added_to_free_pool = false; + gpr_atm_no_barrier_store(&buffer_user->on_done_destroy_closure, 0); + buffer_user->reclaimers[0] = NULL; + buffer_user->reclaimers[1] = NULL; + for (int i = 0; i < GRPC_BULIST_COUNT; i++) { + buffer_user->links[i].next = buffer_user->links[i].prev = NULL; + } +#ifndef NDEBUG + buffer_user->asan_canary = gpr_malloc(1); +#endif + if (name != NULL) { + buffer_user->name = gpr_strdup(name); + } else { + gpr_asprintf(&buffer_user->name, "anonymous_buffer_user_%" PRIxPTR, + (intptr_t)buffer_user); + } +} + +void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user, + grpc_closure *on_done) { + gpr_mu_lock(&buffer_user->mu); + GPR_ASSERT(gpr_atm_no_barrier_load(&buffer_user->on_done_destroy_closure) == + 0); + gpr_atm_no_barrier_store(&buffer_user->on_done_destroy_closure, + (gpr_atm)on_done); + if (buffer_user->allocated == 0) { + grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, + &buffer_user->destroy_closure, GRPC_ERROR_NONE, + false); + } + gpr_mu_unlock(&buffer_user->mu); +} + +void grpc_buffer_user_destroy(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user) { +#ifndef NDEBUG + gpr_free(buffer_user->asan_canary); +#endif + grpc_buffer_pool_internal_unref(exec_ctx, buffer_user->buffer_pool); + gpr_mu_destroy(&buffer_user->mu); + gpr_free(buffer_user->name); +} + +void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user, size_t size, + grpc_closure *optional_on_done) { + gpr_mu_lock(&buffer_user->mu); + grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load( + &buffer_user->on_done_destroy_closure); + if (on_done_destroy != NULL) { + /* already shutdown */ + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR " after shutdown", + buffer_user->buffer_pool->name, buffer_user->name, size); + } + grpc_exec_ctx_sched( + exec_ctx, optional_on_done, + GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL); + gpr_mu_unlock(&buffer_user->mu); + return; + } + buffer_user->allocated += (int64_t)size; + buffer_user->free_pool -= (int64_t)size; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64 + ", free_pool -> %" PRId64, + buffer_user->buffer_pool->name, buffer_user->name, size, + buffer_user->allocated, buffer_user->free_pool); + } + if (buffer_user->free_pool < 0) { + grpc_closure_list_append(&buffer_user->on_allocated, optional_on_done, + GRPC_ERROR_NONE); + if (!buffer_user->allocating) { + buffer_user->allocating = true; + grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, + &buffer_user->allocate_closure, GRPC_ERROR_NONE, + false); + } + } else { + grpc_exec_ctx_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE, NULL); + } + gpr_mu_unlock(&buffer_user->mu); +} + +void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user, size_t size) { + gpr_mu_lock(&buffer_user->mu); + GPR_ASSERT(buffer_user->allocated >= (int64_t)size); + bool was_zero_or_negative = buffer_user->free_pool <= 0; + buffer_user->free_pool += (int64_t)size; + buffer_user->allocated -= (int64_t)size; + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: free %" PRIdPTR "; allocated -> %" PRId64 + ", free_pool -> %" PRId64, + buffer_user->buffer_pool->name, buffer_user->name, size, + buffer_user->allocated, buffer_user->free_pool); + } + bool is_bigger_than_zero = buffer_user->free_pool > 0; + if (is_bigger_than_zero && was_zero_or_negative && + !buffer_user->added_to_free_pool) { + buffer_user->added_to_free_pool = true; + grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, + &buffer_user->add_to_free_pool_closure, + GRPC_ERROR_NONE, false); + } + grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load( + &buffer_user->on_done_destroy_closure); + if (on_done_destroy != NULL && buffer_user->allocated == 0) { + grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, + &buffer_user->destroy_closure, GRPC_ERROR_NONE, + false); + } + gpr_mu_unlock(&buffer_user->mu); +} + +void grpc_buffer_user_post_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user, + bool destructive, grpc_closure *closure) { + if (gpr_atm_acq_load(&buffer_user->on_done_destroy_closure) == 0) { + GPR_ASSERT(buffer_user->reclaimers[destructive] == NULL); + buffer_user->reclaimers[destructive] = closure; + grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, + &buffer_user->post_reclaimer_closure[destructive], + GRPC_ERROR_NONE, false); + } else { + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL); + } +} + +void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user) { + if (grpc_buffer_pool_trace) { + gpr_log(GPR_DEBUG, "BP %s %s: reclaimation complete", + buffer_user->buffer_pool->name, buffer_user->name); + } + grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner, + &buffer_user->buffer_pool->bpreclaimation_done_closure, + GRPC_ERROR_NONE, false); +} + +void grpc_buffer_user_slice_allocator_init( + grpc_buffer_user_slice_allocator *slice_allocator, + grpc_buffer_user *buffer_user, grpc_iomgr_cb_func cb, void *p) { + grpc_closure_init(&slice_allocator->on_allocated, bu_allocated_slices, + slice_allocator); + grpc_closure_init(&slice_allocator->on_done, cb, p); + slice_allocator->buffer_user = buffer_user; +} + +void grpc_buffer_user_alloc_slices( + grpc_exec_ctx *exec_ctx, grpc_buffer_user_slice_allocator *slice_allocator, + size_t length, size_t count, gpr_slice_buffer *dest) { + slice_allocator->length = length; + slice_allocator->count = count; + slice_allocator->dest = dest; + grpc_buffer_user_alloc(exec_ctx, slice_allocator->buffer_user, count * length, + &slice_allocator->on_allocated); +} diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/buffer_pool.h new file mode 100644 index 0000000000..1564872b5d --- /dev/null +++ b/src/core/lib/iomgr/buffer_pool.h @@ -0,0 +1,128 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_BUFFER_POOL_H +#define GRPC_CORE_LIB_IOMGR_BUFFER_POOL_H + +#include <grpc/grpc.h> + +#include "src/core/lib/iomgr/exec_ctx.h" + +extern int grpc_buffer_pool_trace; + +grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool); +void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx, + grpc_buffer_pool *buffer_pool); +grpc_buffer_pool *grpc_buffer_pool_from_channel_args( + const grpc_channel_args *channel_args); + +typedef enum { + GRPC_BULIST_AWAITING_ALLOCATION, + GRPC_BULIST_NON_EMPTY_FREE_POOL, + GRPC_BULIST_RECLAIMER_BENIGN, + GRPC_BULIST_RECLAIMER_DESTRUCTIVE, + GRPC_BULIST_COUNT +} grpc_bulist; + +typedef struct grpc_buffer_user grpc_buffer_user; + +typedef struct { + grpc_buffer_user *next; + grpc_buffer_user *prev; +} grpc_buffer_user_link; + +struct grpc_buffer_user { + grpc_buffer_pool *buffer_pool; + + grpc_closure allocate_closure; + grpc_closure add_to_free_pool_closure; + +#ifndef NDEBUG + void *asan_canary; +#endif + + gpr_mu mu; + int64_t allocated; + int64_t free_pool; + grpc_closure_list on_allocated; + bool allocating; + bool added_to_free_pool; + + grpc_closure *reclaimers[2]; + grpc_closure post_reclaimer_closure[2]; + + grpc_closure destroy_closure; + gpr_atm on_done_destroy_closure; + + grpc_buffer_user_link links[GRPC_BULIST_COUNT]; + + char *name; +}; + +void grpc_buffer_user_init(grpc_buffer_user *buffer_user, + grpc_buffer_pool *buffer_pool, const char *name); +void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user, + grpc_closure *on_done); +void grpc_buffer_user_destroy(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user); + +void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user, size_t size, + grpc_closure *optional_on_done); +void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user, size_t size); +void grpc_buffer_user_post_reclaimer(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user, + bool destructive, grpc_closure *closure); +void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx, + grpc_buffer_user *buffer_user); + +typedef struct grpc_buffer_user_slice_allocator { + grpc_closure on_allocated; + grpc_closure on_done; + size_t length; + size_t count; + gpr_slice_buffer *dest; + grpc_buffer_user *buffer_user; +} grpc_buffer_user_slice_allocator; + +void grpc_buffer_user_slice_allocator_init( + grpc_buffer_user_slice_allocator *slice_allocator, + grpc_buffer_user *buffer_user, grpc_iomgr_cb_func cb, void *p); + +void grpc_buffer_user_alloc_slices( + grpc_exec_ctx *exec_ctx, grpc_buffer_user_slice_allocator *slice_allocator, + size_t length, size_t count, gpr_slice_buffer *dest); + +#endif /* GRPC_CORE_LIB_IOMGR_BUFFER_POOL_H */ diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index f901fcf962..f3548a1d74 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -69,3 +69,7 @@ char* grpc_endpoint_get_peer(grpc_endpoint* ep) { grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) { return ep->vtable->get_workqueue(ep); } + +grpc_buffer_user* grpc_endpoint_get_buffer_user(grpc_endpoint* ep) { + return ep->vtable->get_buffer_user(ep); +} diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 910a6f6532..df6b899e39 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -37,6 +37,7 @@ #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> #include <grpc/support/time.h> +#include "src/core/lib/iomgr/buffer_pool.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" @@ -58,6 +59,7 @@ struct grpc_endpoint_vtable { grpc_pollset_set *pollset); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); + grpc_buffer_user *(*get_buffer_user)(grpc_endpoint *ep); char *(*get_peer)(grpc_endpoint *ep); }; @@ -100,6 +102,8 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset_set); +grpc_buffer_user *grpc_endpoint_get_buffer_user(grpc_endpoint *endpoint); + struct grpc_endpoint { const grpc_endpoint_vtable *vtable; }; diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h index 5cd78051bd..4938cf8599 100644 --- a/src/core/lib/iomgr/endpoint_pair.h +++ b/src/core/lib/iomgr/endpoint_pair.h @@ -41,7 +41,7 @@ typedef struct { grpc_endpoint *server; } grpc_endpoint_pair; -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, - size_t read_slice_size); +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( + const char *name, grpc_buffer_pool *buffer_pool, size_t read_slice_size); #endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.c index e295fb4867..64c161675f 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.c +++ b/src/core/lib/iomgr/endpoint_pair_posix.c @@ -62,20 +62,20 @@ static void create_sockets(int sv[2]) { GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1]) == GRPC_ERROR_NONE); } -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, - size_t read_slice_size) { +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( + const char *name, grpc_buffer_pool *buffer_pool, size_t read_slice_size) { int sv[2]; grpc_endpoint_pair p; char *final_name; create_sockets(sv); gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size, - "socketpair-server"); + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), buffer_pool, + read_slice_size, "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size, - "socketpair-client"); + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), buffer_pool, + read_slice_size, "socketpair-client"); gpr_free(final_name); return p; } diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index a07e0b9f0c..b854e5aadc 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -39,6 +39,8 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/sockaddr.h" +#define GRPC_ARG_TCP_READ_CHUNK_SIZE "grpc.experimental.tcp_read_chunk_size" + /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and NULL on failure). @@ -47,6 +49,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect, grpc_endpoint **endpoint, grpc_pollset_set *interested_parties, + const grpc_channel_args *channel_args, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline); diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 3496b6094f..dadd4cc2eb 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -35,7 +35,7 @@ #ifdef GPR_POSIX_SOCKET -#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/tcp_client_posix.h" #include <errno.h> #include <netinet/in.h> @@ -47,6 +47,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_posix.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -69,6 +70,7 @@ typedef struct { char *addr_str; grpc_endpoint **ep; grpc_closure *closure; + grpc_channel_args *channel_args; } async_connect; static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) { @@ -114,10 +116,38 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { if (done) { gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); + grpc_channel_args_destroy(ac->channel_args); gpr_free(ac); } } +grpc_endpoint *grpc_tcp_client_create_from_fd( + grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args, + const char *addr_str) { + size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) { + grpc_integer_options options = {(int)tcp_read_chunk_size, 1, + 8 * 1024 * 1024}; + tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer( + &channel_args->args[i], options); + } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) { + grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); + buffer_pool = grpc_buffer_pool_internal_ref( + channel_args->args[i].value.pointer.p); + } + } + } + + grpc_endpoint *ep = + grpc_tcp_create(fd, buffer_pool, tcp_read_chunk_size, addr_str); + grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); + return ep; +} + static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { async_connect *ac = acp; int so_error = 0; @@ -165,7 +195,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { switch (so_error) { case 0: grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd); - *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); + *ep = grpc_tcp_client_create_from_fd(exec_ctx, fd, ac->channel_args, + ac->addr_str); fd = NULL; break; case ENOBUFS: @@ -215,6 +246,7 @@ finish: if (done) { gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); + grpc_channel_args_destroy(ac->channel_args); gpr_free(ac); } grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); @@ -223,6 +255,7 @@ finish: static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, + const grpc_channel_args *channel_args, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { int fd; @@ -271,7 +304,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, fdobj = grpc_fd_create(fd, name); if (err >= 0) { - *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str); + *ep = + grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str); grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); goto done; } @@ -296,6 +330,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, ac->refs = 2; ac->write_closure.cb = on_writable; ac->write_closure.cb_arg = ac; + ac->channel_args = grpc_channel_args_copy(channel_args); if (grpc_tcp_trace) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", @@ -317,16 +352,18 @@ done: // overridden by api_fuzzer.c void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, - grpc_pollset_set *interested_parties, const struct sockaddr *addr, - size_t addr_len, gpr_timespec deadline) = tcp_client_connect_impl; + grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, + const struct sockaddr *addr, size_t addr_len, + gpr_timespec deadline) = tcp_client_connect_impl; void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, + const grpc_channel_args *channel_args, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { - grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr, - addr_len, deadline); + grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, + channel_args, addr, addr_len, deadline); } #endif diff --git a/src/core/lib/iomgr/tcp_client_posix.h b/src/core/lib/iomgr/tcp_client_posix.h new file mode 100644 index 0000000000..d8108b8359 --- /dev/null +++ b/src/core/lib/iomgr/tcp_client_posix.h @@ -0,0 +1,45 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H +#define GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H + +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/tcp_client.h" + +grpc_endpoint *grpc_tcp_client_create_from_fd( + grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args, + const char *addr_str); + +#endif diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 00fd77679a..648ca52818 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -80,6 +80,7 @@ typedef struct { msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */ size_t slice_size; gpr_refcount refcount; + gpr_atm shutdown_count; /* garbage after the last read */ gpr_slice_buffer last_read_buffer; @@ -100,15 +101,29 @@ typedef struct { grpc_closure write_closure; char *peer_string; + + grpc_buffer_user buffer_user; + grpc_buffer_user_slice_allocator slice_allocator; } grpc_tcp; static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error); static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error); +static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, + grpc_error *error); + +static void tcp_maybe_shutdown_buffer_user(grpc_exec_ctx *exec_ctx, + grpc_tcp *tcp) { + if (gpr_atm_full_fetch_add(&tcp->shutdown_count, 1) == 0) { + grpc_buffer_user_shutdown(exec_ctx, &tcp->buffer_user, + grpc_closure_create(tcp_unref_closure, tcp)); + } +} static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; + tcp_maybe_shutdown_buffer_user(exec_ctx, tcp); grpc_fd_shutdown(exec_ctx, tcp->em_fd); } @@ -116,6 +131,7 @@ static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, "tcp_unref_orphan"); gpr_slice_buffer_destroy(&tcp->last_read_buffer); + grpc_buffer_user_destroy(exec_ctx, &tcp->buffer_user); gpr_free(tcp->peer_string); gpr_free(tcp); } @@ -152,9 +168,16 @@ static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif +static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + TCP_UNREF(exec_ctx, arg, "buffer_user"); +} + static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; + tcp_maybe_shutdown_buffer_user(exec_ctx, tcp); + gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); TCP_UNREF(exec_ctx, tcp, "destroy"); } @@ -181,7 +204,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, } #define MAX_READ_IOVEC 4 -static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { +static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; ssize_t read_bytes; @@ -192,10 +215,6 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC); GPR_TIMER_BEGIN("tcp_continue_read", 0); - while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { - gpr_slice_buffer_add_indexed(tcp->incoming_buffer, - gpr_slice_malloc(tcp->slice_size)); - } for (i = 0; i < tcp->incoming_buffer->count; i++) { iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]); iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]); @@ -232,7 +251,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("EOF")); + call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("Socket closed")); TCP_UNREF(exec_ctx, tcp, "read"); } else { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); @@ -252,6 +271,30 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { GPR_TIMER_END("tcp_continue_read", 0); } +static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp, + grpc_error *error) { + grpc_tcp *tcp = tcpp; + if (error != GRPC_ERROR_NONE) { + gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); + gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); + call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); + TCP_UNREF(exec_ctx, tcp, "read"); + } else { + tcp_do_read(exec_ctx, tcp); + } +} + +static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { + grpc_buffer_user_alloc_slices( + exec_ctx, &tcp->slice_allocator, tcp->slice_size, + (size_t)tcp->iov_size - tcp->incoming_buffer->count, + tcp->incoming_buffer); + } else { + tcp_do_read(exec_ctx, tcp); + } +} + static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)arg; @@ -259,6 +302,7 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, if (error != GRPC_ERROR_NONE) { gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); + gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); TCP_UNREF(exec_ctx, tcp, "read"); } else { @@ -469,6 +513,11 @@ static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { return grpc_fd_get_workqueue(tcp->em_fd); } +static grpc_buffer_user *tcp_get_buffer_user(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return &tcp->buffer_user; +} + static const grpc_endpoint_vtable vtable = {tcp_read, tcp_write, tcp_get_workqueue, @@ -476,10 +525,11 @@ static const grpc_endpoint_vtable vtable = {tcp_read, tcp_add_to_pollset_set, tcp_shutdown, tcp_destroy, + tcp_get_buffer_user, tcp_get_peer}; -grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, - const char *peer_string) { +grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool, + size_t slice_size, const char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; tcp->peer_string = gpr_strdup(peer_string); @@ -492,14 +542,19 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, tcp->slice_size = slice_size; tcp->iov_size = 1; tcp->finished_edge = true; - /* paired with unref in grpc_tcp_destroy */ - gpr_ref_init(&tcp->refcount, 1); + /* paired with unref in grpc_tcp_destroy, and with the shutdown for our + * buffer_user */ + gpr_ref_init(&tcp->refcount, 2); + gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); tcp->em_fd = em_fd; tcp->read_closure.cb = tcp_handle_read; tcp->read_closure.cb_arg = tcp; tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; gpr_slice_buffer_init(&tcp->last_read_buffer); + grpc_buffer_user_init(&tcp->buffer_user, buffer_pool, peer_string); + grpc_buffer_user_slice_allocator_init( + &tcp->slice_allocator, &tcp->buffer_user, tcp_read_allocation_done, tcp); /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); @@ -514,10 +569,13 @@ int grpc_tcp_fd(grpc_endpoint *ep) { void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int *fd, grpc_closure *done) { + grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(ep->vtable == &vtable); tcp->release_fd = fd; tcp->release_fd_cb = done; + tcp_maybe_shutdown_buffer_user(exec_ctx, tcp); + gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); TCP_UNREF(exec_ctx, tcp, "destroy"); } diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h index 99125836d6..768355cf0c 100644 --- a/src/core/lib/iomgr/tcp_posix.h +++ b/src/core/lib/iomgr/tcp_posix.h @@ -53,8 +53,8 @@ extern int grpc_tcp_trace; /* Create a tcp endpoint given a file desciptor and a read slice size. Takes ownership of fd. */ -grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size, - const char *peer_string); +grpc_endpoint *grpc_tcp_create(grpc_fd *fd, grpc_buffer_pool *buffer_pool, + size_t read_slice_size, const char *peer_string); /* Return the tcp endpoint's fd, or -1 if this is not available. Does not release the fd. diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 9a390699b4..2568d7d836 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -60,7 +60,8 @@ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, /* Create a server, initially not bound to any ports. The caller owns one ref. If shutdown_complete is not NULL, it will be used by grpc_tcp_server_unref() when the ref count reaches zero. */ -grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, +grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, + grpc_closure *shutdown_complete, const grpc_channel_args *args, grpc_tcp_server **server); diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 73df5477e6..2afce529ef 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -137,6 +137,8 @@ struct grpc_tcp_server { /* next pollset to assign a channel to */ gpr_atm next_pollset_to_assign; + + grpc_buffer_pool *buffer_pool; }; static gpr_once check_init = GPR_ONCE_INIT; @@ -153,23 +155,37 @@ static void init(void) { #endif } -grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, +grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, + grpc_closure *shutdown_complete, const grpc_channel_args *args, grpc_tcp_server **server) { gpr_once_init(&check_init, init); grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); s->so_reuseport = has_so_reuseport; + s->buffer_pool = grpc_buffer_pool_create(NULL); for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_INTEGER) { s->so_reuseport = has_so_reuseport && (args->args[i].value.integer != 0); } else { + grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool); gpr_free(s); return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT " must be an integer"); } + } else if (0 == strcmp(GRPC_ARG_BUFFER_POOL, args->args[i].key)) { + if (args->args[i].type == GRPC_ARG_POINTER) { + grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool); + s->buffer_pool = + grpc_buffer_pool_internal_ref(args->args[i].value.pointer.p); + } else { + grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool); + gpr_free(s); + return GRPC_ERROR_CREATE(GRPC_ARG_BUFFER_POOL + " must be a pointer to a buffer pool"); + } } } gpr_ref_init(&s->refs, 1); @@ -206,6 +222,8 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_free(sp); } + grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool); + gpr_free(s); } @@ -422,7 +440,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, - grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), + grpc_tcp_create(fdobj, sp->server->buffer_pool, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), read_notifier_pollset, &acceptor); gpr_free(name); |