aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/util/trickle_endpoint.c
diff options
context:
space:
mode:
Diffstat (limited to 'test/core/util/trickle_endpoint.c')
-rw-r--r--test/core/util/trickle_endpoint.c27
1 files changed, 26 insertions, 1 deletions
diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c
index 58ac59711b..69386a0718 100644
--- a/test/core/util/trickle_endpoint.c
+++ b/test/core/util/trickle_endpoint.c
@@ -44,6 +44,8 @@
#include <grpc/support/useful.h>
#include "src/core/lib/slice/slice_internal.h"
+#define WRITE_BUFFER_SIZE (2 * 1024 * 1024)
+
typedef struct {
grpc_endpoint base;
double bytes_per_second;
@@ -55,6 +57,7 @@ typedef struct {
grpc_slice_buffer writing_buffer;
grpc_error *error;
bool writing;
+ grpc_closure *write_cb;
} trickle_endpoint;
static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@@ -63,10 +66,20 @@ static void te_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_endpoint_read(exec_ctx, te->wrapped, slices, cb);
}
+static void maybe_call_write_cb_locked(grpc_exec_ctx *exec_ctx,
+ trickle_endpoint *te) {
+ if (te->write_cb != NULL && (te->error != GRPC_ERROR_NONE ||
+ te->write_buffer.length <= WRITE_BUFFER_SIZE)) {
+ grpc_closure_sched(exec_ctx, te->write_cb, GRPC_ERROR_REF(te->error));
+ te->write_cb = NULL;
+ }
+}
+
static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_slice_buffer *slices, grpc_closure *cb) {
trickle_endpoint *te = (trickle_endpoint *)ep;
gpr_mu_lock(&te->mu);
+ GPR_ASSERT(te->write_cb == NULL);
if (te->write_buffer.length == 0) {
te->last_write = gpr_now(GPR_CLOCK_MONOTONIC);
}
@@ -74,7 +87,8 @@ static void te_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_slice_buffer_add(&te->write_buffer,
grpc_slice_copy(slices->slices[i]));
}
- grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_REF(te->error));
+ te->write_cb = cb;
+ maybe_call_write_cb_locked(exec_ctx, te);
gpr_mu_unlock(&te->mu);
}
@@ -102,6 +116,7 @@ static void te_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (te->error == GRPC_ERROR_NONE) {
te->error = GRPC_ERROR_REF(why);
}
+ maybe_call_write_cb_locked(exec_ctx, te);
gpr_mu_unlock(&te->mu);
grpc_endpoint_shutdown(exec_ctx, te->wrapped, why);
}
@@ -157,6 +172,7 @@ grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
te->base.vtable = &vtable;
te->wrapped = wrap;
te->bytes_per_second = bytes_per_second;
+ te->write_cb = NULL;
gpr_mu_init(&te->mu);
grpc_slice_buffer_init(&te->write_buffer);
grpc_slice_buffer_init(&te->writing_buffer);
@@ -187,9 +203,18 @@ size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx *exec_ctx,
grpc_endpoint_write(
exec_ctx, te->wrapped, &te->writing_buffer,
grpc_closure_create(te_finish_write, te, grpc_schedule_on_exec_ctx));
+ maybe_call_write_cb_locked(exec_ctx, te);
}
}
size_t backlog = te->write_buffer.length;
gpr_mu_unlock(&te->mu);
return backlog;
}
+
+size_t grpc_trickle_get_backlog(grpc_endpoint *ep) {
+ trickle_endpoint *te = (trickle_endpoint *)ep;
+ gpr_mu_lock(&te->mu);
+ size_t backlog = te->write_buffer.length;
+ gpr_mu_unlock(&te->mu);
+ return backlog;
+}