aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/buffer_pool.c684
-rw-r--r--src/core/lib/iomgr/buffer_pool.h128
-rw-r--r--src/core/lib/iomgr/endpoint.c4
-rw-r--r--src/core/lib/iomgr/endpoint.h4
-rw-r--r--src/core/lib/iomgr/endpoint_pair.h4
-rw-r--r--src/core/lib/iomgr/endpoint_pair_posix.c12
-rw-r--r--src/core/lib/iomgr/tcp_client.h3
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c51
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.h45
-rw-r--r--src/core/lib/iomgr/tcp_posix.c78
-rw-r--r--src/core/lib/iomgr/tcp_posix.h4
-rw-r--r--src/core/lib/iomgr/tcp_server.h3
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c23
13 files changed, 1013 insertions, 30 deletions
diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c
new file mode 100644
index 0000000000..8fbf75cbe4
--- /dev/null
+++ b/src/core/lib/iomgr/buffer_pool.c
@@ -0,0 +1,684 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/buffer_pool.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/iomgr/combiner.h"
+
+int grpc_buffer_pool_trace = 0;
+
+typedef bool (*bpstate_func)(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_pool *buffer_pool);
+
+typedef struct {
+ grpc_buffer_user *head;
+ grpc_buffer_user *tail;
+} grpc_buffer_user_list;
+
+struct grpc_buffer_pool {
+ gpr_refcount refs;
+
+ grpc_combiner *combiner;
+ int64_t size;
+ int64_t free_pool;
+
+ bool step_scheduled;
+ bool reclaiming;
+ grpc_closure bpstep_closure;
+ grpc_closure bpreclaimation_done_closure;
+
+ grpc_buffer_user *roots[GRPC_BULIST_COUNT];
+
+ char *name;
+};
+
+/*******************************************************************************
+ * list management
+ */
+
+static void bulist_add_tail(grpc_buffer_user *buffer_user, grpc_bulist list) {
+ grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool;
+ grpc_buffer_user **root = &buffer_pool->roots[list];
+ if (*root == NULL) {
+ *root = buffer_user;
+ buffer_user->links[list].next = buffer_user->links[list].prev = buffer_user;
+ } else {
+ buffer_user->links[list].next = *root;
+ buffer_user->links[list].prev = (*root)->links[list].prev;
+ buffer_user->links[list].next->links[list].prev =
+ buffer_user->links[list].prev->links[list].next = buffer_user;
+ }
+}
+
+static void bulist_add_head(grpc_buffer_user *buffer_user, grpc_bulist list) {
+ grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool;
+ grpc_buffer_user **root = &buffer_pool->roots[list];
+ if (*root == NULL) {
+ *root = buffer_user;
+ buffer_user->links[list].next = buffer_user->links[list].prev = buffer_user;
+ } else {
+ buffer_user->links[list].next = (*root)->links[list].next;
+ buffer_user->links[list].prev = *root;
+ buffer_user->links[list].next->links[list].prev =
+ buffer_user->links[list].prev->links[list].next = buffer_user;
+ *root = buffer_user;
+ }
+}
+
+static bool bulist_empty(grpc_buffer_pool *buffer_pool, grpc_bulist list) {
+ return buffer_pool->roots[list] == NULL;
+}
+
+static grpc_buffer_user *bulist_pop(grpc_buffer_pool *buffer_pool,
+ grpc_bulist list) {
+ grpc_buffer_user **root = &buffer_pool->roots[list];
+ grpc_buffer_user *buffer_user = *root;
+ if (buffer_user == NULL) {
+ return NULL;
+ }
+ if (buffer_user->links[list].next == buffer_user) {
+ *root = NULL;
+ } else {
+ buffer_user->links[list].next->links[list].prev =
+ buffer_user->links[list].prev;
+ buffer_user->links[list].prev->links[list].next =
+ buffer_user->links[list].next;
+ *root = buffer_user->links[list].next;
+ }
+ buffer_user->links[list].next = buffer_user->links[list].prev = NULL;
+ return buffer_user;
+}
+
+static void bulist_remove(grpc_buffer_user *buffer_user, grpc_bulist list) {
+ if (buffer_user->links[list].next == NULL) return;
+ grpc_buffer_pool *buffer_pool = buffer_user->buffer_pool;
+ if (buffer_pool->roots[list] == buffer_user) {
+ buffer_pool->roots[list] = buffer_user->links[list].next;
+ if (buffer_pool->roots[list] == buffer_user) {
+ buffer_pool->roots[list] = NULL;
+ }
+ }
+ buffer_user->links[list].next->links[list].prev =
+ buffer_user->links[list].prev;
+ buffer_user->links[list].prev->links[list].next =
+ buffer_user->links[list].next;
+}
+
+/*******************************************************************************
+ * buffer pool state machine
+ */
+
+static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool);
+static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool);
+static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool,
+ bool destructive);
+
+static void bpstep(grpc_exec_ctx *exec_ctx, void *bp, grpc_error *error) {
+ grpc_buffer_pool *buffer_pool = bp;
+ buffer_pool->step_scheduled = false;
+ do {
+ if (bpalloc(exec_ctx, buffer_pool)) goto done;
+ } while (bpscavenge(exec_ctx, buffer_pool));
+ bpreclaim(exec_ctx, buffer_pool, false) ||
+ bpreclaim(exec_ctx, buffer_pool, true);
+done:
+ grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool);
+}
+
+static void bpstep_sched(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_pool *buffer_pool) {
+ if (buffer_pool->step_scheduled) return;
+ buffer_pool->step_scheduled = true;
+ grpc_buffer_pool_internal_ref(buffer_pool);
+ grpc_combiner_execute_finally(exec_ctx, buffer_pool->combiner,
+ &buffer_pool->bpstep_closure, GRPC_ERROR_NONE,
+ false);
+}
+
+/* returns true if all allocations are completed */
+static bool bpalloc(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) {
+ grpc_buffer_user *buffer_user;
+ while ((buffer_user =
+ bulist_pop(buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION))) {
+ gpr_mu_lock(&buffer_user->mu);
+ if (buffer_user->free_pool < 0 &&
+ -buffer_user->free_pool <= buffer_pool->free_pool) {
+ int64_t amt = -buffer_user->free_pool;
+ buffer_user->free_pool = 0;
+ buffer_pool->free_pool -= amt;
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64
+ " bytes; bp_free_pool -> %" PRId64,
+ buffer_pool->name, buffer_user->name, amt,
+ buffer_pool->free_pool);
+ }
+ } else if (grpc_buffer_pool_trace && buffer_user->free_pool >= 0) {
+ gpr_log(GPR_DEBUG, "BP %s %s: discard already satisfied alloc request",
+ buffer_pool->name, buffer_user->name);
+ }
+ if (buffer_user->free_pool >= 0) {
+ buffer_user->allocating = false;
+ grpc_exec_ctx_enqueue_list(exec_ctx, &buffer_user->on_allocated, NULL);
+ gpr_mu_unlock(&buffer_user->mu);
+ } else {
+ bulist_add_head(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION);
+ gpr_mu_unlock(&buffer_user->mu);
+ return false;
+ }
+ }
+ return true;
+}
+
+/* returns true if any memory could be reclaimed from buffers */
+static bool bpscavenge(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool) {
+ grpc_buffer_user *buffer_user;
+ while ((buffer_user =
+ bulist_pop(buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL))) {
+ gpr_mu_lock(&buffer_user->mu);
+ if (buffer_user->free_pool > 0) {
+ int64_t amt = buffer_user->free_pool;
+ buffer_user->free_pool = 0;
+ buffer_pool->free_pool += amt;
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64
+ " bytes; bp_free_pool -> %" PRId64,
+ buffer_pool->name, buffer_user->name, amt,
+ buffer_pool->free_pool);
+ }
+ gpr_mu_unlock(&buffer_user->mu);
+ return true;
+ } else {
+ gpr_mu_unlock(&buffer_user->mu);
+ }
+ }
+ return false;
+}
+
+/* returns true if reclaimation is proceeding */
+static bool bpreclaim(grpc_exec_ctx *exec_ctx, grpc_buffer_pool *buffer_pool,
+ bool destructive) {
+ if (buffer_pool->reclaiming) return true;
+ grpc_bulist list = destructive ? GRPC_BULIST_RECLAIMER_DESTRUCTIVE
+ : GRPC_BULIST_RECLAIMER_BENIGN;
+ grpc_buffer_user *buffer_user = bulist_pop(buffer_pool, list);
+ if (buffer_user == NULL) return false;
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: initiate %s reclaimation", buffer_pool->name,
+ buffer_user->name, destructive ? "destructive" : "benign");
+ }
+ buffer_pool->reclaiming = true;
+ grpc_buffer_pool_internal_ref(buffer_pool);
+ grpc_closure *c = buffer_user->reclaimers[destructive];
+ buffer_user->reclaimers[destructive] = NULL;
+ grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE);
+ return true;
+}
+
+/*******************************************************************************
+ * bu_slice: a slice implementation that is backed by a grpc_buffer_user
+ */
+
+typedef struct {
+ gpr_slice_refcount base;
+ gpr_refcount refs;
+ grpc_buffer_user *buffer_user;
+ size_t size;
+} bu_slice_refcount;
+
+static void bu_slice_ref(void *p) {
+ bu_slice_refcount *rc = p;
+ gpr_ref(&rc->refs);
+}
+
+static void bu_slice_unref(void *p) {
+ bu_slice_refcount *rc = p;
+ if (gpr_unref(&rc->refs)) {
+ /* TODO(ctiller): this is dangerous, but I think safe for now:
+ we have no guarantee here that we're at a safe point for creating an
+ execution context, but we have no way of writing this code otherwise.
+ In the future: consider lifting gpr_slice to grpc, and offering an
+ internal_{ref,unref} pair that is execution context aware. Alternatively,
+ make exec_ctx be thread local and 'do the right thing' (whatever that is)
+ if NULL */
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_buffer_user_free(&exec_ctx, rc->buffer_user, rc->size);
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_free(rc);
+ }
+}
+
+static gpr_slice bu_slice_create(grpc_buffer_user *buffer_user, size_t size) {
+ bu_slice_refcount *rc = gpr_malloc(sizeof(bu_slice_refcount) + size);
+ rc->base.ref = bu_slice_ref;
+ rc->base.unref = bu_slice_unref;
+ gpr_ref_init(&rc->refs, 1);
+ rc->buffer_user = buffer_user;
+ rc->size = size;
+ gpr_slice slice;
+ slice.refcount = &rc->base;
+ slice.data.refcounted.bytes = (uint8_t *)(rc + 1);
+ slice.data.refcounted.length = size;
+ return slice;
+}
+
+/*******************************************************************************
+ * grpc_buffer_pool internal implementation
+ */
+
+static void bu_allocate(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
+ grpc_buffer_user *buffer_user = bu;
+ if (bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION)) {
+ bpstep_sched(exec_ctx, buffer_user->buffer_pool);
+ }
+ bulist_add_tail(buffer_user, GRPC_BULIST_AWAITING_ALLOCATION);
+}
+
+static void bu_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *bu,
+ grpc_error *error) {
+ grpc_buffer_user *buffer_user = bu;
+ if (!bulist_empty(buffer_user->buffer_pool,
+ GRPC_BULIST_AWAITING_ALLOCATION) &&
+ bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL)) {
+ bpstep_sched(exec_ctx, buffer_user->buffer_pool);
+ }
+ bulist_add_tail(buffer_user, GRPC_BULIST_NON_EMPTY_FREE_POOL);
+}
+
+static void bu_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
+ grpc_error *error) {
+ grpc_buffer_user *buffer_user = bu;
+ if (!bulist_empty(buffer_user->buffer_pool,
+ GRPC_BULIST_AWAITING_ALLOCATION) &&
+ bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL) &&
+ bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_RECLAIMER_BENIGN)) {
+ bpstep_sched(exec_ctx, buffer_user->buffer_pool);
+ }
+ bulist_add_tail(buffer_user, GRPC_BULIST_RECLAIMER_BENIGN);
+}
+
+static void bu_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *bu,
+ grpc_error *error) {
+ grpc_buffer_user *buffer_user = bu;
+ if (!bulist_empty(buffer_user->buffer_pool,
+ GRPC_BULIST_AWAITING_ALLOCATION) &&
+ bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_NON_EMPTY_FREE_POOL) &&
+ bulist_empty(buffer_user->buffer_pool, GRPC_BULIST_RECLAIMER_BENIGN) &&
+ bulist_empty(buffer_user->buffer_pool,
+ GRPC_BULIST_RECLAIMER_DESTRUCTIVE)) {
+ bpstep_sched(exec_ctx, buffer_user->buffer_pool);
+ }
+ bulist_add_tail(buffer_user, GRPC_BULIST_RECLAIMER_DESTRUCTIVE);
+}
+
+static void bu_destroy(grpc_exec_ctx *exec_ctx, void *bu, grpc_error *error) {
+ grpc_buffer_user *buffer_user = bu;
+ GPR_ASSERT(buffer_user->allocated == 0);
+ for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
+ bulist_remove(buffer_user, (grpc_bulist)i);
+ }
+ grpc_exec_ctx_sched(exec_ctx, buffer_user->reclaimers[0],
+ GRPC_ERROR_CANCELLED, NULL);
+ grpc_exec_ctx_sched(exec_ctx, buffer_user->reclaimers[1],
+ GRPC_ERROR_CANCELLED, NULL);
+ grpc_exec_ctx_sched(exec_ctx, (grpc_closure *)gpr_atm_no_barrier_load(
+ &buffer_user->on_done_destroy_closure),
+ GRPC_ERROR_NONE, NULL);
+ if (buffer_user->free_pool != 0) {
+ buffer_user->buffer_pool->free_pool += buffer_user->free_pool;
+ bpstep_sched(exec_ctx, buffer_user->buffer_pool);
+ }
+}
+
+static void bu_allocated_slices(grpc_exec_ctx *exec_ctx, void *ts,
+ grpc_error *error) {
+ grpc_buffer_user_slice_allocator *slice_allocator = ts;
+ if (error == GRPC_ERROR_NONE) {
+ for (size_t i = 0; i < slice_allocator->count; i++) {
+ gpr_slice_buffer_add_indexed(slice_allocator->dest,
+ bu_slice_create(slice_allocator->buffer_user,
+ slice_allocator->length));
+ }
+ }
+ grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error));
+}
+
+typedef struct {
+ int64_t size;
+ grpc_buffer_pool *buffer_pool;
+ grpc_closure closure;
+} bp_resize_args;
+
+static void bp_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
+ bp_resize_args *a = args;
+ int64_t delta = a->size - a->buffer_pool->size;
+ a->buffer_pool->size += delta;
+ a->buffer_pool->free_pool += delta;
+ if (delta < 0 && a->buffer_pool->free_pool < 0) {
+ bpstep_sched(exec_ctx, a->buffer_pool);
+ } else if (delta > 0 &&
+ !bulist_empty(a->buffer_pool, GRPC_BULIST_AWAITING_ALLOCATION)) {
+ bpstep_sched(exec_ctx, a->buffer_pool);
+ }
+ grpc_buffer_pool_internal_unref(exec_ctx, a->buffer_pool);
+ gpr_free(a);
+}
+
+static void bp_reclaimation_done(grpc_exec_ctx *exec_ctx, void *bp,
+ grpc_error *error) {
+ grpc_buffer_pool *buffer_pool = bp;
+ buffer_pool->reclaiming = false;
+ bpstep_sched(exec_ctx, buffer_pool);
+ grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool);
+}
+
+/*******************************************************************************
+ * grpc_buffer_pool api
+ */
+
+grpc_buffer_pool *grpc_buffer_pool_create(const char *name) {
+ grpc_buffer_pool *buffer_pool = gpr_malloc(sizeof(*buffer_pool));
+ gpr_ref_init(&buffer_pool->refs, 1);
+ buffer_pool->combiner = grpc_combiner_create(NULL);
+ buffer_pool->free_pool = INT64_MAX;
+ buffer_pool->size = INT64_MAX;
+ buffer_pool->step_scheduled = false;
+ buffer_pool->reclaiming = false;
+ if (name != NULL) {
+ buffer_pool->name = gpr_strdup(name);
+ } else {
+ gpr_asprintf(&buffer_pool->name, "anonymous_pool_%" PRIxPTR,
+ (intptr_t)buffer_pool);
+ }
+ grpc_closure_init(&buffer_pool->bpstep_closure, bpstep, buffer_pool);
+ grpc_closure_init(&buffer_pool->bpreclaimation_done_closure,
+ bp_reclaimation_done, buffer_pool);
+ for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
+ buffer_pool->roots[i] = NULL;
+ }
+ return buffer_pool;
+}
+
+void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_pool *buffer_pool) {
+ if (gpr_unref(&buffer_pool->refs)) {
+ grpc_combiner_destroy(exec_ctx, buffer_pool->combiner);
+ gpr_free(buffer_pool->name);
+ gpr_free(buffer_pool);
+ }
+}
+
+void grpc_buffer_pool_unref(grpc_buffer_pool *buffer_pool) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool) {
+ gpr_ref(&buffer_pool->refs);
+ return buffer_pool;
+}
+
+void grpc_buffer_pool_ref(grpc_buffer_pool *buffer_pool) {
+ grpc_buffer_pool_internal_ref(buffer_pool);
+}
+
+void grpc_buffer_pool_resize(grpc_buffer_pool *buffer_pool, size_t size) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ bp_resize_args *a = gpr_malloc(sizeof(*a));
+ a->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool);
+ a->size = (int64_t)size;
+ grpc_closure_init(&a->closure, bp_resize, a);
+ grpc_combiner_execute(&exec_ctx, buffer_pool->combiner, &a->closure,
+ GRPC_ERROR_NONE, false);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+/*******************************************************************************
+ * grpc_buffer_user channel args api
+ */
+
+grpc_buffer_pool *grpc_buffer_pool_from_channel_args(
+ const grpc_channel_args *channel_args) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) {
+ if (channel_args->args[i].type == GRPC_ARG_POINTER) {
+ return grpc_buffer_pool_internal_ref(
+ channel_args->args[i].value.pointer.p);
+ } else {
+ gpr_log(GPR_DEBUG, GRPC_ARG_BUFFER_POOL " should be a pointer");
+ }
+ }
+ }
+ return grpc_buffer_pool_create(NULL);
+}
+
+static void *bp_copy(void *bp) {
+ grpc_buffer_pool_ref(bp);
+ return bp;
+}
+
+static void bp_destroy(void *bp) { grpc_buffer_pool_unref(bp); }
+
+static int bp_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
+
+const grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void) {
+ static const grpc_arg_pointer_vtable vtable = {bp_copy, bp_destroy, bp_cmp};
+ return &vtable;
+}
+
+/*******************************************************************************
+ * grpc_buffer_user api
+ */
+
+void grpc_buffer_user_init(grpc_buffer_user *buffer_user,
+ grpc_buffer_pool *buffer_pool, const char *name) {
+ buffer_user->buffer_pool = grpc_buffer_pool_internal_ref(buffer_pool);
+ grpc_closure_init(&buffer_user->allocate_closure, &bu_allocate, buffer_user);
+ grpc_closure_init(&buffer_user->add_to_free_pool_closure,
+ &bu_add_to_free_pool, buffer_user);
+ grpc_closure_init(&buffer_user->post_reclaimer_closure[0],
+ &bu_post_benign_reclaimer, buffer_user);
+ grpc_closure_init(&buffer_user->post_reclaimer_closure[1],
+ &bu_post_destructive_reclaimer, buffer_user);
+ grpc_closure_init(&buffer_user->destroy_closure, &bu_destroy, buffer_user);
+ gpr_mu_init(&buffer_user->mu);
+ buffer_user->allocated = 0;
+ buffer_user->free_pool = 0;
+ grpc_closure_list_init(&buffer_user->on_allocated);
+ buffer_user->allocating = false;
+ buffer_user->added_to_free_pool = false;
+ gpr_atm_no_barrier_store(&buffer_user->on_done_destroy_closure, 0);
+ buffer_user->reclaimers[0] = NULL;
+ buffer_user->reclaimers[1] = NULL;
+ for (int i = 0; i < GRPC_BULIST_COUNT; i++) {
+ buffer_user->links[i].next = buffer_user->links[i].prev = NULL;
+ }
+#ifndef NDEBUG
+ buffer_user->asan_canary = gpr_malloc(1);
+#endif
+ if (name != NULL) {
+ buffer_user->name = gpr_strdup(name);
+ } else {
+ gpr_asprintf(&buffer_user->name, "anonymous_buffer_user_%" PRIxPTR,
+ (intptr_t)buffer_user);
+ }
+}
+
+void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_user *buffer_user,
+ grpc_closure *on_done) {
+ gpr_mu_lock(&buffer_user->mu);
+ GPR_ASSERT(gpr_atm_no_barrier_load(&buffer_user->on_done_destroy_closure) ==
+ 0);
+ gpr_atm_no_barrier_store(&buffer_user->on_done_destroy_closure,
+ (gpr_atm)on_done);
+ if (buffer_user->allocated == 0) {
+ grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
+ &buffer_user->destroy_closure, GRPC_ERROR_NONE,
+ false);
+ }
+ gpr_mu_unlock(&buffer_user->mu);
+}
+
+void grpc_buffer_user_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_user *buffer_user) {
+#ifndef NDEBUG
+ gpr_free(buffer_user->asan_canary);
+#endif
+ grpc_buffer_pool_internal_unref(exec_ctx, buffer_user->buffer_pool);
+ gpr_mu_destroy(&buffer_user->mu);
+ gpr_free(buffer_user->name);
+}
+
+void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_user *buffer_user, size_t size,
+ grpc_closure *optional_on_done) {
+ gpr_mu_lock(&buffer_user->mu);
+ grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load(
+ &buffer_user->on_done_destroy_closure);
+ if (on_done_destroy != NULL) {
+ /* already shutdown */
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR " after shutdown",
+ buffer_user->buffer_pool->name, buffer_user->name, size);
+ }
+ grpc_exec_ctx_sched(
+ exec_ctx, optional_on_done,
+ GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL);
+ gpr_mu_unlock(&buffer_user->mu);
+ return;
+ }
+ buffer_user->allocated += (int64_t)size;
+ buffer_user->free_pool -= (int64_t)size;
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64
+ ", free_pool -> %" PRId64,
+ buffer_user->buffer_pool->name, buffer_user->name, size,
+ buffer_user->allocated, buffer_user->free_pool);
+ }
+ if (buffer_user->free_pool < 0) {
+ grpc_closure_list_append(&buffer_user->on_allocated, optional_on_done,
+ GRPC_ERROR_NONE);
+ if (!buffer_user->allocating) {
+ buffer_user->allocating = true;
+ grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
+ &buffer_user->allocate_closure, GRPC_ERROR_NONE,
+ false);
+ }
+ } else {
+ grpc_exec_ctx_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE, NULL);
+ }
+ gpr_mu_unlock(&buffer_user->mu);
+}
+
+void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_user *buffer_user, size_t size) {
+ gpr_mu_lock(&buffer_user->mu);
+ GPR_ASSERT(buffer_user->allocated >= (int64_t)size);
+ bool was_zero_or_negative = buffer_user->free_pool <= 0;
+ buffer_user->free_pool += (int64_t)size;
+ buffer_user->allocated -= (int64_t)size;
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: free %" PRIdPTR "; allocated -> %" PRId64
+ ", free_pool -> %" PRId64,
+ buffer_user->buffer_pool->name, buffer_user->name, size,
+ buffer_user->allocated, buffer_user->free_pool);
+ }
+ bool is_bigger_than_zero = buffer_user->free_pool > 0;
+ if (is_bigger_than_zero && was_zero_or_negative &&
+ !buffer_user->added_to_free_pool) {
+ buffer_user->added_to_free_pool = true;
+ grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
+ &buffer_user->add_to_free_pool_closure,
+ GRPC_ERROR_NONE, false);
+ }
+ grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load(
+ &buffer_user->on_done_destroy_closure);
+ if (on_done_destroy != NULL && buffer_user->allocated == 0) {
+ grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
+ &buffer_user->destroy_closure, GRPC_ERROR_NONE,
+ false);
+ }
+ gpr_mu_unlock(&buffer_user->mu);
+}
+
+void grpc_buffer_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_user *buffer_user,
+ bool destructive, grpc_closure *closure) {
+ if (gpr_atm_acq_load(&buffer_user->on_done_destroy_closure) == 0) {
+ GPR_ASSERT(buffer_user->reclaimers[destructive] == NULL);
+ buffer_user->reclaimers[destructive] = closure;
+ grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
+ &buffer_user->post_reclaimer_closure[destructive],
+ GRPC_ERROR_NONE, false);
+ } else {
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL);
+ }
+}
+
+void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_user *buffer_user) {
+ if (grpc_buffer_pool_trace) {
+ gpr_log(GPR_DEBUG, "BP %s %s: reclaimation complete",
+ buffer_user->buffer_pool->name, buffer_user->name);
+ }
+ grpc_combiner_execute(exec_ctx, buffer_user->buffer_pool->combiner,
+ &buffer_user->buffer_pool->bpreclaimation_done_closure,
+ GRPC_ERROR_NONE, false);
+}
+
+void grpc_buffer_user_slice_allocator_init(
+ grpc_buffer_user_slice_allocator *slice_allocator,
+ grpc_buffer_user *buffer_user, grpc_iomgr_cb_func cb, void *p) {
+ grpc_closure_init(&slice_allocator->on_allocated, bu_allocated_slices,
+ slice_allocator);
+ grpc_closure_init(&slice_allocator->on_done, cb, p);
+ slice_allocator->buffer_user = buffer_user;
+}
+
+void grpc_buffer_user_alloc_slices(
+ grpc_exec_ctx *exec_ctx, grpc_buffer_user_slice_allocator *slice_allocator,
+ size_t length, size_t count, gpr_slice_buffer *dest) {
+ slice_allocator->length = length;
+ slice_allocator->count = count;
+ slice_allocator->dest = dest;
+ grpc_buffer_user_alloc(exec_ctx, slice_allocator->buffer_user, count * length,
+ &slice_allocator->on_allocated);
+}
diff --git a/src/core/lib/iomgr/buffer_pool.h b/src/core/lib/iomgr/buffer_pool.h
new file mode 100644
index 0000000000..1564872b5d
--- /dev/null
+++ b/src/core/lib/iomgr/buffer_pool.h
@@ -0,0 +1,128 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_BUFFER_POOL_H
+#define GRPC_CORE_LIB_IOMGR_BUFFER_POOL_H
+
+#include <grpc/grpc.h>
+
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+extern int grpc_buffer_pool_trace;
+
+grpc_buffer_pool *grpc_buffer_pool_internal_ref(grpc_buffer_pool *buffer_pool);
+void grpc_buffer_pool_internal_unref(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_pool *buffer_pool);
+grpc_buffer_pool *grpc_buffer_pool_from_channel_args(
+ const grpc_channel_args *channel_args);
+
+typedef enum {
+ GRPC_BULIST_AWAITING_ALLOCATION,
+ GRPC_BULIST_NON_EMPTY_FREE_POOL,
+ GRPC_BULIST_RECLAIMER_BENIGN,
+ GRPC_BULIST_RECLAIMER_DESTRUCTIVE,
+ GRPC_BULIST_COUNT
+} grpc_bulist;
+
+typedef struct grpc_buffer_user grpc_buffer_user;
+
+typedef struct {
+ grpc_buffer_user *next;
+ grpc_buffer_user *prev;
+} grpc_buffer_user_link;
+
+struct grpc_buffer_user {
+ grpc_buffer_pool *buffer_pool;
+
+ grpc_closure allocate_closure;
+ grpc_closure add_to_free_pool_closure;
+
+#ifndef NDEBUG
+ void *asan_canary;
+#endif
+
+ gpr_mu mu;
+ int64_t allocated;
+ int64_t free_pool;
+ grpc_closure_list on_allocated;
+ bool allocating;
+ bool added_to_free_pool;
+
+ grpc_closure *reclaimers[2];
+ grpc_closure post_reclaimer_closure[2];
+
+ grpc_closure destroy_closure;
+ gpr_atm on_done_destroy_closure;
+
+ grpc_buffer_user_link links[GRPC_BULIST_COUNT];
+
+ char *name;
+};
+
+void grpc_buffer_user_init(grpc_buffer_user *buffer_user,
+ grpc_buffer_pool *buffer_pool, const char *name);
+void grpc_buffer_user_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_user *buffer_user,
+ grpc_closure *on_done);
+void grpc_buffer_user_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_user *buffer_user);
+
+void grpc_buffer_user_alloc(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_user *buffer_user, size_t size,
+ grpc_closure *optional_on_done);
+void grpc_buffer_user_free(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_user *buffer_user, size_t size);
+void grpc_buffer_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_user *buffer_user,
+ bool destructive, grpc_closure *closure);
+void grpc_buffer_user_finish_reclaimation(grpc_exec_ctx *exec_ctx,
+ grpc_buffer_user *buffer_user);
+
+typedef struct grpc_buffer_user_slice_allocator {
+ grpc_closure on_allocated;
+ grpc_closure on_done;
+ size_t length;
+ size_t count;
+ gpr_slice_buffer *dest;
+ grpc_buffer_user *buffer_user;
+} grpc_buffer_user_slice_allocator;
+
+void grpc_buffer_user_slice_allocator_init(
+ grpc_buffer_user_slice_allocator *slice_allocator,
+ grpc_buffer_user *buffer_user, grpc_iomgr_cb_func cb, void *p);
+
+void grpc_buffer_user_alloc_slices(
+ grpc_exec_ctx *exec_ctx, grpc_buffer_user_slice_allocator *slice_allocator,
+ size_t length, size_t count, gpr_slice_buffer *dest);
+
+#endif /* GRPC_CORE_LIB_IOMGR_BUFFER_POOL_H */
diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c
index f901fcf962..f3548a1d74 100644
--- a/src/core/lib/iomgr/endpoint.c
+++ b/src/core/lib/iomgr/endpoint.c
@@ -69,3 +69,7 @@ char* grpc_endpoint_get_peer(grpc_endpoint* ep) {
grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) {
return ep->vtable->get_workqueue(ep);
}
+
+grpc_buffer_user* grpc_endpoint_get_buffer_user(grpc_endpoint* ep) {
+ return ep->vtable->get_buffer_user(ep);
+}
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index 910a6f6532..df6b899e39 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -37,6 +37,7 @@
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/time.h>
+#include "src/core/lib/iomgr/buffer_pool.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
@@ -58,6 +59,7 @@ struct grpc_endpoint_vtable {
grpc_pollset_set *pollset);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
+ grpc_buffer_user *(*get_buffer_user)(grpc_endpoint *ep);
char *(*get_peer)(grpc_endpoint *ep);
};
@@ -100,6 +102,8 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_endpoint *ep,
grpc_pollset_set *pollset_set);
+grpc_buffer_user *grpc_endpoint_get_buffer_user(grpc_endpoint *endpoint);
+
struct grpc_endpoint {
const grpc_endpoint_vtable *vtable;
};
diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h
index 5cd78051bd..4938cf8599 100644
--- a/src/core/lib/iomgr/endpoint_pair.h
+++ b/src/core/lib/iomgr/endpoint_pair.h
@@ -41,7 +41,7 @@ typedef struct {
grpc_endpoint *server;
} grpc_endpoint_pair;
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
- size_t read_slice_size);
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
+ const char *name, grpc_buffer_pool *buffer_pool, size_t read_slice_size);
#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */
diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.c
index e295fb4867..64c161675f 100644
--- a/src/core/lib/iomgr/endpoint_pair_posix.c
+++ b/src/core/lib/iomgr/endpoint_pair_posix.c
@@ -62,20 +62,20 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1]) == GRPC_ERROR_NONE);
}
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
- size_t read_slice_size) {
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
+ const char *name, grpc_buffer_pool *buffer_pool, size_t read_slice_size) {
int sv[2];
grpc_endpoint_pair p;
char *final_name;
create_sockets(sv);
gpr_asprintf(&final_name, "%s:client", name);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size,
- "socketpair-server");
+ p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), buffer_pool,
+ read_slice_size, "socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size,
- "socketpair-client");
+ p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), buffer_pool,
+ read_slice_size, "socketpair-client");
gpr_free(final_name);
return p;
}
diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h
index a07e0b9f0c..b854e5aadc 100644
--- a/src/core/lib/iomgr/tcp_client.h
+++ b/src/core/lib/iomgr/tcp_client.h
@@ -39,6 +39,8 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/sockaddr.h"
+#define GRPC_ARG_TCP_READ_CHUNK_SIZE "grpc.experimental.tcp_read_chunk_size"
+
/* Asynchronously connect to an address (specified as (addr, len)), and call
cb with arg and the completed connection when done (or call cb with arg and
NULL on failure).
@@ -47,6 +49,7 @@
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect,
grpc_endpoint **endpoint,
grpc_pollset_set *interested_parties,
+ const grpc_channel_args *channel_args,
const struct sockaddr *addr, size_t addr_len,
gpr_timespec deadline);
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 3496b6094f..dadd4cc2eb 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -35,7 +35,7 @@
#ifdef GPR_POSIX_SOCKET
-#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/tcp_client_posix.h"
#include <errno.h>
#include <netinet/in.h>
@@ -47,6 +47,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_posix.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -69,6 +70,7 @@ typedef struct {
char *addr_str;
grpc_endpoint **ep;
grpc_closure *closure;
+ grpc_channel_args *channel_args;
} async_connect;
static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) {
@@ -114,10 +116,38 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
+ grpc_channel_args_destroy(ac->channel_args);
gpr_free(ac);
}
}
+grpc_endpoint *grpc_tcp_client_create_from_fd(
+ grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args,
+ const char *addr_str) {
+ size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(NULL);
+ if (channel_args != NULL) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) {
+ grpc_integer_options options = {(int)tcp_read_chunk_size, 1,
+ 8 * 1024 * 1024};
+ tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer(
+ &channel_args->args[i], options);
+ } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_BUFFER_POOL)) {
+ grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool);
+ buffer_pool = grpc_buffer_pool_internal_ref(
+ channel_args->args[i].value.pointer.p);
+ }
+ }
+ }
+
+ grpc_endpoint *ep =
+ grpc_tcp_create(fd, buffer_pool, tcp_read_chunk_size, addr_str);
+ grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool);
+ return ep;
+}
+
static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect *ac = acp;
int so_error = 0;
@@ -165,7 +195,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
switch (so_error) {
case 0:
grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
- *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
+ *ep = grpc_tcp_client_create_from_fd(exec_ctx, fd, ac->channel_args,
+ ac->addr_str);
fd = NULL;
break;
case ENOBUFS:
@@ -215,6 +246,7 @@ finish:
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
+ grpc_channel_args_destroy(ac->channel_args);
gpr_free(ac);
}
grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
@@ -223,6 +255,7 @@ finish:
static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
grpc_closure *closure, grpc_endpoint **ep,
grpc_pollset_set *interested_parties,
+ const grpc_channel_args *channel_args,
const struct sockaddr *addr,
size_t addr_len, gpr_timespec deadline) {
int fd;
@@ -271,7 +304,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
fdobj = grpc_fd_create(fd, name);
if (err >= 0) {
- *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str);
+ *ep =
+ grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str);
grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
goto done;
}
@@ -296,6 +330,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
ac->refs = 2;
ac->write_closure.cb = on_writable;
ac->write_closure.cb_arg = ac;
+ ac->channel_args = grpc_channel_args_copy(channel_args);
if (grpc_tcp_trace) {
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
@@ -317,16 +352,18 @@ done:
// overridden by api_fuzzer.c
void (*grpc_tcp_client_connect_impl)(
grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep,
- grpc_pollset_set *interested_parties, const struct sockaddr *addr,
- size_t addr_len, gpr_timespec deadline) = tcp_client_connect_impl;
+ grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args,
+ const struct sockaddr *addr, size_t addr_len,
+ gpr_timespec deadline) = tcp_client_connect_impl;
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_endpoint **ep,
grpc_pollset_set *interested_parties,
+ const grpc_channel_args *channel_args,
const struct sockaddr *addr, size_t addr_len,
gpr_timespec deadline) {
- grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr,
- addr_len, deadline);
+ grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties,
+ channel_args, addr, addr_len, deadline);
}
#endif
diff --git a/src/core/lib/iomgr/tcp_client_posix.h b/src/core/lib/iomgr/tcp_client_posix.h
new file mode 100644
index 0000000000..d8108b8359
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_client_posix.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H
+#define GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H
+
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+
+grpc_endpoint *grpc_tcp_client_create_from_fd(
+ grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args,
+ const char *addr_str);
+
+#endif
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 00fd77679a..648ca52818 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -80,6 +80,7 @@ typedef struct {
msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
size_t slice_size;
gpr_refcount refcount;
+ gpr_atm shutdown_count;
/* garbage after the last read */
gpr_slice_buffer last_read_buffer;
@@ -100,15 +101,29 @@ typedef struct {
grpc_closure write_closure;
char *peer_string;
+
+ grpc_buffer_user buffer_user;
+ grpc_buffer_user_slice_allocator slice_allocator;
} grpc_tcp;
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
+static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
+ grpc_error *error);
+
+static void tcp_maybe_shutdown_buffer_user(grpc_exec_ctx *exec_ctx,
+ grpc_tcp *tcp) {
+ if (gpr_atm_full_fetch_add(&tcp->shutdown_count, 1) == 0) {
+ grpc_buffer_user_shutdown(exec_ctx, &tcp->buffer_user,
+ grpc_closure_create(tcp_unref_closure, tcp));
+ }
+}
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
+ tcp_maybe_shutdown_buffer_user(exec_ctx, tcp);
grpc_fd_shutdown(exec_ctx, tcp->em_fd);
}
@@ -116,6 +131,7 @@ static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
"tcp_unref_orphan");
gpr_slice_buffer_destroy(&tcp->last_read_buffer);
+ grpc_buffer_user_destroy(exec_ctx, &tcp->buffer_user);
gpr_free(tcp->peer_string);
gpr_free(tcp);
}
@@ -152,9 +168,16 @@ static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
+static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ TCP_UNREF(exec_ctx, arg, "buffer_user");
+}
+
static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
+ tcp_maybe_shutdown_buffer_user(exec_ctx, tcp);
+ gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}
@@ -181,7 +204,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
}
#define MAX_READ_IOVEC 4
-static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;
@@ -192,10 +215,6 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
GPR_TIMER_BEGIN("tcp_continue_read", 0);
- while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
- gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
- gpr_slice_malloc(tcp->slice_size));
- }
for (i = 0; i < tcp->incoming_buffer->count; i++) {
iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
@@ -232,7 +251,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("EOF"));
+ call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("Socket closed"));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
@@ -252,6 +271,30 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
GPR_TIMER_END("tcp_continue_read", 0);
}
+static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp,
+ grpc_error *error) {
+ grpc_tcp *tcp = tcpp;
+ if (error != GRPC_ERROR_NONE) {
+ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
+ TCP_UNREF(exec_ctx, tcp, "read");
+ } else {
+ tcp_do_read(exec_ctx, tcp);
+ }
+}
+
+static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
+ grpc_buffer_user_alloc_slices(
+ exec_ctx, &tcp->slice_allocator, tcp->slice_size,
+ (size_t)tcp->iov_size - tcp->incoming_buffer->count,
+ tcp->incoming_buffer);
+ } else {
+ tcp_do_read(exec_ctx, tcp);
+ }
+}
+
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)arg;
@@ -259,6 +302,7 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
if (error != GRPC_ERROR_NONE) {
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
@@ -469,6 +513,11 @@ static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
return grpc_fd_get_workqueue(tcp->em_fd);
}
+static grpc_buffer_user *tcp_get_buffer_user(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return &tcp->buffer_user;
+}
+
static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_write,
tcp_get_workqueue,
@@ -476,10 +525,11 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_add_to_pollset_set,
tcp_shutdown,
tcp_destroy,
+ tcp_get_buffer_user,
tcp_get_peer};
-grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
- const char *peer_string) {
+grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool,
+ size_t slice_size, const char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
tcp->peer_string = gpr_strdup(peer_string);
@@ -492,14 +542,19 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->slice_size = slice_size;
tcp->iov_size = 1;
tcp->finished_edge = true;
- /* paired with unref in grpc_tcp_destroy */
- gpr_ref_init(&tcp->refcount, 1);
+ /* paired with unref in grpc_tcp_destroy, and with the shutdown for our
+ * buffer_user */
+ gpr_ref_init(&tcp->refcount, 2);
+ gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
tcp->read_closure.cb = tcp_handle_read;
tcp->read_closure.cb_arg = tcp;
tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
gpr_slice_buffer_init(&tcp->last_read_buffer);
+ grpc_buffer_user_init(&tcp->buffer_user, buffer_pool, peer_string);
+ grpc_buffer_user_slice_allocator_init(
+ &tcp->slice_allocator, &tcp->buffer_user, tcp_read_allocation_done, tcp);
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
@@ -514,10 +569,13 @@ int grpc_tcp_fd(grpc_endpoint *ep) {
void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int *fd, grpc_closure *done) {
+ grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(ep->vtable == &vtable);
tcp->release_fd = fd;
tcp->release_fd_cb = done;
+ tcp_maybe_shutdown_buffer_user(exec_ctx, tcp);
+ gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}
diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h
index 99125836d6..768355cf0c 100644
--- a/src/core/lib/iomgr/tcp_posix.h
+++ b/src/core/lib/iomgr/tcp_posix.h
@@ -53,8 +53,8 @@ extern int grpc_tcp_trace;
/* Create a tcp endpoint given a file desciptor and a read slice size.
Takes ownership of fd. */
-grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size,
- const char *peer_string);
+grpc_endpoint *grpc_tcp_create(grpc_fd *fd, grpc_buffer_pool *buffer_pool,
+ size_t read_slice_size, const char *peer_string);
/* Return the tcp endpoint's fd, or -1 if this is not available. Does not
release the fd.
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 9a390699b4..2568d7d836 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -60,7 +60,8 @@ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
/* Create a server, initially not bound to any ports. The caller owns one ref.
If shutdown_complete is not NULL, it will be used by
grpc_tcp_server_unref() when the ref count reaches zero. */
-grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
+ grpc_closure *shutdown_complete,
const grpc_channel_args *args,
grpc_tcp_server **server);
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 73df5477e6..2afce529ef 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -137,6 +137,8 @@ struct grpc_tcp_server {
/* next pollset to assign a channel to */
gpr_atm next_pollset_to_assign;
+
+ grpc_buffer_pool *buffer_pool;
};
static gpr_once check_init = GPR_ONCE_INIT;
@@ -153,23 +155,37 @@ static void init(void) {
#endif
}
-grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
+ grpc_closure *shutdown_complete,
const grpc_channel_args *args,
grpc_tcp_server **server) {
gpr_once_init(&check_init, init);
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
s->so_reuseport = has_so_reuseport;
+ s->buffer_pool = grpc_buffer_pool_create(NULL);
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_INTEGER) {
s->so_reuseport =
has_so_reuseport && (args->args[i].value.integer != 0);
} else {
+ grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT
" must be an integer");
}
+ } else if (0 == strcmp(GRPC_ARG_BUFFER_POOL, args->args[i].key)) {
+ if (args->args[i].type == GRPC_ARG_POINTER) {
+ grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool);
+ s->buffer_pool =
+ grpc_buffer_pool_internal_ref(args->args[i].value.pointer.p);
+ } else {
+ grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool);
+ gpr_free(s);
+ return GRPC_ERROR_CREATE(GRPC_ARG_BUFFER_POOL
+ " must be a pointer to a buffer pool");
+ }
}
}
gpr_ref_init(&s->refs, 1);
@@ -206,6 +222,8 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(sp);
}
+ grpc_buffer_pool_internal_unref(exec_ctx, s->buffer_pool);
+
gpr_free(s);
}
@@ -422,7 +440,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
- grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
+ grpc_tcp_create(fdobj, sp->server->buffer_pool,
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
read_notifier_pollset, &acceptor);
gpr_free(name);