diff options
Diffstat (limited to 'test/core/util/trickle_endpoint.c')
-rw-r--r-- | test/core/util/trickle_endpoint.c | 27 |
1 files changed, 26 insertions, 1 deletions
diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c index 58ac59711b..69386a0718 100644 --- a/test/core/util/trickle_endpoint.c +++ b/test/core/util/trickle_endpoint.c @@ -44,6 +44,8 @@ #include <grpc/support/useful.h> #include "src/core/lib/slice/slice_internal.h" +#define WRITE_BUFFER_SIZE (2 * 1024 * 1024) + typedef struct { grpc_endpoint base; double bytes_per_second; @@ -55,6 +57,7 @@ typedef struct { grpc_slice_buffer writing_buffer; grpc_error *error; bool writing; + grpc_closure *write_cb; } trickle_endpoint; static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -63,10 +66,20 @@ static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_endpoint_read(exec_ctx, te->wrapped, slices, cb); } +static void maybe_call_write_cb_locked(grpc_exec_ctx *exec_ctx, + trickle_endpoint *te) { + if (te->write_cb != NULL && (te->error != GRPC_ERROR_NONE || + te->write_buffer.length <= WRITE_BUFFER_SIZE)) { + grpc_closure_sched(exec_ctx, te->write_cb, GRPC_ERROR_REF(te->error)); + te->write_cb = NULL; + } +} + static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_slice_buffer *slices, grpc_closure *cb) { trickle_endpoint *te = (trickle_endpoint *)ep; gpr_mu_lock(&te->mu); + GPR_ASSERT(te->write_cb == NULL); if (te->write_buffer.length == 0) { te->last_write = gpr_now(GPR_CLOCK_MONOTONIC); } @@ -74,7 +87,8 @@ static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_slice_buffer_add(&te->write_buffer, grpc_slice_copy(slices->slices[i])); } - grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_REF(te->error)); + te->write_cb = cb; + maybe_call_write_cb_locked(exec_ctx, te); gpr_mu_unlock(&te->mu); } @@ -102,6 +116,7 @@ static void te_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (te->error == GRPC_ERROR_NONE) { te->error = GRPC_ERROR_REF(why); } + maybe_call_write_cb_locked(exec_ctx, te); gpr_mu_unlock(&te->mu); grpc_endpoint_shutdown(exec_ctx, te->wrapped, why); } @@ -157,6 +172,7 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, te->base.vtable = &vtable; te->wrapped = wrap; te->bytes_per_second = bytes_per_second; + te->write_cb = NULL; gpr_mu_init(&te->mu); grpc_slice_buffer_init(&te->write_buffer); grpc_slice_buffer_init(&te->writing_buffer); @@ -187,9 +203,18 @@ size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx, grpc_endpoint_write( exec_ctx, te->wrapped, &te->writing_buffer, grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx)); + maybe_call_write_cb_locked(exec_ctx, te); } } size_t backlog = te->write_buffer.length; gpr_mu_unlock(&te->mu); return backlog; } + +size_t grpc_trickle_get_backlog(grpc_endpoint *ep) { + trickle_endpoint *te = (trickle_endpoint *)ep; + gpr_mu_lock(&te->mu); + size_t backlog = te->write_buffer.length; + gpr_mu_unlock(&te->mu); + return backlog; +} |