aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/util/trickle_endpoint.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/core/util/trickle_endpoint.cc')
-rw-r--r--test/core/util/trickle_endpoint.cc88
1 files changed, 50 insertions, 38 deletions
diff --git a/test/core/util/trickle_endpoint.cc b/test/core/util/trickle_endpoint.cc
index e36f64816e..763e684237 100644
--- a/test/core/util/trickle_endpoint.cc
+++ b/test/core/util/trickle_endpoint.cc
@@ -34,24 +34,24 @@
typedef struct {
grpc_endpoint base;
double bytes_per_second;
- grpc_endpoint *wrapped;
+ grpc_endpoint* wrapped;
gpr_timespec last_write;
gpr_mu mu;
grpc_slice_buffer write_buffer;
grpc_slice_buffer writing_buffer;
- grpc_error *error;
+ grpc_error* error;
bool writing;
- grpc_closure *write_cb;
+ grpc_closure* write_cb;
} trickle_endpoint;
-static void te_read(grpc_endpoint *ep, grpc_slice_buffer *slices,
- grpc_closure *cb) {
- trickle_endpoint *te = (trickle_endpoint *)ep;
+static void te_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
+ grpc_closure* cb) {
+ trickle_endpoint* te = (trickle_endpoint*)ep;
grpc_endpoint_read(te->wrapped, slices, cb);
}
-static void maybe_call_write_cb_locked(trickle_endpoint *te) {
+static void maybe_call_write_cb_locked(trickle_endpoint* te) {
if (te->write_cb != NULL && (te->error != GRPC_ERROR_NONE ||
te->write_buffer.length <= WRITE_BUFFER_SIZE)) {
GRPC_CLOSURE_SCHED(te->write_cb, GRPC_ERROR_REF(te->error));
@@ -59,9 +59,9 @@ static void maybe_call_write_cb_locked(trickle_endpoint *te) {
}
}
-static void te_write(grpc_endpoint *ep, grpc_slice_buffer *slices,
- grpc_closure *cb) {
- trickle_endpoint *te = (trickle_endpoint *)ep;
+static void te_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
+ grpc_closure* cb) {
+ trickle_endpoint* te = (trickle_endpoint*)ep;
gpr_mu_lock(&te->mu);
GPR_ASSERT(te->write_cb == NULL);
if (te->write_buffer.length == 0) {
@@ -76,19 +76,25 @@ static void te_write(grpc_endpoint *ep, grpc_slice_buffer *slices,
gpr_mu_unlock(&te->mu);
}
-static void te_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
- trickle_endpoint *te = (trickle_endpoint *)ep;
+static void te_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
+ trickle_endpoint* te = (trickle_endpoint*)ep;
grpc_endpoint_add_to_pollset(te->wrapped, pollset);
}
-static void te_add_to_pollset_set(grpc_endpoint *ep,
- grpc_pollset_set *pollset_set) {
- trickle_endpoint *te = (trickle_endpoint *)ep;
+static void te_add_to_pollset_set(grpc_endpoint* ep,
+ grpc_pollset_set* pollset_set) {
+ trickle_endpoint* te = (trickle_endpoint*)ep;
grpc_endpoint_add_to_pollset_set(te->wrapped, pollset_set);
}
-static void te_shutdown(grpc_endpoint *ep, grpc_error *why) {
- trickle_endpoint *te = (trickle_endpoint *)ep;
+static void te_delete_from_pollset_set(grpc_endpoint* ep,
+ grpc_pollset_set* pollset_set) {
+ trickle_endpoint* te = (trickle_endpoint*)ep;
+ grpc_endpoint_delete_from_pollset_set(te->wrapped, pollset_set);
+}
+
+static void te_shutdown(grpc_endpoint* ep, grpc_error* why) {
+ trickle_endpoint* te = (trickle_endpoint*)ep;
gpr_mu_lock(&te->mu);
if (te->error == GRPC_ERROR_NONE) {
te->error = GRPC_ERROR_REF(why);
@@ -98,8 +104,8 @@ static void te_shutdown(grpc_endpoint *ep, grpc_error *why) {
grpc_endpoint_shutdown(te->wrapped, why);
}
-static void te_destroy(grpc_endpoint *ep) {
- trickle_endpoint *te = (trickle_endpoint *)ep;
+static void te_destroy(grpc_endpoint* ep) {
+ trickle_endpoint* te = (trickle_endpoint*)ep;
grpc_endpoint_destroy(te->wrapped);
gpr_mu_destroy(&te->mu);
grpc_slice_buffer_destroy_internal(&te->write_buffer);
@@ -108,37 +114,43 @@ static void te_destroy(grpc_endpoint *ep) {
gpr_free(te);
}
-static grpc_resource_user *te_get_resource_user(grpc_endpoint *ep) {
- trickle_endpoint *te = (trickle_endpoint *)ep;
+static grpc_resource_user* te_get_resource_user(grpc_endpoint* ep) {
+ trickle_endpoint* te = (trickle_endpoint*)ep;
return grpc_endpoint_get_resource_user(te->wrapped);
}
-static char *te_get_peer(grpc_endpoint *ep) {
- trickle_endpoint *te = (trickle_endpoint *)ep;
+static char* te_get_peer(grpc_endpoint* ep) {
+ trickle_endpoint* te = (trickle_endpoint*)ep;
return grpc_endpoint_get_peer(te->wrapped);
}
-static int te_get_fd(grpc_endpoint *ep) {
- trickle_endpoint *te = (trickle_endpoint *)ep;
+static int te_get_fd(grpc_endpoint* ep) {
+ trickle_endpoint* te = (trickle_endpoint*)ep;
return grpc_endpoint_get_fd(te->wrapped);
}
-static void te_finish_write(void *arg, grpc_error *error) {
- trickle_endpoint *te = (trickle_endpoint *)arg;
+static void te_finish_write(void* arg, grpc_error* error) {
+ trickle_endpoint* te = (trickle_endpoint*)arg;
gpr_mu_lock(&te->mu);
te->writing = false;
grpc_slice_buffer_reset_and_unref(&te->writing_buffer);
gpr_mu_unlock(&te->mu);
}
-static const grpc_endpoint_vtable vtable = {
- te_read, te_write, te_add_to_pollset, te_add_to_pollset_set,
- te_shutdown, te_destroy, te_get_resource_user, te_get_peer,
- te_get_fd};
-
-grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
+static const grpc_endpoint_vtable vtable = {te_read,
+ te_write,
+ te_add_to_pollset,
+ te_add_to_pollset_set,
+ te_delete_from_pollset_set,
+ te_shutdown,
+ te_destroy,
+ te_get_resource_user,
+ te_get_peer,
+ te_get_fd};
+
+grpc_endpoint* grpc_trickle_endpoint_create(grpc_endpoint* wrap,
double bytes_per_second) {
- trickle_endpoint *te = (trickle_endpoint *)gpr_malloc(sizeof(*te));
+ trickle_endpoint* te = (trickle_endpoint*)gpr_malloc(sizeof(*te));
te->base.vtable = &vtable;
te->wrapped = wrap;
te->bytes_per_second = bytes_per_second;
@@ -155,8 +167,8 @@ static double ts2dbl(gpr_timespec s) {
return (double)s.tv_sec + 1e-9 * (double)s.tv_nsec;
}
-size_t grpc_trickle_endpoint_trickle(grpc_endpoint *ep) {
- trickle_endpoint *te = (trickle_endpoint *)ep;
+size_t grpc_trickle_endpoint_trickle(grpc_endpoint* ep) {
+ trickle_endpoint* te = (trickle_endpoint*)ep;
gpr_mu_lock(&te->mu);
if (!te->writing && te->write_buffer.length > 0) {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
@@ -180,8 +192,8 @@ size_t grpc_trickle_endpoint_trickle(grpc_endpoint *ep) {
return backlog;
}
-size_t grpc_trickle_get_backlog(grpc_endpoint *ep) {
- trickle_endpoint *te = (trickle_endpoint *)ep;
+size_t grpc_trickle_get_backlog(grpc_endpoint* ep) {
+ trickle_endpoint* te = (trickle_endpoint*)ep;
gpr_mu_lock(&te->mu);
size_t backlog = te->write_buffer.length;
gpr_mu_unlock(&te->mu);