diff options
Diffstat (limited to 'test/core')
-rw-r--r-- | test/core/util/trickle_endpoint.c | 40 | ||||
-rw-r--r-- | test/core/util/trickle_endpoint.h | 5 |
2 files changed, 31 insertions, 14 deletions
diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c index 8d661e04c6..5b6c666950 100644 --- a/test/core/util/trickle_endpoint.c +++ b/test/core/util/trickle_endpoint.c @@ -47,7 +47,9 @@ typedef struct { grpc_endpoint base; + double bytes_per_second; grpc_endpoint *wrapped; + gpr_timespec last_write; gpr_mu mu; grpc_slice_buffer write_buffer; @@ -68,8 +70,11 @@ static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, for (size_t i = 0; i < slices->count; i++) { grpc_slice_ref_internal(slices->slices[i]); } - grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count); gpr_mu_lock(&te->mu); + if (te->write_buffer.length == 0) { + te->last_write = gpr_now(GPR_CLOCK_MONOTONIC); + } + grpc_slice_buffer_addn(&te->write_buffer, slices->slices, slices->count); grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_REF(te->error)); gpr_mu_unlock(&te->mu); } @@ -147,10 +152,12 @@ static const grpc_endpoint_vtable vtable = {te_read, te_get_peer, te_get_fd}; -grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap) { +grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, + double bytes_per_second) { trickle_endpoint *te = gpr_malloc(sizeof(*te)); te->base.vtable = &vtable; te->wrapped = wrap; + te->bytes_per_second = bytes_per_second; gpr_mu_init(&te->mu); grpc_slice_buffer_init(&te->write_buffer); grpc_slice_buffer_init(&te->writing_buffer); @@ -159,18 +166,27 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap) { return &te->base; } -size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - size_t bytes) { +static double ts2dbl(gpr_timespec s) { return s.tv_sec + 1e-9 * s.tv_nsec; } + +size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep) { trickle_endpoint *te = (trickle_endpoint *)ep; gpr_mu_lock(&te->mu); - if (bytes > 0 && !te->writing) { - grpc_slice_buffer_move_first(&te->write_buffer, - GPR_MIN(bytes, te->write_buffer.length), - &te->writing_buffer); - te->writing = true; - grpc_endpoint_write( - exec_ctx, te->wrapped, &te->writing_buffer, - grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx)); + if (!te->writing && te->write_buffer.length > 0) { + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + double elapsed = ts2dbl(gpr_time_sub(now, te->last_write)); + size_t bytes = (size_t)(te->bytes_per_second * elapsed); + // gpr_log(GPR_DEBUG, "%lf elapsed --> %" PRIdPTR " bytes", elapsed, bytes); + if (bytes > 0) { + grpc_slice_buffer_move_first(&te->write_buffer, + GPR_MIN(bytes, te->write_buffer.length), + &te->writing_buffer); + te->writing = true; + te->last_write = now; + grpc_endpoint_write( + exec_ctx, te->wrapped, &te->writing_buffer, + grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx)); + } } size_t backlog = te->write_buffer.length; gpr_mu_unlock(&te->mu); diff --git a/test/core/util/trickle_endpoint.h b/test/core/util/trickle_endpoint.h index 5f16818ebb..7e8d9d91e3 100644 --- a/test/core/util/trickle_endpoint.h +++ b/test/core/util/trickle_endpoint.h @@ -36,10 +36,11 @@ #include "src/core/lib/iomgr/endpoint.h" -grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap); +grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, + double bytes_per_second); /* Allow up to \a bytes through the endpoint. Returns the new backlog. */ size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, - grpc_endpoint *endpoint, size_t bytes); + grpc_endpoint *endpoint); #endif |