aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core
diff options
context:
space:
mode:
Diffstat (limited to 'test/core')
-rw-r--r--test/core/util/trickle_endpoint.c40
-rw-r--r--test/core/util/trickle_endpoint.h5
2 files changed, 31 insertions, 14 deletions
diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c
index 8d661e04c6..5b6c666950 100644
--- a/test/core/util/trickle_endpoint.c
+++ b/test/core/util/trickle_endpoint.c
@@ -47,7 +47,9 @@
typedef struct {
grpc_endpoint base;
+ double bytes_per_second;
grpc_endpoint *wrapped;
+ gpr_timespec last_write;
gpr_mu mu;
grpc_slice_buffer write_buffer;
@@ -68,8 +70,11 @@ static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
for (size_t i = 0; i < slices->count; i++) {
grpc_slice_ref_internal(slices->slices[i]);
}
- grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count);
gpr_mu_lock(&te->mu);
+ if (te->write_buffer.length == 0) {
+ te->last_write = gpr_now(GPR_CLOCK_MONOTONIC);
+ }
+ grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count);
grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_REF(te->error));
gpr_mu_unlock(&te->mu);
}
@@ -147,10 +152,12 @@ static const grpc_endpoint_vtable vtable = {te_read,
te_get_peer,
te_get_fd};
-grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap) {
+grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
+ double bytes_per_second) {
trickle_endpoint *te = gpr_malloc(sizeof(*te));
te->base.vtable = &vtable;
te->wrapped = wrap;
+ te->bytes_per_second = bytes_per_second;
gpr_mu_init(&te->mu);
grpc_slice_buffer_init(&te->write_buffer);
grpc_slice_buffer_init(&te->writing_buffer);
@@ -159,18 +166,27 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap) {
return &te->base;
}
-size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- size_t bytes) {
+static double ts2dbl(gpr_timespec s) { return s.tv_sec + 1e-9 * s.tv_nsec; }
+
+size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx,
+ grpc_endpoint *ep) {
trickle_endpoint *te = (trickle_endpoint *)ep;
gpr_mu_lock(&te->mu);
- if (bytes > 0 && !te->writing) {
- grpc_slice_buffer_move_first(&te->write_buffer,
- GPR_MIN(bytes, te->write_buffer.length),
- &te->writing_buffer);
- te->writing = true;
- grpc_endpoint_write(
- exec_ctx, te->wrapped, &te->writing_buffer,
- grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx));
+ if (!te->writing && te->write_buffer.length > 0) {
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ double elapsed = ts2dbl(gpr_time_sub(now, te->last_write));
+ size_t bytes = (size_t)(te->bytes_per_second * elapsed);
+ // gpr_log(GPR_DEBUG, "%lf elapsed --> %" PRIdPTR " bytes", elapsed, bytes);
+ if (bytes > 0) {
+ grpc_slice_buffer_move_first(&te->write_buffer,
+ GPR_MIN(bytes, te->write_buffer.length),
+ &te->writing_buffer);
+ te->writing = true;
+ te->last_write = now;
+ grpc_endpoint_write(
+ exec_ctx, te->wrapped, &te->writing_buffer,
+ grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx));
+ }
}
size_t backlog = te->write_buffer.length;
gpr_mu_unlock(&te->mu);
diff --git a/test/core/util/trickle_endpoint.h b/test/core/util/trickle_endpoint.h
index 5f16818ebb..7e8d9d91e3 100644
--- a/test/core/util/trickle_endpoint.h
+++ b/test/core/util/trickle_endpoint.h
@@ -36,10 +36,11 @@
#include "src/core/lib/iomgr/endpoint.h"
-grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap);
+grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
+ double bytes_per_second);
/* Allow up to \a bytes through the endpoint. Returns the new backlog. */
size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx,
- grpc_endpoint *endpoint, size_t bytes);
+ grpc_endpoint *endpoint);
#endif