diff options
author | David Garcia Quintas <dgq@google.com> | 2016-11-14 17:55:13 -0800 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-11-14 17:55:13 -0800 |
commit | d03afbdeba47c15b00916ecf62e53192e1f8412b (patch) | |
tree | e41f441085bb8a983d7bddb4d3f99dbb55f40187 /src/core/lib/iomgr | |
parent | 5f50a1baaa92eb6b361a8e5f4cf0b3b31623380d (diff) | |
parent | b794a9687587bc5ede33e60aea140b3283dc8915 (diff) |
Merge branch 'master' of github.com:grpc/grpc into rr_fixall
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/endpoint.c | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint.h | 17 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_linux.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/load_file.c | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/load_file.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.c | 217 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.h | 98 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_mutator.c | 98 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_mutator.h | 80 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_utils_common_posix.c | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_utils_posix.h | 5 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client.h | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.c | 16 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_windows.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 100 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_uv.c | 107 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_windows.c | 92 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_pipe.c | 2 |
18 files changed, 518 insertions, 344 deletions
diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index 74fa9c45df..2d300f4560 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -34,12 +34,12 @@ #include "src/core/lib/iomgr/endpoint.h" void grpc_endpoint_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - gpr_slice_buffer* slices, grpc_closure* cb) { + grpc_slice_buffer* slices, grpc_closure* cb) { ep->vtable->read(exec_ctx, ep, slices, cb); } void grpc_endpoint_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - gpr_slice_buffer* slices, grpc_closure* cb) { + grpc_slice_buffer* slices, grpc_closure* cb) { ep->vtable->write(exec_ctx, ep, slices, cb); } @@ -66,6 +66,8 @@ char* grpc_endpoint_get_peer(grpc_endpoint* ep) { return ep->vtable->get_peer(ep); } +int grpc_endpoint_get_fd(grpc_endpoint* ep) { return ep->vtable->get_fd(ep); } + grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) { return ep->vtable->get_workqueue(ep); } diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 0ac5486ff5..1609b64f2b 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -34,8 +34,8 @@ #ifndef GRPC_CORE_LIB_IOMGR_ENDPOINT_H #define GRPC_CORE_LIB_IOMGR_ENDPOINT_H -#include <grpc/support/slice.h> -#include <grpc/support/slice_buffer.h> +#include <grpc/slice.h> +#include <grpc/slice_buffer.h> #include <grpc/support/time.h> #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" @@ -49,9 +49,9 @@ typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; struct grpc_endpoint_vtable { void (*read)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - gpr_slice_buffer *slices, grpc_closure *cb); + grpc_slice_buffer *slices, grpc_closure *cb); void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - gpr_slice_buffer *slices, grpc_closure *cb); + grpc_slice_buffer *slices, grpc_closure *cb); grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep); void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset *pollset); @@ -61,6 +61,7 @@ struct grpc_endpoint_vtable { void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep); char *(*get_peer)(grpc_endpoint *ep); + int (*get_fd)(grpc_endpoint *ep); }; /* When data is available on the connection, calls the callback with slices. @@ -69,10 +70,14 @@ struct grpc_endpoint_vtable { Valid slices may be placed into \a slices even when the callback is invoked with error != GRPC_ERROR_NONE. */ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - gpr_slice_buffer *slices, grpc_closure *cb); + grpc_slice_buffer *slices, grpc_closure *cb); char *grpc_endpoint_get_peer(grpc_endpoint *ep); +/* Get the file descriptor used by \a ep. Return -1 if \a ep is not using an fd. + */ +int grpc_endpoint_get_fd(grpc_endpoint *ep); + /* Retrieve a reference to the workqueue associated with this endpoint */ grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep); @@ -87,7 +92,7 @@ grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep); it is a valid slice buffer. */ void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - gpr_slice_buffer *slices, grpc_closure *cb); + grpc_slice_buffer *slices, grpc_closure *cb); /* Causes any pending and future read/write callbacks to run immediately with success==0 */ diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index db51ec4939..91041a7c28 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -163,7 +163,7 @@ static void fd_global_shutdown(void); #define PI_ADD_REF(p, r) pi_add_ref((p)) #define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p)) -#endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */ +#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */ /* This is also used as grpc_workqueue (by directly casing it) */ typedef struct polling_island { diff --git a/src/core/lib/iomgr/load_file.c b/src/core/lib/iomgr/load_file.c index b62ecbc534..217bc5da59 100644 --- a/src/core/lib/iomgr/load_file.c +++ b/src/core/lib/iomgr/load_file.c @@ -44,10 +44,10 @@ #include "src/core/lib/support/string.h" grpc_error *grpc_load_file(const char *filename, int add_null_terminator, - gpr_slice *output) { + grpc_slice *output) { unsigned char *contents = NULL; size_t contents_size = 0; - gpr_slice result = gpr_empty_slice(); + grpc_slice result = gpr_empty_slice(); FILE *file; size_t bytes_read = 0; grpc_error *error = GRPC_ERROR_NONE; @@ -72,7 +72,7 @@ grpc_error *grpc_load_file(const char *filename, int add_null_terminator, if (add_null_terminator) { contents[contents_size++] = 0; } - result = gpr_slice_new(contents, contents_size, gpr_free); + result = grpc_slice_new(contents, contents_size, gpr_free); end: *output = result; diff --git a/src/core/lib/iomgr/load_file.h b/src/core/lib/iomgr/load_file.h index 9aac2225d1..73ee8c3abf 100644 --- a/src/core/lib/iomgr/load_file.h +++ b/src/core/lib/iomgr/load_file.h @@ -36,7 +36,7 @@ #include <stdio.h> -#include <grpc/support/slice.h> +#include <grpc/slice.h> #include "src/core/lib/iomgr/error.h" @@ -47,7 +47,7 @@ extern "C" { /* Loads the content of a file into a slice. add_null_terminator will add a NULL terminator if non-zero. */ grpc_error *grpc_load_file(const char *filename, int add_null_terminator, - gpr_slice *slice); + grpc_slice *slice); #ifdef __cplusplus } diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 8a06443d58..051a30baa3 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -44,6 +44,81 @@ int grpc_resource_quota_trace = 0; +/* Internal linked list pointers for a resource user */ +typedef struct { + grpc_resource_user *next; + grpc_resource_user *prev; +} grpc_resource_user_link; + +/* Resource users are kept in (potentially) several intrusive linked lists + at once. These are the list names. */ +typedef enum { + /* Resource users that are waiting for an allocation */ + GRPC_RULIST_AWAITING_ALLOCATION, + /* Resource users that have free memory available for internal reclamation */ + GRPC_RULIST_NON_EMPTY_FREE_POOL, + /* Resource users that have published a benign reclamation is available */ + GRPC_RULIST_RECLAIMER_BENIGN, + /* Resource users that have published a destructive reclamation is + available */ + GRPC_RULIST_RECLAIMER_DESTRUCTIVE, + /* Number of lists: must be last */ + GRPC_RULIST_COUNT +} grpc_rulist; + +struct grpc_resource_user { + /* The quota this resource user consumes from */ + grpc_resource_quota *resource_quota; + + /* Closure to schedule an allocation under the resource quota combiner lock */ + grpc_closure allocate_closure; + /* Closure to publish a non empty free pool under the resource quota combiner + lock */ + grpc_closure add_to_free_pool_closure; + + /* one ref for each ref call (released by grpc_resource_user_unref), and one + ref for each byte allocated (released by grpc_resource_user_free) */ + gpr_atm refs; + /* is this resource user unlocked? starts at 0, increases for each shutdown + call */ + gpr_atm shutdown; + + gpr_mu mu; + /* The amount of memory (in bytes) this user has cached for its own use: to + avoid quota contention, each resource user can keep some memory in + addition to what it is immediately using (e.g., for caching), and the quota + can pull it back under memory pressure. + This value can become negative if more memory has been requested than + existed in the free pool, at which point the quota is consulted to bring + this value non-negative (asynchronously). */ + int64_t free_pool; + /* A list of closures to call once free_pool becomes non-negative - ie when + all outstanding allocations have been granted. */ + grpc_closure_list on_allocated; + /* True if we are currently trying to allocate from the quota, false if not */ + bool allocating; + /* True if we are currently trying to add ourselves to the non-free quota + list, false otherwise */ + bool added_to_free_pool; + + /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer + */ + grpc_closure *reclaimers[2]; + /* Trampoline closures to finish reclamation and re-enter the quota combiner + lock */ + grpc_closure post_reclaimer_closure[2]; + + /* Closure to execute under the quota combiner to de-register and shutdown the + resource user */ + grpc_closure destroy_closure; + + /* Links in the various grpc_rulist lists */ + grpc_resource_user_link links[GRPC_RULIST_COUNT]; + + /* The name of this resource user, for debugging/tracing */ + char *name; +}; + struct grpc_resource_quota { /* refcount */ gpr_refcount refs; @@ -272,7 +347,7 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx, */ typedef struct { - gpr_slice_refcount base; + grpc_slice_refcount base; gpr_refcount refs; grpc_resource_user *resource_user; size_t size; @@ -289,7 +364,7 @@ static void ru_slice_unref(void *p) { /* TODO(ctiller): this is dangerous, but I think safe for now: we have no guarantee here that we're at a safe point for creating an execution context, but we have no way of writing this code otherwise. - In the future: consider lifting gpr_slice to grpc, and offering an + In the future: consider lifting grpc_slice to grpc, and offering an internal_{ref,unref} pair that is execution context aware. Alternatively, make exec_ctx be thread local and 'do the right thing' (whatever that @@ -302,15 +377,15 @@ static void ru_slice_unref(void *p) { } } -static gpr_slice ru_slice_create(grpc_resource_user *resource_user, - size_t size) { +static grpc_slice ru_slice_create(grpc_resource_user *resource_user, + size_t size) { ru_slice_refcount *rc = gpr_malloc(sizeof(ru_slice_refcount) + size); rc->base.ref = ru_slice_ref; rc->base.unref = ru_slice_unref; gpr_ref_init(&rc->refs, 1); rc->resource_user = resource_user; rc->size = size; - gpr_slice slice; + grpc_slice slice; slice.refcount = &rc->base; slice.data.refcounted.bytes = (uint8_t *)(rc + 1); slice.data.refcounted.length = size; @@ -373,9 +448,19 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); } +static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { + grpc_resource_user *resource_user = ru; + grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0], + GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1], + GRPC_ERROR_CANCELLED, NULL); + resource_user->reclaimers[0] = NULL; + resource_user->reclaimers[1] = NULL; +} + static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { grpc_resource_user *resource_user = ru; - GPR_ASSERT(resource_user->allocated == 0); + GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0); for (int i = 0; i < GRPC_RULIST_COUNT; i++) { rulist_remove(resource_user, (grpc_rulist)i); } @@ -383,13 +468,14 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { GRPC_ERROR_CANCELLED, NULL); grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1], GRPC_ERROR_CANCELLED, NULL); - grpc_exec_ctx_sched(exec_ctx, (grpc_closure *)gpr_atm_no_barrier_load( - &resource_user->on_done_destroy_closure), - GRPC_ERROR_NONE, NULL); if (resource_user->free_pool != 0) { resource_user->resource_quota->free_pool += resource_user->free_pool; rq_step_sched(exec_ctx, resource_user->resource_quota); } + grpc_resource_quota_internal_unref(exec_ctx, resource_user->resource_quota); + gpr_mu_destroy(&resource_user->mu); + gpr_free(resource_user->name); + gpr_free(resource_user); } static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *arg, @@ -397,7 +483,7 @@ static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *arg, grpc_resource_user_slice_allocator *slice_allocator = arg; if (error == GRPC_ERROR_NONE) { for (size_t i = 0; i < slice_allocator->count; i++) { - gpr_slice_buffer_add_indexed( + grpc_slice_buffer_add_indexed( slice_allocator->dest, ru_slice_create(slice_allocator->resource_user, slice_allocator->length)); } @@ -539,9 +625,9 @@ const grpc_arg_pointer_vtable *grpc_resource_quota_arg_vtable(void) { * grpc_resource_user api */ -void grpc_resource_user_init(grpc_resource_user *resource_user, - grpc_resource_quota *resource_quota, - const char *name) { +grpc_resource_user *grpc_resource_user_create( + grpc_resource_quota *resource_quota, const char *name) { + grpc_resource_user *resource_user = gpr_malloc(sizeof(*resource_user)); resource_user->resource_quota = grpc_resource_quota_internal_ref(resource_quota); grpc_closure_init(&resource_user->allocate_closure, &ru_allocate, @@ -555,12 +641,12 @@ void grpc_resource_user_init(grpc_resource_user *resource_user, grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user); gpr_mu_init(&resource_user->mu); - resource_user->allocated = 0; + gpr_atm_rel_store(&resource_user->refs, 1); + gpr_atm_rel_store(&resource_user->shutdown, 0); resource_user->free_pool = 0; grpc_closure_list_init(&resource_user->on_allocated); resource_user->allocating = false; resource_user->added_to_free_pool = false; - gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure, 0); resource_user->reclaimers[0] = NULL; resource_user->reclaimers[1] = NULL; for (int i = 0; i < GRPC_RULIST_COUNT; i++) { @@ -572,56 +658,54 @@ void grpc_resource_user_init(grpc_resource_user *resource_user, gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR, (intptr_t)resource_user); } + return resource_user; } -void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, - grpc_resource_user *resource_user, - grpc_closure *on_done) { - gpr_mu_lock(&resource_user->mu); - GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->on_done_destroy_closure) == - 0); - gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure, - (gpr_atm)on_done); - if (resource_user->allocated == 0) { +static void ru_ref_by(grpc_resource_user *resource_user, gpr_atm amount) { + GPR_ASSERT(amount > 0); + GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0); +} + +static void ru_unref_by(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, gpr_atm amount) { + GPR_ASSERT(amount > 0); + gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount); + GPR_ASSERT(old >= amount); + if (old == amount) { grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, &resource_user->destroy_closure, GRPC_ERROR_NONE, false); } - gpr_mu_unlock(&resource_user->mu); } -void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx, - grpc_resource_user *resource_user) { - grpc_resource_quota_internal_unref(exec_ctx, resource_user->resource_quota); - gpr_mu_destroy(&resource_user->mu); - gpr_free(resource_user->name); +void grpc_resource_user_ref(grpc_resource_user *resource_user) { + ru_ref_by(resource_user, 1); +} + +void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user) { + ru_unref_by(exec_ctx, resource_user, 1); +} + +void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user) { + if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) { + grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, + grpc_closure_create(ru_shutdown, resource_user), + GRPC_ERROR_NONE, false); + } } void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user, size_t size, grpc_closure *optional_on_done) { gpr_mu_lock(&resource_user->mu); - grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load( - &resource_user->on_done_destroy_closure); - if (on_done_destroy != NULL) { - /* already shutdown */ - if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR " after shutdown", - resource_user->resource_quota->name, resource_user->name, size); - } - grpc_exec_ctx_sched( - exec_ctx, optional_on_done, - GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL); - gpr_mu_unlock(&resource_user->mu); - return; - } - resource_user->allocated += (int64_t)size; + ru_ref_by(resource_user, (gpr_atm)size); resource_user->free_pool -= (int64_t)size; if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64 - ", free_pool -> %" PRId64, + gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, - resource_user->allocated, resource_user->free_pool); + resource_user->free_pool); } if (resource_user->free_pool < 0) { grpc_closure_list_append(&resource_user->on_allocated, optional_on_done, @@ -641,15 +725,12 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, void grpc_resource_user_free(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user, size_t size) { gpr_mu_lock(&resource_user->mu); - GPR_ASSERT(resource_user->allocated >= (int64_t)size); bool was_zero_or_negative = resource_user->free_pool <= 0; resource_user->free_pool += (int64_t)size; - resource_user->allocated -= (int64_t)size; if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "RQ %s %s: free %" PRIdPTR "; allocated -> %" PRId64 - ", free_pool -> %" PRId64, + gpr_log(GPR_DEBUG, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, - resource_user->allocated, resource_user->free_pool); + resource_user->free_pool); } bool is_bigger_than_zero = resource_user->free_pool > 0; if (is_bigger_than_zero && was_zero_or_negative && @@ -659,29 +740,23 @@ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx, &resource_user->add_to_free_pool_closure, GRPC_ERROR_NONE, false); } - grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load( - &resource_user->on_done_destroy_closure); - if (on_done_destroy != NULL && resource_user->allocated == 0) { - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - &resource_user->destroy_closure, GRPC_ERROR_NONE, - false); - } gpr_mu_unlock(&resource_user->mu); + ru_unref_by(exec_ctx, resource_user, (gpr_atm)size); } void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user, bool destructive, grpc_closure *closure) { - if (gpr_atm_acq_load(&resource_user->on_done_destroy_closure) == 0) { - GPR_ASSERT(resource_user->reclaimers[destructive] == NULL); - resource_user->reclaimers[destructive] = closure; - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - &resource_user->post_reclaimer_closure[destructive], - GRPC_ERROR_NONE, false); - } else { + GPR_ASSERT(resource_user->reclaimers[destructive] == NULL); + if (gpr_atm_acq_load(&resource_user->shutdown) > 0) { grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL); + return; } + resource_user->reclaimers[destructive] = closure; + grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, + &resource_user->post_reclaimer_closure[destructive], + GRPC_ERROR_NONE, false); } void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx, @@ -708,7 +783,7 @@ void grpc_resource_user_slice_allocator_init( void grpc_resource_user_alloc_slices( grpc_exec_ctx *exec_ctx, grpc_resource_user_slice_allocator *slice_allocator, size_t length, - size_t count, gpr_slice_buffer *dest) { + size_t count, grpc_slice_buffer *dest) { slice_allocator->length = length; slice_allocator->count = count; slice_allocator->dest = dest; @@ -716,9 +791,9 @@ void grpc_resource_user_alloc_slices( count * length, &slice_allocator->on_allocated); } -gpr_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx, - grpc_resource_user *resource_user, - size_t size) { +grpc_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, + size_t size) { grpc_resource_user_alloc(exec_ctx, resource_user, size, NULL); return ru_slice_create(resource_user, size); } diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index da68f21a2c..0181fd978b 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -84,91 +84,15 @@ void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx, grpc_resource_quota *grpc_resource_quota_from_channel_args( const grpc_channel_args *channel_args); -/* Resource users are kept in (potentially) several intrusive linked lists - at once. These are the list names. */ -typedef enum { - /* Resource users that are waiting for an allocation */ - GRPC_RULIST_AWAITING_ALLOCATION, - /* Resource users that have free memory available for internal reclamation */ - GRPC_RULIST_NON_EMPTY_FREE_POOL, - /* Resource users that have published a benign reclamation is available */ - GRPC_RULIST_RECLAIMER_BENIGN, - /* Resource users that have published a destructive reclamation is - available */ - GRPC_RULIST_RECLAIMER_DESTRUCTIVE, - /* Number of lists: must be last */ - GRPC_RULIST_COUNT -} grpc_rulist; - typedef struct grpc_resource_user grpc_resource_user; -/* Internal linked list pointers for a resource user */ -typedef struct { - grpc_resource_user *next; - grpc_resource_user *prev; -} grpc_resource_user_link; - -struct grpc_resource_user { - /* The quota this resource user consumes from */ - grpc_resource_quota *resource_quota; - - /* Closure to schedule an allocation under the resource quota combiner lock */ - grpc_closure allocate_closure; - /* Closure to publish a non empty free pool under the resource quota combiner - lock */ - grpc_closure add_to_free_pool_closure; - - gpr_mu mu; - /* Total allocated memory outstanding by this resource user in bytes; - always positive */ - int64_t allocated; - /* The amount of memory (in bytes) this user has cached for its own use: to - avoid quota contention, each resource user can keep some memory in - addition to what it is immediately using (e.g., for caching), and the quota - can pull it back under memory pressure. - This value can become negative if more memory has been requested than - existed in the free pool, at which point the quota is consulted to bring - this value non-negative (asynchronously). */ - int64_t free_pool; - /* A list of closures to call once free_pool becomes non-negative - ie when - all outstanding allocations have been granted. */ - grpc_closure_list on_allocated; - /* True if we are currently trying to allocate from the quota, false if not */ - bool allocating; - /* True if we are currently trying to add ourselves to the non-free quota - list, false otherwise */ - bool added_to_free_pool; - - /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer - */ - grpc_closure *reclaimers[2]; - /* Trampoline closures to finish reclamation and re-enter the quota combiner - lock */ - grpc_closure post_reclaimer_closure[2]; - - /* Closure to execute under the quota combiner to de-register and shutdown the - resource user */ - grpc_closure destroy_closure; - /* User supplied closure to call once the user has finished shutting down AND - all outstanding allocations have been freed. Real type is grpc_closure*, - but it's stored as an atomic to avoid a mutex on some fast paths. */ - gpr_atm on_done_destroy_closure; - - /* Links in the various grpc_rulist lists */ - grpc_resource_user_link links[GRPC_RULIST_COUNT]; - - /* The name of this resource user, for debugging/tracing */ - char *name; -}; - -void grpc_resource_user_init(grpc_resource_user *resource_user, - grpc_resource_quota *resource_quota, - const char *name); +grpc_resource_user *grpc_resource_user_create( + grpc_resource_quota *resource_quota, const char *name); +void grpc_resource_user_ref(grpc_resource_user *resource_user); +void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user); void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, - grpc_resource_user *resource_user, - grpc_closure *on_done); -void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx, - grpc_resource_user *resource_user); + grpc_resource_user *resource_user); /* Allocate from the resource user (and its quota). If optional_on_done is NULL, then allocate immediately. This may push the @@ -203,7 +127,7 @@ typedef struct grpc_resource_user_slice_allocator { /* Number of slices to allocate on the current request */ size_t count; /* Destination for slices to allocate on the current request */ - gpr_slice_buffer *dest; + grpc_slice_buffer *dest; /* Parent resource user */ grpc_resource_user *resource_user; } grpc_resource_user_slice_allocator; @@ -219,11 +143,11 @@ void grpc_resource_user_slice_allocator_init( void grpc_resource_user_alloc_slices( grpc_exec_ctx *exec_ctx, grpc_resource_user_slice_allocator *slice_allocator, size_t length, - size_t count, gpr_slice_buffer *dest); + size_t count, grpc_slice_buffer *dest); /* Allocate one slice of length \a size synchronously. */ -gpr_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx, - grpc_resource_user *resource_user, - size_t size); +grpc_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, + size_t size); #endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */ diff --git a/src/core/lib/iomgr/socket_mutator.c b/src/core/lib/iomgr/socket_mutator.c new file mode 100644 index 0000000000..8b1efb6bab --- /dev/null +++ b/src/core/lib/iomgr/socket_mutator.c @@ -0,0 +1,98 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/socket_mutator.h" + +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +void grpc_socket_mutator_init(grpc_socket_mutator *mutator, + const grpc_socket_mutator_vtable *vtable) { + mutator->vtable = vtable; + gpr_ref_init(&mutator->refcount, 1); +} + +grpc_socket_mutator *grpc_socket_mutator_ref(grpc_socket_mutator *mutator) { + gpr_ref(&mutator->refcount); + return mutator; +} + +bool grpc_socket_mutator_mutate_fd(grpc_socket_mutator *mutator, int fd) { + return mutator->vtable->mutate_fd(fd, mutator); +} + +int grpc_socket_mutator_compare(grpc_socket_mutator *a, + grpc_socket_mutator *b) { + int c = GPR_ICMP(a, b); + if (c != 0) { + grpc_socket_mutator *sma = a; + grpc_socket_mutator *smb = b; + c = GPR_ICMP(sma->vtable, smb->vtable); + if (c == 0) { + c = sma->vtable->compare(sma, smb); + } + } + return c; +} + +void grpc_socket_mutator_unref(grpc_socket_mutator *mutator) { + if (gpr_unref(&mutator->refcount)) { + mutator->vtable->destory(mutator); + } +} + +static void *socket_mutator_arg_copy(void *p) { + return grpc_socket_mutator_ref(p); +} + +static void socket_mutator_arg_destroy(void *p) { + grpc_socket_mutator_unref(p); +} + +static int socket_mutator_cmp(void *a, void *b) { + return grpc_socket_mutator_compare((grpc_socket_mutator *)a, + (grpc_socket_mutator *)b); +} + +static const grpc_arg_pointer_vtable socket_mutator_arg_vtable = { + socket_mutator_arg_copy, socket_mutator_arg_destroy, socket_mutator_cmp}; + +grpc_arg grpc_socket_mutator_to_arg(grpc_socket_mutator *mutator) { + grpc_arg arg; + arg.type = GRPC_ARG_POINTER; + arg.key = GRPC_ARG_SOCKET_MUTATOR; + arg.value.pointer.vtable = &socket_mutator_arg_vtable; + arg.value.pointer.p = mutator; + return arg; +} diff --git a/src/core/lib/iomgr/socket_mutator.h b/src/core/lib/iomgr/socket_mutator.h new file mode 100644 index 0000000000..2f5b6c248e --- /dev/null +++ b/src/core/lib/iomgr/socket_mutator.h @@ -0,0 +1,80 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H +#define GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H + +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/support/sync.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/** The virtual table of grpc_socket_mutator */ +typedef struct { + /** Mutates the socket opitons of \a fd */ + bool (*mutate_fd)(int fd, grpc_socket_mutator *mutator); + /** Compare socket mutator \a a and \a b */ + int (*compare)(grpc_socket_mutator *a, grpc_socket_mutator *b); + /** Destroys the socket mutator instance */ + void (*destory)(grpc_socket_mutator *mutator); +} grpc_socket_mutator_vtable; + +/** The Socket Mutator interface allows changes on socket options */ +struct grpc_socket_mutator { + const grpc_socket_mutator_vtable *vtable; + gpr_refcount refcount; +}; + +/** called by concrete implementations to initialize the base struct */ +void grpc_socket_mutator_init(grpc_socket_mutator *mutator, + const grpc_socket_mutator_vtable *vtable); + +/** Wrap \a mutator as a grpc_arg */ +grpc_arg grpc_socket_mutator_to_arg(grpc_socket_mutator *mutator); + +/** Perform the file descriptor mutation operation of \a mutator on \a fd */ +bool grpc_socket_mutator_mutate_fd(grpc_socket_mutator *mutator, int fd); + +/** Compare if \a a and \a b are the same mutator or have same settings */ +int grpc_socket_mutator_compare(grpc_socket_mutator *a, grpc_socket_mutator *b); + +grpc_socket_mutator *grpc_socket_mutator_ref(grpc_socket_mutator *mutator); +void grpc_socket_mutator_unref(grpc_socket_mutator *mutator); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H */ diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c index bc28bbe316..88e9ade253 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.c +++ b/src/core/lib/iomgr/socket_utils_common_posix.c @@ -209,6 +209,15 @@ grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) { return GRPC_ERROR_NONE; } +/* set a socket using a grpc_socket_mutator */ +grpc_error *grpc_set_socket_with_mutator(int fd, grpc_socket_mutator *mutator) { + GPR_ASSERT(mutator); + if (!grpc_socket_mutator_mutate_fd(mutator, fd)) { + return GRPC_ERROR_CREATE("grpc_socket_mutator failed."); + } + return GRPC_ERROR_NONE; +} + static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT; static int g_ipv6_loopback_available; diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index 175fb2b717..e84d3781a1 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -39,7 +39,9 @@ #include <sys/socket.h> #include <unistd.h> +#include <grpc/impl/codegen/grpc_types.h> #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/socket_mutator.h" /* a wrapper for accept or accept4 */ int grpc_accept4(int sockfd, grpc_resolved_address *resolved_addr, int nonblock, @@ -88,6 +90,9 @@ grpc_error *grpc_set_socket_sndbuf(int fd, int buffer_size_bytes); /* Tries to set the socket's receive buffer to given size. */ grpc_error *grpc_set_socket_rcvbuf(int fd, int buffer_size_bytes); +/* Tries to set the socket using a grpc_socket_mutator */ +grpc_error *grpc_set_socket_with_mutator(int fd, grpc_socket_mutator *mutator); + /* An enum to keep track of IPv4/IPv6 socket modes. Currently, this information is only used when a socket is first created, but diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 18e6e60ebc..0485661316 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -34,6 +34,7 @@ #ifndef GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H #define GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H +#include <grpc/impl/codegen/grpc_types.h> #include <grpc/support/time.h> #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/pollset_set.h" diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index bc08c94ee0..13347735df 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -51,6 +51,7 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_posix.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/socket_mutator.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_posix.h" #include "src/core/lib/iomgr/timer.h" @@ -73,7 +74,8 @@ typedef struct { grpc_channel_args *channel_args; } async_connect; -static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) { +static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd, + const grpc_channel_args *channel_args) { grpc_error *err = GRPC_ERROR_NONE; GPR_ASSERT(fd >= 0); @@ -88,6 +90,16 @@ static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) { } err = grpc_set_socket_no_sigpipe_if_possible(fd); if (err != GRPC_ERROR_NONE) goto error; + if (channel_args) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_SOCKET_MUTATOR)) { + GPR_ASSERT(channel_args->args[i].type == GRPC_ARG_POINTER); + grpc_socket_mutator *mutator = channel_args->args[i].value.pointer.p; + err = grpc_set_socket_with_mutator(fd, mutator); + if (err != GRPC_ERROR_NONE) goto error; + } + } + } goto done; error: @@ -287,7 +299,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, GPR_ASSERT(grpc_sockaddr_is_v4mapped(addr, &addr4_copy)); addr = &addr4_copy; } - if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) { + if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) { grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); return; } diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index 30f7c66f15..4d1e809872 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -37,10 +37,10 @@ #include "src/core/lib/iomgr/sockaddr_windows.h" +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/log_windows.h> -#include <grpc/support/slice_buffer.h> #include <grpc/support/useful.h> #include "src/core/lib/channel/channel_args.h" diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 880af93ee1..12a4797e6f 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -46,9 +46,9 @@ #include <sys/types.h> #include <unistd.h> +#include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/slice.h> #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> @@ -56,6 +56,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" #ifdef GRPC_HAVE_MSG_NOSIGNAL @@ -83,10 +84,10 @@ typedef struct { gpr_atm shutdown_count; /* garbage after the last read */ - gpr_slice_buffer last_read_buffer; + grpc_slice_buffer last_read_buffer; - gpr_slice_buffer *incoming_buffer; - gpr_slice_buffer *outgoing_buffer; + grpc_slice_buffer *incoming_buffer; + grpc_slice_buffer *outgoing_buffer; /** slice within outgoing_buffer to write next */ size_t outgoing_slice_idx; /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */ @@ -102,7 +103,7 @@ typedef struct { char *peer_string; - grpc_resource_user resource_user; + grpc_resource_user *resource_user; grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; @@ -110,28 +111,18 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error); static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error); -static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - grpc_error *error); - -static void tcp_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx, - grpc_tcp *tcp) { - if (gpr_atm_full_fetch_add(&tcp->shutdown_count, 1) == 0) { - grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user, - grpc_closure_create(tcp_unref_closure, tcp)); - } -} static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - tcp_maybe_shutdown_resource_user(exec_ctx, tcp); grpc_fd_shutdown(exec_ctx, tcp->em_fd); + grpc_resource_user_shutdown(exec_ctx, tcp->resource_user); } static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, "tcp_unref_orphan"); - gpr_slice_buffer_destroy(&tcp->last_read_buffer); - grpc_resource_user_destroy(exec_ctx, &tcp->resource_user); + grpc_slice_buffer_destroy(&tcp->last_read_buffer); + grpc_resource_user_unref(exec_ctx, tcp->resource_user); gpr_free(tcp->peer_string); gpr_free(tcp); } @@ -168,16 +159,10 @@ static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif -static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - TCP_UNREF(exec_ctx, arg, "resource_user"); -} - static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; - tcp_maybe_shutdown_resource_user(exec_ctx, tcp); - gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); + grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer); TCP_UNREF(exec_ctx, tcp, "destroy"); } @@ -191,8 +176,8 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, gpr_log(GPR_DEBUG, "read: error=%s", str); grpc_error_free_string(str); for (i = 0; i < tcp->incoming_buffer->count; i++) { - char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i], - GPR_DUMP_HEX | GPR_DUMP_ASCII); + char *dump = grpc_dump_slice(tcp->incoming_buffer->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump); gpr_free(dump); } @@ -216,8 +201,8 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { GPR_TIMER_BEGIN("tcp_continue_read", 0); for (i = 0; i < tcp->incoming_buffer->count; i++) { - iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]); - iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]); + iov[i].iov_base = GRPC_SLICE_START_PTR(tcp->incoming_buffer->slices[i]); + iov[i].iov_len = GRPC_SLICE_LENGTH(tcp->incoming_buffer->slices[i]); } msg.msg_name = NULL; @@ -244,19 +229,19 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { /* We've consumed the edge, request a new one */ grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); + grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg")); TCP_UNREF(exec_ctx, tcp, "read"); } } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ - gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); + grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("Socket closed")); TCP_UNREF(exec_ctx, tcp, "read"); } else { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); if ((size_t)read_bytes < tcp->incoming_buffer->length) { - gpr_slice_buffer_trim_end( + grpc_slice_buffer_trim_end( tcp->incoming_buffer, tcp->incoming_buffer->length - (size_t)read_bytes, &tcp->last_read_buffer); @@ -275,8 +260,8 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { grpc_tcp *tcp = tcpp; if (error != GRPC_ERROR_NONE) { - gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); + grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); + grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer); call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); TCP_UNREF(exec_ctx, tcp, "read"); } else { @@ -301,8 +286,8 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, GPR_ASSERT(!tcp->finished_edge); if (error != GRPC_ERROR_NONE) { - gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); + grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); + grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer); call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); TCP_UNREF(exec_ctx, tcp, "read"); } else { @@ -311,13 +296,13 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, } static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - gpr_slice_buffer *incoming_buffer, grpc_closure *cb) { + grpc_slice_buffer *incoming_buffer, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->incoming_buffer = incoming_buffer; - gpr_slice_buffer_reset_and_unref(incoming_buffer); - gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); + grpc_slice_buffer_reset_and_unref(incoming_buffer); + grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = false; @@ -347,11 +332,11 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { iov_size != MAX_WRITE_IOVEC; iov_size++) { iov[iov_size].iov_base = - GPR_SLICE_START_PTR( + GRPC_SLICE_START_PTR( tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) + tcp->outgoing_byte_idx; iov[iov_size].iov_len = - GPR_SLICE_LENGTH( + GRPC_SLICE_LENGTH( tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) - tcp->outgoing_byte_idx; sending_length += iov[iov_size].iov_len; @@ -392,7 +377,7 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { size_t slice_length; tcp->outgoing_slice_idx--; - slice_length = GPR_SLICE_LENGTH( + slice_length = GRPC_SLICE_LENGTH( tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]); if (slice_length > trailing) { tcp->outgoing_byte_idx = slice_length - trailing; @@ -442,7 +427,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, } static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - gpr_slice_buffer *buf, grpc_closure *cb) { + grpc_slice_buffer *buf, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_error *error = GRPC_ERROR_NONE; @@ -451,7 +436,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, for (i = 0; i < buf->count; i++) { char *data = - gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + grpc_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data); gpr_free(data); } @@ -508,6 +493,11 @@ static char *tcp_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } +static int tcp_get_fd(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return tcp->fd; +} + static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; return grpc_fd_get_workqueue(tcp->em_fd); @@ -515,7 +505,7 @@ static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - return &tcp->resource_user; + return tcp->resource_user; } static const grpc_endpoint_vtable vtable = {tcp_read, @@ -526,7 +516,8 @@ static const grpc_endpoint_vtable vtable = {tcp_read, tcp_shutdown, tcp_destroy, tcp_get_resource_user, - tcp_get_peer}; + tcp_get_peer, + tcp_get_fd}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_resource_quota *resource_quota, @@ -543,20 +534,18 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, tcp->slice_size = slice_size; tcp->iov_size = 1; tcp->finished_edge = true; - /* paired with unref in grpc_tcp_destroy, and with the shutdown for our - * resource_user */ - gpr_ref_init(&tcp->refcount, 2); + /* paired with unref in grpc_tcp_destroy */ + gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); tcp->em_fd = em_fd; tcp->read_closure.cb = tcp_handle_read; tcp->read_closure.cb_arg = tcp; tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; - gpr_slice_buffer_init(&tcp->last_read_buffer); - grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string); - grpc_resource_user_slice_allocator_init(&tcp->slice_allocator, - &tcp->resource_user, - tcp_read_allocation_done, tcp); + grpc_slice_buffer_init(&tcp->last_read_buffer); + tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); + grpc_resource_user_slice_allocator_init( + &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp); /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); @@ -576,8 +565,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, GPR_ASSERT(ep->vtable == &vtable); tcp->release_fd = fd; tcp->release_fd_cb = done; - tcp_maybe_shutdown_resource_user(exec_ctx, tcp); - gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); + grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer); TCP_UNREF(exec_ctx, tcp, "destroy"); } diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 8e74c9e863..6e2ad1dbe9 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -38,14 +38,17 @@ #include <limits.h> #include <string.h> +#include <grpc/slice_buffer.h> + #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/slice_buffer.h> #include <grpc/support/string_util.h> #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/network_status_tracker.h" +#include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/iomgr/tcp_uv.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" int grpc_tcp_trace = 0; @@ -62,15 +65,14 @@ typedef struct { grpc_closure *read_cb; grpc_closure *write_cb; - gpr_slice read_slice; - gpr_slice_buffer *read_slices; - gpr_slice_buffer *write_slices; + grpc_slice read_slice; + grpc_slice_buffer *read_slices; + grpc_slice_buffer *write_slices; uv_buf_t *write_buffers; - grpc_resource_user resource_user; + grpc_resource_user *resource_user; bool shutting_down; - bool resource_user_shutting_down; char *peer_string; grpc_pollset *pollset; @@ -78,23 +80,23 @@ typedef struct { static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); } -static void tcp_free(grpc_tcp *tcp) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_resource_user_destroy(&exec_ctx, &tcp->resource_user); +static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + grpc_resource_user_unref(exec_ctx, tcp->resource_user); gpr_free(tcp); - grpc_exec_ctx_finish(&exec_ctx); } /*#define GRPC_TCP_REFCOUNT_DEBUG*/ #ifdef GRPC_TCP_REFCOUNT_DEBUG -#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) -#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) -static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, - int line) { +#define TCP_UNREF(exec_ctx, tcp, reason) \ + tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__) +#define TCP_REF(tcp, reason) \ + tcp_ref((exec_ctx), (tcp), (reason), __FILE__, __LINE__) +static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, + const char *reason, const char *file, int line) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, reason, tcp->refcount.count, tcp->refcount.count - 1); if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp); + tcp_free(exec_ctx, tcp); } } @@ -105,11 +107,11 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, gpr_ref(&tcp->refcount); } #else -#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) +#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp)) #define TCP_REF(tcp, reason) tcp_ref((tcp)) -static void tcp_unref(grpc_tcp *tcp) { +static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp); + tcp_free(exec_ctx, tcp); } } @@ -122,15 +124,15 @@ static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, grpc_tcp *tcp = handle->data; (void)suggested_size; tcp->read_slice = grpc_resource_user_slice_malloc( - &exec_ctx, &tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); - buf->base = (char *)GPR_SLICE_START_PTR(tcp->read_slice); - buf->len = GPR_SLICE_LENGTH(tcp->read_slice); + &exec_ctx, tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice); + buf->len = GRPC_SLICE_LENGTH(tcp->read_slice); grpc_exec_ctx_finish(&exec_ctx); } static void read_callback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { - gpr_slice sub; + grpc_slice sub; grpc_error *error; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp *tcp = stream->data; @@ -139,7 +141,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, // Nothing happened. Wait for the next callback return; } - TCP_UNREF(tcp, "read"); + TCP_UNREF(&exec_ctx, tcp, "read"); tcp->read_cb = NULL; // TODO(murgatroid99): figure out what the return value here means uv_read_stop(stream); @@ -147,8 +149,8 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, error = GRPC_ERROR_CREATE("EOF"); } else if (nread > 0) { // Successful read - sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); - gpr_slice_buffer_add(tcp->read_slices, sub); + sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); + grpc_slice_buffer_add(tcp->read_slices, sub); error = GRPC_ERROR_NONE; if (grpc_tcp_trace) { size_t i; @@ -156,8 +158,8 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, gpr_log(GPR_DEBUG, "read: error=%s", str); grpc_error_free_string(str); for (i = 0; i < tcp->read_slices->count; i++) { - char *dump = gpr_dump_slice(tcp->read_slices->slices[i], - GPR_DUMP_HEX | GPR_DUMP_ASCII); + char *dump = grpc_dump_slice(tcp->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump); gpr_free(dump); @@ -172,14 +174,14 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, } static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - gpr_slice_buffer *read_slices, grpc_closure *cb) { + grpc_slice_buffer *read_slices, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; int status; grpc_error *error = GRPC_ERROR_NONE; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_slices = read_slices; - gpr_slice_buffer_reset_and_unref(read_slices); + grpc_slice_buffer_reset_and_unref(read_slices); TCP_REF(tcp, "read"); // TODO(murgatroid99): figure out what the return value here means status = @@ -202,7 +204,7 @@ static void write_callback(uv_write_t *req, int status) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_closure *cb = tcp->write_cb; tcp->write_cb = NULL; - TCP_UNREF(tcp, "write"); + TCP_UNREF(&exec_ctx, tcp, "write"); if (status == 0) { error = GRPC_ERROR_NONE; } else { @@ -213,28 +215,28 @@ static void write_callback(uv_write_t *req, int status) { gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str); } gpr_free(tcp->write_buffers); - grpc_resource_user_free(&exec_ctx, &tcp->resource_user, + grpc_resource_user_free(&exec_ctx, tcp->resource_user, sizeof(uv_buf_t) * tcp->write_slices->count); grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL); grpc_exec_ctx_finish(&exec_ctx); } static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - gpr_slice_buffer *write_slices, + grpc_slice_buffer *write_slices, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; uv_buf_t *buffers; unsigned int buffer_count; unsigned int i; - gpr_slice *slice; + grpc_slice *slice; uv_write_t *write_req; if (grpc_tcp_trace) { size_t j; for (j = 0; j < write_slices->count; j++) { - char *data = gpr_dump_slice(write_slices->slices[j], - GPR_DUMP_HEX | GPR_DUMP_ASCII); + char *data = grpc_dump_slice(write_slices->slices[j], + GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data); gpr_free(data); } @@ -259,12 +261,12 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->write_cb = cb; buffer_count = (unsigned int)tcp->write_slices->count; buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count); - grpc_resource_user_alloc(exec_ctx, &tcp->resource_user, + grpc_resource_user_alloc(exec_ctx, tcp->resource_user, sizeof(uv_buf_t) * buffer_count, NULL); for (i = 0; i < buffer_count; i++) { slice = &tcp->write_slices->slices[i]; - buffers[i].base = (char *)GPR_SLICE_START_PTR(*slice); - buffers[i].len = GPR_SLICE_LENGTH(*slice); + buffers[i].base = (char *)GRPC_SLICE_START_PTR(*slice); + buffers[i].len = GRPC_SLICE_LENGTH(*slice); } tcp->write_buffers = buffers; write_req = &tcp->write_req; @@ -295,22 +297,6 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void shutdown_callback(uv_shutdown_t *req, int status) {} -static void resource_user_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - TCP_UNREF(arg, "resource_user"); -} - -static void uv_resource_user_maybe_shutdown(grpc_exec_ctx *exec_ctx, - grpc_tcp *tcp) { - if (!tcp->resource_user_shutting_down) { - tcp->resource_user_shutting_down = true; - TCP_REF(tcp, "resource_user"); - grpc_resource_user_shutdown( - exec_ctx, &tcp->resource_user, - grpc_closure_create(resource_user_shutdown_done, tcp)); - } -} - static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; if (!tcp->shutting_down) { @@ -324,8 +310,7 @@ static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; uv_close((uv_handle_t *)tcp->handle, uv_close_callback); - uv_resource_user_maybe_shutdown(exec_ctx, tcp); - TCP_UNREF(tcp, "destroy"); + TCP_UNREF(exec_ctx, tcp, "destroy"); } static char *uv_get_peer(grpc_endpoint *ep) { @@ -335,15 +320,18 @@ static char *uv_get_peer(grpc_endpoint *ep) { static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - return &tcp->resource_user; + return tcp->resource_user; } static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; } +static int uv_get_fd(grpc_endpoint *ep) { return -1; } + static grpc_endpoint_vtable vtable = { uv_endpoint_read, uv_endpoint_write, uv_get_workqueue, uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown, - uv_destroy, uv_get_resource_user, uv_get_peer}; + uv_destroy, uv_get_resource_user, uv_get_peer, + uv_get_fd}; grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, grpc_resource_quota *resource_quota, @@ -364,8 +352,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, gpr_ref_init(&tcp->refcount, 1); tcp->peer_string = gpr_strdup(peer_string); tcp->shutting_down = false; - tcp->resource_user_shutting_down = false; - grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string); + tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 46f0491d10..62afbcef51 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -40,10 +40,10 @@ #include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/sockaddr_windows.h" +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/log_windows.h> -#include <grpc/support/slice_buffer.h> #include <grpc/support/string_util.h> #include <grpc/support/useful.h> @@ -105,50 +105,39 @@ typedef struct grpc_tcp { grpc_closure *read_cb; grpc_closure *write_cb; - gpr_slice read_slice; - gpr_slice_buffer *write_slices; - gpr_slice_buffer *read_slices; + grpc_slice read_slice; + grpc_slice_buffer *write_slices; + grpc_slice_buffer *read_slices; - grpc_resource_user resource_user; + grpc_resource_user *resource_user; /* The IO Completion Port runs from another thread. We need some mechanism to protect ourselves when requesting a shutdown. */ gpr_mu mu; int shutting_down; - gpr_atm resource_user_shutdown_count; - char *peer_string; } grpc_tcp; -static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - grpc_error *error); - -static void win_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx, - grpc_tcp *tcp) { - if (gpr_atm_full_fetch_add(&tcp->resource_user_shutdown_count, 1) == 0) { - grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user, - grpc_closure_create(win_unref_closure, tcp)); - } -} - -static void tcp_free(grpc_tcp *tcp) { +static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_winsocket_destroy(tcp->socket); gpr_mu_destroy(&tcp->mu); gpr_free(tcp->peer_string); + grpc_resource_user_unref(exec_ctx, tcp->resource_user); gpr_free(tcp); } /*#define GRPC_TCP_REFCOUNT_DEBUG*/ #ifdef GRPC_TCP_REFCOUNT_DEBUG -#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) +#define TCP_UNREF(exec_ctx, tcp, reason) \ + tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__) #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) -static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, - int line) { +static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, + const char *reason, const char *file, int line) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, reason, tcp->refcount.count, tcp->refcount.count - 1); if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp); + tcp_free(exec_ctx, tcp); } } @@ -159,28 +148,23 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, gpr_ref(&tcp->refcount); } #else -#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) +#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp)) #define TCP_REF(tcp, reason) tcp_ref((tcp)) -static void tcp_unref(grpc_tcp *tcp) { +static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp); + tcp_free(exec_ctx, tcp); } } static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif -static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - TCP_UNREF(arg, "resource_user"); -} - /* Asynchronous callback from the IOCP, or the background thread. */ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { grpc_tcp *tcp = tcpp; grpc_closure *cb = tcp->read_cb; grpc_winsocket *socket = tcp->socket; - gpr_slice sub; + grpc_slice sub; grpc_winsocket_callback_info *info = &socket->read_info; GRPC_ERROR_REF(error); @@ -190,25 +174,25 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { char *utf8_message = gpr_format_message(info->wsa_error); error = GRPC_ERROR_CREATE(utf8_message); gpr_free(utf8_message); - gpr_slice_unref(tcp->read_slice); + grpc_slice_unref(tcp->read_slice); } else { if (info->bytes_transfered != 0 && !tcp->shutting_down) { - sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered); - gpr_slice_buffer_add(tcp->read_slices, sub); + sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered); + grpc_slice_buffer_add(tcp->read_slices, sub); } else { - gpr_slice_unref(tcp->read_slice); + grpc_slice_unref(tcp->read_slice); error = GRPC_ERROR_CREATE("End of TCP stream"); } } } tcp->read_cb = NULL; - TCP_UNREF(tcp, "read"); + TCP_UNREF(exec_ctx, tcp, "read"); grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - gpr_slice_buffer *read_slices, grpc_closure *cb) { + grpc_slice_buffer *read_slices, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->read_info; @@ -225,13 +209,13 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->read_cb = cb; tcp->read_slices = read_slices; - gpr_slice_buffer_reset_and_unref(read_slices); + grpc_slice_buffer_reset_and_unref(read_slices); - tcp->read_slice = gpr_slice_malloc(8192); + tcp->read_slice = grpc_slice_malloc(8192); - buffer.len = (ULONG)GPR_SLICE_LENGTH( + buffer.len = (ULONG)GRPC_SLICE_LENGTH( tcp->read_slice); // we know slice size fits in 32bit. - buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice); + buffer.buf = (char *)GRPC_SLICE_START_PTR(tcp->read_slice); TCP_REF(tcp, "read"); @@ -287,13 +271,13 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { } } - TCP_UNREF(tcp, "write"); + TCP_UNREF(exec_ctx, tcp, "write"); grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } /* Initiates a write. */ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - gpr_slice_buffer *slices, grpc_closure *cb) { + grpc_slice_buffer *slices, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *socket = tcp->socket; grpc_winsocket_callback_info *info = &socket->write_info; @@ -320,10 +304,10 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } for (i = 0; i < tcp->write_slices->count; i++) { - len = GPR_SLICE_LENGTH(tcp->write_slices->slices[i]); + len = GRPC_SLICE_LENGTH(tcp->write_slices->slices[i]); GPR_ASSERT(len <= ULONG_MAX); buffers[i].len = (ULONG)len; - buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices->slices[i]); + buffers[i].buf = (char *)GRPC_SLICE_START_PTR(tcp->write_slices->slices[i]); } /* First, let's try a synchronous, non-blocking write. */ @@ -355,7 +339,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (status != 0) { int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { - TCP_UNREF(tcp, "write"); + TCP_UNREF(exec_ctx, tcp, "write"); grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), NULL); return; @@ -396,15 +380,14 @@ static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { callback. See the comments in on_read and on_write. */ tcp->shutting_down = 1; grpc_winsocket_shutdown(tcp->socket); - win_maybe_shutdown_resource_user(exec_ctx, tcp); gpr_mu_unlock(&tcp->mu); + grpc_resource_user_shutdown(exec_ctx, tcp->resource_user); } static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; - win_maybe_shutdown_resource_user(exec_ctx, tcp); - TCP_UNREF(tcp, "destroy"); + TCP_UNREF(exec_ctx, tcp, "destroy"); } static char *win_get_peer(grpc_endpoint *ep) { @@ -416,9 +399,11 @@ static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; } static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - return &tcp->resource_user; + return tcp->resource_user; } +static int win_get_fd(grpc_endpoint *ep) { return -1; } + static grpc_endpoint_vtable vtable = {win_read, win_write, win_get_workqueue, @@ -427,7 +412,8 @@ static grpc_endpoint_vtable vtable = {win_read, win_shutdown, win_destroy, win_get_resource_user, - win_get_peer}; + win_get_peer, + win_get_fd}; grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, grpc_resource_quota *resource_quota, @@ -441,7 +427,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, grpc_closure_init(&tcp->on_read, on_read, tcp); grpc_closure_init(&tcp->on_write, on_write, tcp); tcp->peer_string = gpr_strdup(peer_string); - grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string); + tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.c index 183f0eb930..e186d63683 100644 --- a/src/core/lib/iomgr/wakeup_fd_pipe.c +++ b/src/core/lib/iomgr/wakeup_fd_pipe.c @@ -95,6 +95,8 @@ static void pipe_destroy(grpc_wakeup_fd* fd_info) { static int pipe_check_availability(void) { grpc_wakeup_fd fd; + fd.read_fd = fd.write_fd = -1; + if (pipe_init(&fd) == GRPC_ERROR_NONE) { pipe_destroy(&fd); return 1; |