aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/util
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-06 09:05:05 -0800
committerGravatar GitHub <noreply@github.com>2017-12-06 09:05:05 -0800
commitad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch)
tree6a657f8c6179d873b34505cdc24bce9462ca68eb /test/core/util
parenta3df36cc2505a89c2f481eea4a66a87b3002844a (diff)
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'test/core/util')
-rw-r--r--test/core/util/mock_endpoint.cc36
-rw-r--r--test/core/util/mock_endpoint.h3
-rw-r--r--test/core/util/one_corpus_entry_fuzzer.cc8
-rw-r--r--test/core/util/passthru_endpoint.cc45
-rw-r--r--test/core/util/port_server_client.cc130
-rw-r--r--test/core/util/reconnect_server.cc6
-rw-r--r--test/core/util/test_tcp_server.cc36
-rw-r--r--test/core/util/trickle_endpoint.cc56
-rw-r--r--test/core/util/trickle_endpoint.h3
9 files changed, 170 insertions, 153 deletions
diff --git a/test/core/util/mock_endpoint.cc b/test/core/util/mock_endpoint.cc
index 4b35a581b1..d9545efa49 100644
--- a/test/core/util/mock_endpoint.cc
+++ b/test/core/util/mock_endpoint.cc
@@ -40,13 +40,13 @@ typedef struct grpc_mock_endpoint {
grpc_resource_user* resource_user;
} grpc_mock_endpoint;
-static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+static void me_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_slice_buffer* slices, grpc_closure* cb) {
grpc_mock_endpoint* m = (grpc_mock_endpoint*)ep;
gpr_mu_lock(&m->mu);
if (m->read_buffer.count > 0) {
grpc_slice_buffer_swap(&m->read_buffer, slices);
- GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE);
} else {
m->on_read = cb;
m->on_read_out = slices;
@@ -54,41 +54,44 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
gpr_mu_unlock(&m->mu);
}
-static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+static void me_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_slice_buffer* slices, grpc_closure* cb) {
grpc_mock_endpoint* m = (grpc_mock_endpoint*)ep;
for (size_t i = 0; i < slices->count; i++) {
m->on_write(slices->slices[i]);
}
- GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE);
}
-static void me_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
+static void me_add_to_pollset(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_pollset* pollset) {}
-static void me_add_to_pollset_set(grpc_endpoint* ep,
+static void me_add_to_pollset_set(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
grpc_pollset_set* pollset) {}
-static void me_delete_from_pollset_set(grpc_endpoint* ep,
+static void me_delete_from_pollset_set(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* ep,
grpc_pollset_set* pollset) {}
-static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
+static void me_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_error* why) {
grpc_mock_endpoint* m = (grpc_mock_endpoint*)ep;
gpr_mu_lock(&m->mu);
if (m->on_read) {
- GRPC_CLOSURE_SCHED(m->on_read,
+ GRPC_CLOSURE_SCHED(exec_ctx, m->on_read,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Endpoint Shutdown", &why, 1));
m->on_read = nullptr;
}
gpr_mu_unlock(&m->mu);
- grpc_resource_user_shutdown(m->resource_user);
+ grpc_resource_user_shutdown(exec_ctx, m->resource_user);
GRPC_ERROR_UNREF(why);
}
-static void me_destroy(grpc_endpoint* ep) {
+static void me_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
grpc_mock_endpoint* m = (grpc_mock_endpoint*)ep;
grpc_slice_buffer_destroy(&m->read_buffer);
- grpc_resource_user_unref(m->resource_user);
+ grpc_resource_user_unref(exec_ctx, m->resource_user);
gpr_free(m);
}
@@ -131,12 +134,13 @@ grpc_endpoint* grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice),
return &m->base;
}
-void grpc_mock_endpoint_put_read(grpc_endpoint* ep, grpc_slice slice) {
+void grpc_mock_endpoint_put_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_slice slice) {
grpc_mock_endpoint* m = (grpc_mock_endpoint*)ep;
gpr_mu_lock(&m->mu);
if (m->on_read != nullptr) {
grpc_slice_buffer_add(m->on_read_out, slice);
- GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, m->on_read, GRPC_ERROR_NONE);
m->on_read = nullptr;
} else {
grpc_slice_buffer_add(&m->read_buffer, slice);
diff --git a/test/core/util/mock_endpoint.h b/test/core/util/mock_endpoint.h
index 6521d3e8e8..ccabaf7c3b 100644
--- a/test/core/util/mock_endpoint.h
+++ b/test/core/util/mock_endpoint.h
@@ -23,7 +23,8 @@
grpc_endpoint* grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice),
grpc_resource_quota* resource_quota);
-void grpc_mock_endpoint_put_read(grpc_endpoint* mock_endpoint,
+void grpc_mock_endpoint_put_read(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* mock_endpoint,
grpc_slice slice);
#endif
diff --git a/test/core/util/one_corpus_entry_fuzzer.cc b/test/core/util/one_corpus_entry_fuzzer.cc
index c745eb5dc6..c0b67da1e2 100644
--- a/test/core/util/one_corpus_entry_fuzzer.cc
+++ b/test/core/util/one_corpus_entry_fuzzer.cc
@@ -18,10 +18,7 @@
#include <stdbool.h>
-#include <grpc/grpc.h>
-
#include <grpc/support/log.h>
-#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/load_file.h"
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size);
@@ -33,15 +30,10 @@ int main(int argc, char** argv) {
grpc_slice buffer;
squelch = false;
leak_check = false;
- /* TODO(yashkt) Calling grpc_init breaks tests. Fix the tests and replace
- * grpc_core::ExecCtx::GlobalInit with grpc_init and GlobalShutdown with
- * grpc_shutdown */
GPR_ASSERT(
GRPC_LOG_IF_ERROR("load_file", grpc_load_file(argv[1], 0, &buffer)));
LLVMFuzzerTestOneInput(GRPC_SLICE_START_PTR(buffer),
GRPC_SLICE_LENGTH(buffer));
- grpc_core::ExecCtx::GlobalInit();
grpc_slice_unref(buffer);
- grpc_core::ExecCtx::GlobalShutdown();
return 0;
}
diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc
index 5f127cb960..a9efe22b69 100644
--- a/test/core/util/passthru_endpoint.cc
+++ b/test/core/util/passthru_endpoint.cc
@@ -49,22 +49,22 @@ struct passthru_endpoint {
int halves;
grpc_passthru_endpoint_stats* stats;
grpc_passthru_endpoint_stats
- dummy_stats; // used if constructor stats == nullptr
+ dummy_stats; // used if constructor stats == NULL
bool shutdown;
half client;
half server;
};
-static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+static void me_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_slice_buffer* slices, grpc_closure* cb) {
half* m = (half*)ep;
gpr_mu_lock(&m->parent->mu);
if (m->parent->shutdown) {
GRPC_CLOSURE_SCHED(
- cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
+ exec_ctx, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
} else if (m->read_buffer.count > 0) {
grpc_slice_buffer_swap(&m->read_buffer, slices);
- GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE);
} else {
m->on_read = cb;
m->on_read_out = slices;
@@ -77,8 +77,8 @@ static half* other_half(half* h) {
return &h->parent->client;
}
-static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+static void me_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_slice_buffer* slices, grpc_closure* cb) {
half* m = other_half((half*)ep);
gpr_mu_lock(&m->parent->mu);
grpc_error* error = GRPC_ERROR_NONE;
@@ -89,7 +89,7 @@ static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
for (size_t i = 0; i < slices->count; i++) {
grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
}
- GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, m->on_read, GRPC_ERROR_NONE);
m->on_read = nullptr;
} else {
for (size_t i = 0; i < slices->count; i++) {
@@ -98,49 +98,52 @@ static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
}
gpr_mu_unlock(&m->parent->mu);
- GRPC_CLOSURE_SCHED(cb, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
}
-static void me_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
+static void me_add_to_pollset(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_pollset* pollset) {}
-static void me_add_to_pollset_set(grpc_endpoint* ep,
+static void me_add_to_pollset_set(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
grpc_pollset_set* pollset) {}
-static void me_delete_from_pollset_set(grpc_endpoint* ep,
+static void me_delete_from_pollset_set(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* ep,
grpc_pollset_set* pollset) {}
-static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
+static void me_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_error* why) {
half* m = (half*)ep;
gpr_mu_lock(&m->parent->mu);
m->parent->shutdown = true;
if (m->on_read) {
GRPC_CLOSURE_SCHED(
- m->on_read,
+ exec_ctx, m->on_read,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
m->on_read = nullptr;
}
m = other_half(m);
if (m->on_read) {
GRPC_CLOSURE_SCHED(
- m->on_read,
+ exec_ctx, m->on_read,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
m->on_read = nullptr;
}
gpr_mu_unlock(&m->parent->mu);
- grpc_resource_user_shutdown(m->resource_user);
+ grpc_resource_user_shutdown(exec_ctx, m->resource_user);
GRPC_ERROR_UNREF(why);
}
-static void me_destroy(grpc_endpoint* ep) {
+static void me_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
passthru_endpoint* p = ((half*)ep)->parent;
gpr_mu_lock(&p->mu);
if (0 == --p->halves) {
gpr_mu_unlock(&p->mu);
gpr_mu_destroy(&p->mu);
- grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
- grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
- grpc_resource_user_unref(p->client.resource_user);
- grpc_resource_user_unref(p->server.resource_user);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &p->client.read_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &p->server.read_buffer);
+ grpc_resource_user_unref(exec_ctx, p->client.resource_user);
+ grpc_resource_user_unref(exec_ctx, p->server.resource_user);
gpr_free(p);
} else {
gpr_mu_unlock(&p->mu);
diff --git a/test/core/util/port_server_client.cc b/test/core/util/port_server_client.cc
index 7e76c8063f..edec50b424 100644
--- a/test/core/util/port_server_client.cc
+++ b/test/core/util/port_server_client.cc
@@ -40,19 +40,22 @@ typedef struct freereq {
int done;
} freereq;
-static void destroy_pops_and_shutdown(void* p, grpc_error* error) {
+static void destroy_pops_and_shutdown(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_error* error) {
grpc_pollset* pollset = grpc_polling_entity_pollset((grpc_polling_entity*)p);
- grpc_pollset_destroy(pollset);
+ grpc_pollset_destroy(exec_ctx, pollset);
gpr_free(pollset);
}
-static void freed_port_from_server(void* arg, grpc_error* error) {
+static void freed_port_from_server(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
freereq* pr = (freereq*)arg;
gpr_mu_lock(pr->mu);
pr->done = 1;
GRPC_LOG_IF_ERROR(
"pollset_kick",
- grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
+ grpc_pollset_kick(exec_ctx, grpc_polling_entity_pollset(&pr->pops),
+ nullptr));
gpr_mu_unlock(pr->mu);
}
@@ -62,7 +65,7 @@ void grpc_free_port_using_server(int port) {
grpc_httpcli_response rsp;
freereq pr;
char* path;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_closure* shutdown_closure;
grpc_init();
@@ -84,30 +87,30 @@ void grpc_free_port_using_server(int port) {
grpc_httpcli_context_init(&context);
grpc_resource_quota* resource_quota =
grpc_resource_quota_create("port_server_client/free");
- grpc_httpcli_get(&context, &pr.pops, resource_quota, &req,
- grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
+ grpc_httpcli_get(&exec_ctx, &context, &pr.pops, resource_quota, &req,
+ grpc_exec_ctx_now(&exec_ctx) + 30 * GPR_MS_PER_SEC,
GRPC_CLOSURE_CREATE(freed_port_from_server, &pr,
grpc_schedule_on_exec_ctx),
&rsp);
- grpc_resource_quota_unref_internal(resource_quota);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(pr.mu);
while (!pr.done) {
grpc_pollset_worker* worker = nullptr;
if (!GRPC_LOG_IF_ERROR(
"pollset_work",
- grpc_pollset_work(
- grpc_polling_entity_pollset(&pr.pops), &worker,
- grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) {
+ grpc_pollset_work(&exec_ctx, grpc_polling_entity_pollset(&pr.pops),
+ &worker,
+ grpc_exec_ctx_now(&exec_ctx) + GPR_MS_PER_SEC))) {
pr.done = 1;
}
}
gpr_mu_unlock(pr.mu);
- grpc_httpcli_context_destroy(&context);
- grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
+ grpc_httpcli_context_destroy(&exec_ctx, &context);
+ grpc_pollset_shutdown(&exec_ctx, grpc_polling_entity_pollset(&pr.pops),
shutdown_closure);
-
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_free(path);
grpc_http_response_destroy(&rsp);
@@ -124,7 +127,8 @@ typedef struct portreq {
grpc_httpcli_response response;
} portreq;
-static void got_port_from_server(void* arg, grpc_error* error) {
+static void got_port_from_server(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
size_t i;
int port = 0;
portreq* pr = (portreq*)arg;
@@ -150,7 +154,8 @@ static void got_port_from_server(void* arg, grpc_error* error) {
pr->port = 0;
GRPC_LOG_IF_ERROR(
"pollset_kick",
- grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
+ grpc_pollset_kick(exec_ctx, grpc_polling_entity_pollset(&pr->pops),
+ nullptr));
gpr_mu_unlock(pr->mu);
return;
}
@@ -167,12 +172,12 @@ static void got_port_from_server(void* arg, grpc_error* error) {
memset(&pr->response, 0, sizeof(pr->response));
grpc_resource_quota* resource_quota =
grpc_resource_quota_create("port_server_client/pick_retry");
- grpc_httpcli_get(pr->ctx, &pr->pops, resource_quota, &req,
- grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
+ grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pops, resource_quota, &req,
+ grpc_exec_ctx_now(exec_ctx) + 30 * GPR_MS_PER_SEC,
GRPC_CLOSURE_CREATE(got_port_from_server, pr,
grpc_schedule_on_exec_ctx),
&pr->response);
- grpc_resource_quota_unref_internal(resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
return;
}
GPR_ASSERT(response);
@@ -186,7 +191,8 @@ static void got_port_from_server(void* arg, grpc_error* error) {
pr->port = port;
GRPC_LOG_IF_ERROR(
"pollset_kick",
- grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
+ grpc_pollset_kick(exec_ctx, grpc_polling_entity_pollset(&pr->pops),
+ nullptr));
gpr_mu_unlock(pr->mu);
}
@@ -194,55 +200,53 @@ int grpc_pick_port_using_server(void) {
grpc_httpcli_context context;
grpc_httpcli_request req;
portreq pr;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_closure* shutdown_closure;
grpc_init();
- {
- grpc_core::ExecCtx exec_ctx;
- memset(&pr, 0, sizeof(pr));
- memset(&req, 0, sizeof(req));
- grpc_pollset* pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
- grpc_pollset_init(pollset, &pr.mu);
- pr.pops = grpc_polling_entity_create_from_pollset(pollset);
- shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
- grpc_schedule_on_exec_ctx);
- pr.port = -1;
- pr.server = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
- pr.ctx = &context;
-
- req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
- req.http.path = const_cast<char*>("/get");
- grpc_httpcli_context_init(&context);
- grpc_resource_quota* resource_quota =
- grpc_resource_quota_create("port_server_client/pick");
- grpc_httpcli_get(&context, &pr.pops, resource_quota, &req,
- grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC,
- GRPC_CLOSURE_CREATE(got_port_from_server, &pr,
- grpc_schedule_on_exec_ctx),
- &pr.response);
- grpc_resource_quota_unref_internal(resource_quota);
- grpc_core::ExecCtx::Get()->Flush();
- gpr_mu_lock(pr.mu);
- while (pr.port == -1) {
- grpc_pollset_worker* worker = nullptr;
- if (!GRPC_LOG_IF_ERROR(
- "pollset_work",
- grpc_pollset_work(
- grpc_polling_entity_pollset(&pr.pops), &worker,
- grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) {
- pr.port = 0;
- }
- }
- gpr_mu_unlock(pr.mu);
+ memset(&pr, 0, sizeof(pr));
+ memset(&req, 0, sizeof(req));
+ grpc_pollset* pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
+ grpc_pollset_init(pollset, &pr.mu);
+ pr.pops = grpc_polling_entity_create_from_pollset(pollset);
+ shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
+ grpc_schedule_on_exec_ctx);
+ pr.port = -1;
+ pr.server = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
+ pr.ctx = &context;
- grpc_http_response_destroy(&pr.response);
- grpc_httpcli_context_destroy(&context);
- grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
- shutdown_closure);
+ req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
+ req.http.path = const_cast<char*>("/get");
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_httpcli_context_init(&context);
+ grpc_resource_quota* resource_quota =
+ grpc_resource_quota_create("port_server_client/pick");
+ grpc_httpcli_get(
+ &exec_ctx, &context, &pr.pops, resource_quota, &req,
+ grpc_exec_ctx_now(&exec_ctx) + 30 * GPR_MS_PER_SEC,
+ GRPC_CLOSURE_CREATE(got_port_from_server, &pr, grpc_schedule_on_exec_ctx),
+ &pr.response);
+ grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
+ grpc_exec_ctx_flush(&exec_ctx);
+ gpr_mu_lock(pr.mu);
+ while (pr.port == -1) {
+ grpc_pollset_worker* worker = nullptr;
+ if (!GRPC_LOG_IF_ERROR(
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, grpc_polling_entity_pollset(&pr.pops),
+ &worker,
+ grpc_exec_ctx_now(&exec_ctx) + GPR_MS_PER_SEC))) {
+ pr.port = 0;
+ }
}
+ gpr_mu_unlock(pr.mu);
+
+ grpc_http_response_destroy(&pr.response);
+ grpc_httpcli_context_destroy(&exec_ctx, &context);
+ grpc_pollset_shutdown(&exec_ctx, grpc_polling_entity_pollset(&pr.pops),
+ shutdown_closure);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
return pr.port;
diff --git a/test/core/util/reconnect_server.cc b/test/core/util/reconnect_server.cc
index bcafc4e898..4775b074eb 100644
--- a/test/core/util/reconnect_server.cc
+++ b/test/core/util/reconnect_server.cc
@@ -55,7 +55,7 @@ static void pretty_print_backoffs(reconnect_server* server) {
}
}
-static void on_connect(void* arg, grpc_endpoint* tcp,
+static void on_connect(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor) {
gpr_free(acceptor);
@@ -65,9 +65,9 @@ static void on_connect(void* arg, grpc_endpoint* tcp,
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
timestamp_list* new_tail;
peer = grpc_endpoint_get_peer(tcp);
- grpc_endpoint_shutdown(tcp,
+ grpc_endpoint_shutdown(exec_ctx, tcp,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected"));
- grpc_endpoint_destroy(tcp);
+ grpc_endpoint_destroy(exec_ctx, tcp);
if (peer) {
last_colon = strrchr(peer, ':');
if (server->peer == nullptr) {
diff --git a/test/core/util/test_tcp_server.cc b/test/core/util/test_tcp_server.cc
index 5f6af4e707..da34da6fd0 100644
--- a/test/core/util/test_tcp_server.cc
+++ b/test/core/util/test_tcp_server.cc
@@ -33,7 +33,8 @@
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
-static void on_server_destroyed(void* data, grpc_error* error) {
+static void on_server_destroyed(grpc_exec_ctx* exec_ctx, void* data,
+ grpc_error* error) {
test_tcp_server* server = static_cast<test_tcp_server*>(data);
server->shutdown = 1;
}
@@ -55,46 +56,51 @@ void test_tcp_server_start(test_tcp_server* server, int port) {
grpc_resolved_address resolved_addr;
struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr;
int port_added;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
addr->sin_family = AF_INET;
addr->sin_port = htons((uint16_t)port);
memset(&addr->sin_addr, 0, sizeof(addr->sin_addr));
- grpc_error* error = grpc_tcp_server_create(&server->shutdown_complete,
- nullptr, &server->tcp_server);
+ grpc_error* error = grpc_tcp_server_create(
+ &exec_ctx, &server->shutdown_complete, nullptr, &server->tcp_server);
GPR_ASSERT(error == GRPC_ERROR_NONE);
error =
grpc_tcp_server_add_port(server->tcp_server, &resolved_addr, &port_added);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(port_added == port);
- grpc_tcp_server_start(server->tcp_server, &server->pollset, 1,
+ grpc_tcp_server_start(&exec_ctx, server->tcp_server, &server->pollset, 1,
server->on_connect, server->cb_data);
gpr_log(GPR_INFO, "test tcp server listening on 0.0.0.0:%d", port);
+
+ grpc_exec_ctx_finish(&exec_ctx);
}
void test_tcp_server_poll(test_tcp_server* server, int seconds) {
grpc_pollset_worker* worker = nullptr;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_millis deadline = grpc_timespec_to_millis_round_up(
grpc_timeout_seconds_to_deadline(seconds));
gpr_mu_lock(server->mu);
- GRPC_LOG_IF_ERROR("pollset_work",
- grpc_pollset_work(server->pollset, &worker, deadline));
+ GRPC_LOG_IF_ERROR(
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, server->pollset, &worker, deadline));
gpr_mu_unlock(server->mu);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-static void do_nothing(void* arg, grpc_error* error) {}
-static void finish_pollset(void* arg, grpc_error* error) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(arg));
+static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {}
+static void finish_pollset(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(arg));
}
void test_tcp_server_destroy(test_tcp_server* server) {
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_timespec shutdown_deadline;
grpc_closure do_nothing_cb;
- grpc_tcp_server_unref(server->tcp_server);
+ grpc_tcp_server_unref(&exec_ctx, server->tcp_server);
GRPC_CLOSURE_INIT(&do_nothing_cb, do_nothing, nullptr,
grpc_schedule_on_exec_ctx);
shutdown_deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
@@ -103,10 +109,10 @@ void test_tcp_server_destroy(test_tcp_server* server) {
gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), shutdown_deadline) < 0) {
test_tcp_server_poll(server, 1);
}
- grpc_pollset_shutdown(server->pollset,
+ grpc_pollset_shutdown(&exec_ctx, server->pollset,
GRPC_CLOSURE_CREATE(finish_pollset, server->pollset,
grpc_schedule_on_exec_ctx));
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_free(server->pollset);
grpc_shutdown();
}
diff --git a/test/core/util/trickle_endpoint.cc b/test/core/util/trickle_endpoint.cc
index f95ed62463..4544fb7f49 100644
--- a/test/core/util/trickle_endpoint.cc
+++ b/test/core/util/trickle_endpoint.cc
@@ -45,23 +45,24 @@ typedef struct {
grpc_closure* write_cb;
} trickle_endpoint;
-static void te_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+static void te_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_slice_buffer* slices, grpc_closure* cb) {
trickle_endpoint* te = (trickle_endpoint*)ep;
- grpc_endpoint_read(te->wrapped, slices, cb);
+ grpc_endpoint_read(exec_ctx, te->wrapped, slices, cb);
}
-static void maybe_call_write_cb_locked(trickle_endpoint* te) {
+static void maybe_call_write_cb_locked(grpc_exec_ctx* exec_ctx,
+ trickle_endpoint* te) {
if (te->write_cb != nullptr &&
(te->error != GRPC_ERROR_NONE ||
te->write_buffer.length <= WRITE_BUFFER_SIZE)) {
- GRPC_CLOSURE_SCHED(te->write_cb, GRPC_ERROR_REF(te->error));
+ GRPC_CLOSURE_SCHED(exec_ctx, te->write_cb, GRPC_ERROR_REF(te->error));
te->write_cb = nullptr;
}
}
-static void te_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+static void te_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_slice_buffer* slices, grpc_closure* cb) {
trickle_endpoint* te = (trickle_endpoint*)ep;
gpr_mu_lock(&te->mu);
GPR_ASSERT(te->write_cb == nullptr);
@@ -73,44 +74,47 @@ static void te_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_slice_copy(slices->slices[i]));
}
te->write_cb = cb;
- maybe_call_write_cb_locked(te);
+ maybe_call_write_cb_locked(exec_ctx, te);
gpr_mu_unlock(&te->mu);
}
-static void te_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
+static void te_add_to_pollset(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_pollset* pollset) {
trickle_endpoint* te = (trickle_endpoint*)ep;
- grpc_endpoint_add_to_pollset(te->wrapped, pollset);
+ grpc_endpoint_add_to_pollset(exec_ctx, te->wrapped, pollset);
}
-static void te_add_to_pollset_set(grpc_endpoint* ep,
+static void te_add_to_pollset_set(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
grpc_pollset_set* pollset_set) {
trickle_endpoint* te = (trickle_endpoint*)ep;
- grpc_endpoint_add_to_pollset_set(te->wrapped, pollset_set);
+ grpc_endpoint_add_to_pollset_set(exec_ctx, te->wrapped, pollset_set);
}
-static void te_delete_from_pollset_set(grpc_endpoint* ep,
+static void te_delete_from_pollset_set(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* ep,
grpc_pollset_set* pollset_set) {
trickle_endpoint* te = (trickle_endpoint*)ep;
- grpc_endpoint_delete_from_pollset_set(te->wrapped, pollset_set);
+ grpc_endpoint_delete_from_pollset_set(exec_ctx, te->wrapped, pollset_set);
}
-static void te_shutdown(grpc_endpoint* ep, grpc_error* why) {
+static void te_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_error* why) {
trickle_endpoint* te = (trickle_endpoint*)ep;
gpr_mu_lock(&te->mu);
if (te->error == GRPC_ERROR_NONE) {
te->error = GRPC_ERROR_REF(why);
}
- maybe_call_write_cb_locked(te);
+ maybe_call_write_cb_locked(exec_ctx, te);
gpr_mu_unlock(&te->mu);
- grpc_endpoint_shutdown(te->wrapped, why);
+ grpc_endpoint_shutdown(exec_ctx, te->wrapped, why);
}
-static void te_destroy(grpc_endpoint* ep) {
+static void te_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
trickle_endpoint* te = (trickle_endpoint*)ep;
- grpc_endpoint_destroy(te->wrapped);
+ grpc_endpoint_destroy(exec_ctx, te->wrapped);
gpr_mu_destroy(&te->mu);
- grpc_slice_buffer_destroy_internal(&te->write_buffer);
- grpc_slice_buffer_destroy_internal(&te->writing_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &te->write_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &te->writing_buffer);
GRPC_ERROR_UNREF(te->error);
gpr_free(te);
}
@@ -130,7 +134,8 @@ static int te_get_fd(grpc_endpoint* ep) {
return grpc_endpoint_get_fd(te->wrapped);
}
-static void te_finish_write(void* arg, grpc_error* error) {
+static void te_finish_write(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
trickle_endpoint* te = (trickle_endpoint*)arg;
gpr_mu_lock(&te->mu);
te->writing = false;
@@ -168,7 +173,8 @@ static double ts2dbl(gpr_timespec s) {
return (double)s.tv_sec + 1e-9 * (double)s.tv_nsec;
}
-size_t grpc_trickle_endpoint_trickle(grpc_endpoint* ep) {
+size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* ep) {
trickle_endpoint* te = (trickle_endpoint*)ep;
gpr_mu_lock(&te->mu);
if (!te->writing && te->write_buffer.length > 0) {
@@ -183,9 +189,9 @@ size_t grpc_trickle_endpoint_trickle(grpc_endpoint* ep) {
te->writing = true;
te->last_write = now;
grpc_endpoint_write(
- te->wrapped, &te->writing_buffer,
+ exec_ctx, te->wrapped, &te->writing_buffer,
GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx));
- maybe_call_write_cb_locked(te);
+ maybe_call_write_cb_locked(exec_ctx, te);
}
}
size_t backlog = te->write_buffer.length;
diff --git a/test/core/util/trickle_endpoint.h b/test/core/util/trickle_endpoint.h
index cd07de905a..11c113bda8 100644
--- a/test/core/util/trickle_endpoint.h
+++ b/test/core/util/trickle_endpoint.h
@@ -25,7 +25,8 @@ grpc_endpoint* grpc_trickle_endpoint_create(grpc_endpoint* wrap,
double bytes_per_second);
/* Allow up to \a bytes through the endpoint. Returns the new backlog. */
-size_t grpc_trickle_endpoint_trickle(grpc_endpoint* endpoint);
+size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* endpoint);
size_t grpc_trickle_get_backlog(grpc_endpoint* endpoint);