diff options
Diffstat (limited to 'test/core/util/trickle_endpoint.cc')
-rw-r--r-- | test/core/util/trickle_endpoint.cc | 88 |
1 files changed, 50 insertions, 38 deletions
diff --git a/test/core/util/trickle_endpoint.cc b/test/core/util/trickle_endpoint.cc index e36f64816e..763e684237 100644 --- a/test/core/util/trickle_endpoint.cc +++ b/test/core/util/trickle_endpoint.cc @@ -34,24 +34,24 @@ typedef struct { grpc_endpoint base; double bytes_per_second; - grpc_endpoint *wrapped; + grpc_endpoint* wrapped; gpr_timespec last_write; gpr_mu mu; grpc_slice_buffer write_buffer; grpc_slice_buffer writing_buffer; - grpc_error *error; + grpc_error* error; bool writing; - grpc_closure *write_cb; + grpc_closure* write_cb; } trickle_endpoint; -static void te_read(grpc_endpoint *ep, grpc_slice_buffer *slices, - grpc_closure *cb) { - trickle_endpoint *te = (trickle_endpoint *)ep; +static void te_read(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { + trickle_endpoint* te = (trickle_endpoint*)ep; grpc_endpoint_read(te->wrapped, slices, cb); } -static void maybe_call_write_cb_locked(trickle_endpoint *te) { +static void maybe_call_write_cb_locked(trickle_endpoint* te) { if (te->write_cb != NULL && (te->error != GRPC_ERROR_NONE || te->write_buffer.length <= WRITE_BUFFER_SIZE)) { GRPC_CLOSURE_SCHED(te->write_cb, GRPC_ERROR_REF(te->error)); @@ -59,9 +59,9 @@ static void maybe_call_write_cb_locked(trickle_endpoint *te) { } } -static void te_write(grpc_endpoint *ep, grpc_slice_buffer *slices, - grpc_closure *cb) { - trickle_endpoint *te = (trickle_endpoint *)ep; +static void te_write(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { + trickle_endpoint* te = (trickle_endpoint*)ep; gpr_mu_lock(&te->mu); GPR_ASSERT(te->write_cb == NULL); if (te->write_buffer.length == 0) { @@ -76,19 +76,25 @@ static void te_write(grpc_endpoint *ep, grpc_slice_buffer *slices, gpr_mu_unlock(&te->mu); } -static void te_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { - trickle_endpoint *te = (trickle_endpoint *)ep; +static void te_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) { + trickle_endpoint* te = (trickle_endpoint*)ep; grpc_endpoint_add_to_pollset(te->wrapped, pollset); } -static void te_add_to_pollset_set(grpc_endpoint *ep, - grpc_pollset_set *pollset_set) { - trickle_endpoint *te = (trickle_endpoint *)ep; +static void te_add_to_pollset_set(grpc_endpoint* ep, + grpc_pollset_set* pollset_set) { + trickle_endpoint* te = (trickle_endpoint*)ep; grpc_endpoint_add_to_pollset_set(te->wrapped, pollset_set); } -static void te_shutdown(grpc_endpoint *ep, grpc_error *why) { - trickle_endpoint *te = (trickle_endpoint *)ep; +static void te_delete_from_pollset_set(grpc_endpoint* ep, + grpc_pollset_set* pollset_set) { + trickle_endpoint* te = (trickle_endpoint*)ep; + grpc_endpoint_delete_from_pollset_set(te->wrapped, pollset_set); +} + +static void te_shutdown(grpc_endpoint* ep, grpc_error* why) { + trickle_endpoint* te = (trickle_endpoint*)ep; gpr_mu_lock(&te->mu); if (te->error == GRPC_ERROR_NONE) { te->error = GRPC_ERROR_REF(why); @@ -98,8 +104,8 @@ static void te_shutdown(grpc_endpoint *ep, grpc_error *why) { grpc_endpoint_shutdown(te->wrapped, why); } -static void te_destroy(grpc_endpoint *ep) { - trickle_endpoint *te = (trickle_endpoint *)ep; +static void te_destroy(grpc_endpoint* ep) { + trickle_endpoint* te = (trickle_endpoint*)ep; grpc_endpoint_destroy(te->wrapped); gpr_mu_destroy(&te->mu); grpc_slice_buffer_destroy_internal(&te->write_buffer); @@ -108,37 +114,43 @@ static void te_destroy(grpc_endpoint *ep) { gpr_free(te); } -static grpc_resource_user *te_get_resource_user(grpc_endpoint *ep) { - trickle_endpoint *te = (trickle_endpoint *)ep; +static grpc_resource_user* te_get_resource_user(grpc_endpoint* ep) { + trickle_endpoint* te = (trickle_endpoint*)ep; return grpc_endpoint_get_resource_user(te->wrapped); } -static char *te_get_peer(grpc_endpoint *ep) { - trickle_endpoint *te = (trickle_endpoint *)ep; +static char* te_get_peer(grpc_endpoint* ep) { + trickle_endpoint* te = (trickle_endpoint*)ep; return grpc_endpoint_get_peer(te->wrapped); } -static int te_get_fd(grpc_endpoint *ep) { - trickle_endpoint *te = (trickle_endpoint *)ep; +static int te_get_fd(grpc_endpoint* ep) { + trickle_endpoint* te = (trickle_endpoint*)ep; return grpc_endpoint_get_fd(te->wrapped); } -static void te_finish_write(void *arg, grpc_error *error) { - trickle_endpoint *te = (trickle_endpoint *)arg; +static void te_finish_write(void* arg, grpc_error* error) { + trickle_endpoint* te = (trickle_endpoint*)arg; gpr_mu_lock(&te->mu); te->writing = false; grpc_slice_buffer_reset_and_unref(&te->writing_buffer); gpr_mu_unlock(&te->mu); } -static const grpc_endpoint_vtable vtable = { - te_read, te_write, te_add_to_pollset, te_add_to_pollset_set, - te_shutdown, te_destroy, te_get_resource_user, te_get_peer, - te_get_fd}; - -grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, +static const grpc_endpoint_vtable vtable = {te_read, + te_write, + te_add_to_pollset, + te_add_to_pollset_set, + te_delete_from_pollset_set, + te_shutdown, + te_destroy, + te_get_resource_user, + te_get_peer, + te_get_fd}; + +grpc_endpoint* grpc_trickle_endpoint_create(grpc_endpoint* wrap, double bytes_per_second) { - trickle_endpoint *te = (trickle_endpoint *)gpr_malloc(sizeof(*te)); + trickle_endpoint* te = (trickle_endpoint*)gpr_malloc(sizeof(*te)); te->base.vtable = &vtable; te->wrapped = wrap; te->bytes_per_second = bytes_per_second; @@ -155,8 +167,8 @@ static double ts2dbl(gpr_timespec s) { return (double)s.tv_sec + 1e-9 * (double)s.tv_nsec; } -size_t grpc_trickle_endpoint_trickle(grpc_endpoint *ep) { - trickle_endpoint *te = (trickle_endpoint *)ep; +size_t grpc_trickle_endpoint_trickle(grpc_endpoint* ep) { + trickle_endpoint* te = (trickle_endpoint*)ep; gpr_mu_lock(&te->mu); if (!te->writing && te->write_buffer.length > 0) { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); @@ -180,8 +192,8 @@ size_t grpc_trickle_endpoint_trickle(grpc_endpoint *ep) { return backlog; } -size_t grpc_trickle_get_backlog(grpc_endpoint *ep) { - trickle_endpoint *te = (trickle_endpoint *)ep; +size_t grpc_trickle_get_backlog(grpc_endpoint* ep) { + trickle_endpoint* te = (trickle_endpoint*)ep; gpr_mu_lock(&te->mu); size_t backlog = te->write_buffer.length; gpr_mu_unlock(&te->mu); |