diff options
Diffstat (limited to 'test/core/util')
-rw-r--r-- | test/core/util/mock_endpoint.cc | 36 | ||||
-rw-r--r-- | test/core/util/mock_endpoint.h | 3 | ||||
-rw-r--r-- | test/core/util/one_corpus_entry_fuzzer.cc | 8 | ||||
-rw-r--r-- | test/core/util/passthru_endpoint.cc | 45 | ||||
-rw-r--r-- | test/core/util/port_server_client.cc | 130 | ||||
-rw-r--r-- | test/core/util/reconnect_server.cc | 6 | ||||
-rw-r--r-- | test/core/util/test_tcp_server.cc | 36 | ||||
-rw-r--r-- | test/core/util/trickle_endpoint.cc | 56 | ||||
-rw-r--r-- | test/core/util/trickle_endpoint.h | 3 |
9 files changed, 153 insertions, 170 deletions
diff --git a/test/core/util/mock_endpoint.cc b/test/core/util/mock_endpoint.cc index d9545efa49..4b35a581b1 100644 --- a/test/core/util/mock_endpoint.cc +++ b/test/core/util/mock_endpoint.cc @@ -40,13 +40,13 @@ typedef struct grpc_mock_endpoint { grpc_resource_user* resource_user; } grpc_mock_endpoint; -static void me_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_slice_buffer* slices, grpc_closure* cb) { +static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { grpc_mock_endpoint* m = (grpc_mock_endpoint*)ep; gpr_mu_lock(&m->mu); if (m->read_buffer.count > 0) { grpc_slice_buffer_swap(&m->read_buffer, slices); - GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE); } else { m->on_read = cb; m->on_read_out = slices; @@ -54,44 +54,41 @@ static void me_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, gpr_mu_unlock(&m->mu); } -static void me_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_slice_buffer* slices, grpc_closure* cb) { +static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { grpc_mock_endpoint* m = (grpc_mock_endpoint*)ep; for (size_t i = 0; i < slices->count; i++) { m->on_write(slices->slices[i]); } - GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE); } -static void me_add_to_pollset(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_pollset* pollset) {} +static void me_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {} -static void me_add_to_pollset_set(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, +static void me_add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset) {} -static void me_delete_from_pollset_set(grpc_exec_ctx* exec_ctx, - grpc_endpoint* ep, +static void me_delete_from_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset) {} -static void me_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_error* why) { +static void me_shutdown(grpc_endpoint* ep, grpc_error* why) { grpc_mock_endpoint* m = (grpc_mock_endpoint*)ep; gpr_mu_lock(&m->mu); if (m->on_read) { - GRPC_CLOSURE_SCHED(exec_ctx, m->on_read, + GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Endpoint Shutdown", &why, 1)); m->on_read = nullptr; } gpr_mu_unlock(&m->mu); - grpc_resource_user_shutdown(exec_ctx, m->resource_user); + grpc_resource_user_shutdown(m->resource_user); GRPC_ERROR_UNREF(why); } -static void me_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) { +static void me_destroy(grpc_endpoint* ep) { grpc_mock_endpoint* m = (grpc_mock_endpoint*)ep; grpc_slice_buffer_destroy(&m->read_buffer); - grpc_resource_user_unref(exec_ctx, m->resource_user); + grpc_resource_user_unref(m->resource_user); gpr_free(m); } @@ -134,13 +131,12 @@ grpc_endpoint* grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice), return &m->base; } -void grpc_mock_endpoint_put_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_slice slice) { +void grpc_mock_endpoint_put_read(grpc_endpoint* ep, grpc_slice slice) { grpc_mock_endpoint* m = (grpc_mock_endpoint*)ep; gpr_mu_lock(&m->mu); if (m->on_read != nullptr) { grpc_slice_buffer_add(m->on_read_out, slice); - GRPC_CLOSURE_SCHED(exec_ctx, m->on_read, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_NONE); m->on_read = nullptr; } else { grpc_slice_buffer_add(&m->read_buffer, slice); diff --git a/test/core/util/mock_endpoint.h b/test/core/util/mock_endpoint.h index ccabaf7c3b..6521d3e8e8 100644 --- a/test/core/util/mock_endpoint.h +++ b/test/core/util/mock_endpoint.h @@ -23,8 +23,7 @@ grpc_endpoint* grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice), grpc_resource_quota* resource_quota); -void grpc_mock_endpoint_put_read(grpc_exec_ctx* exec_ctx, - grpc_endpoint* mock_endpoint, +void grpc_mock_endpoint_put_read(grpc_endpoint* mock_endpoint, grpc_slice slice); #endif diff --git a/test/core/util/one_corpus_entry_fuzzer.cc b/test/core/util/one_corpus_entry_fuzzer.cc index c0b67da1e2..c745eb5dc6 100644 --- a/test/core/util/one_corpus_entry_fuzzer.cc +++ b/test/core/util/one_corpus_entry_fuzzer.cc @@ -18,7 +18,10 @@ #include <stdbool.h> +#include <grpc/grpc.h> + #include <grpc/support/log.h> +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/load_file.h" extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size); @@ -30,10 +33,15 @@ int main(int argc, char** argv) { grpc_slice buffer; squelch = false; leak_check = false; + /* TODO(yashkt) Calling grpc_init breaks tests. Fix the tests and replace + * grpc_core::ExecCtx::GlobalInit with grpc_init and GlobalShutdown with + * grpc_shutdown */ GPR_ASSERT( GRPC_LOG_IF_ERROR("load_file", grpc_load_file(argv[1], 0, &buffer))); LLVMFuzzerTestOneInput(GRPC_SLICE_START_PTR(buffer), GRPC_SLICE_LENGTH(buffer)); + grpc_core::ExecCtx::GlobalInit(); grpc_slice_unref(buffer); + grpc_core::ExecCtx::GlobalShutdown(); return 0; } diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc index a9efe22b69..5f127cb960 100644 --- a/test/core/util/passthru_endpoint.cc +++ b/test/core/util/passthru_endpoint.cc @@ -49,22 +49,22 @@ struct passthru_endpoint { int halves; grpc_passthru_endpoint_stats* stats; grpc_passthru_endpoint_stats - dummy_stats; // used if constructor stats == NULL + dummy_stats; // used if constructor stats == nullptr bool shutdown; half client; half server; }; -static void me_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_slice_buffer* slices, grpc_closure* cb) { +static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { half* m = (half*)ep; gpr_mu_lock(&m->parent->mu); if (m->parent->shutdown) { GRPC_CLOSURE_SCHED( - exec_ctx, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown")); + cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown")); } else if (m->read_buffer.count > 0) { grpc_slice_buffer_swap(&m->read_buffer, slices); - GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE); } else { m->on_read = cb; m->on_read_out = slices; @@ -77,8 +77,8 @@ static half* other_half(half* h) { return &h->parent->client; } -static void me_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_slice_buffer* slices, grpc_closure* cb) { +static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { half* m = other_half((half*)ep); gpr_mu_lock(&m->parent->mu); grpc_error* error = GRPC_ERROR_NONE; @@ -89,7 +89,7 @@ static void me_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, for (size_t i = 0; i < slices->count; i++) { grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i])); } - GRPC_CLOSURE_SCHED(exec_ctx, m->on_read, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_NONE); m->on_read = nullptr; } else { for (size_t i = 0; i < slices->count; i++) { @@ -98,52 +98,49 @@ static void me_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, } } gpr_mu_unlock(&m->parent->mu); - GRPC_CLOSURE_SCHED(exec_ctx, cb, error); + GRPC_CLOSURE_SCHED(cb, error); } -static void me_add_to_pollset(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_pollset* pollset) {} +static void me_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {} -static void me_add_to_pollset_set(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, +static void me_add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset) {} -static void me_delete_from_pollset_set(grpc_exec_ctx* exec_ctx, - grpc_endpoint* ep, +static void me_delete_from_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset) {} -static void me_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_error* why) { +static void me_shutdown(grpc_endpoint* ep, grpc_error* why) { half* m = (half*)ep; gpr_mu_lock(&m->parent->mu); m->parent->shutdown = true; if (m->on_read) { GRPC_CLOSURE_SCHED( - exec_ctx, m->on_read, + m->on_read, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1)); m->on_read = nullptr; } m = other_half(m); if (m->on_read) { GRPC_CLOSURE_SCHED( - exec_ctx, m->on_read, + m->on_read, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1)); m->on_read = nullptr; } gpr_mu_unlock(&m->parent->mu); - grpc_resource_user_shutdown(exec_ctx, m->resource_user); + grpc_resource_user_shutdown(m->resource_user); GRPC_ERROR_UNREF(why); } -static void me_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) { +static void me_destroy(grpc_endpoint* ep) { passthru_endpoint* p = ((half*)ep)->parent; gpr_mu_lock(&p->mu); if (0 == --p->halves) { gpr_mu_unlock(&p->mu); gpr_mu_destroy(&p->mu); - grpc_slice_buffer_destroy_internal(exec_ctx, &p->client.read_buffer); - grpc_slice_buffer_destroy_internal(exec_ctx, &p->server.read_buffer); - grpc_resource_user_unref(exec_ctx, p->client.resource_user); - grpc_resource_user_unref(exec_ctx, p->server.resource_user); + grpc_slice_buffer_destroy_internal(&p->client.read_buffer); + grpc_slice_buffer_destroy_internal(&p->server.read_buffer); + grpc_resource_user_unref(p->client.resource_user); + grpc_resource_user_unref(p->server.resource_user); gpr_free(p); } else { gpr_mu_unlock(&p->mu); diff --git a/test/core/util/port_server_client.cc b/test/core/util/port_server_client.cc index edec50b424..7e76c8063f 100644 --- a/test/core/util/port_server_client.cc +++ b/test/core/util/port_server_client.cc @@ -40,22 +40,19 @@ typedef struct freereq { int done; } freereq; -static void destroy_pops_and_shutdown(grpc_exec_ctx* exec_ctx, void* p, - grpc_error* error) { +static void destroy_pops_and_shutdown(void* p, grpc_error* error) { grpc_pollset* pollset = grpc_polling_entity_pollset((grpc_polling_entity*)p); - grpc_pollset_destroy(exec_ctx, pollset); + grpc_pollset_destroy(pollset); gpr_free(pollset); } -static void freed_port_from_server(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void freed_port_from_server(void* arg, grpc_error* error) { freereq* pr = (freereq*)arg; gpr_mu_lock(pr->mu); pr->done = 1; GRPC_LOG_IF_ERROR( "pollset_kick", - grpc_pollset_kick(exec_ctx, grpc_polling_entity_pollset(&pr->pops), - nullptr)); + grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr)); gpr_mu_unlock(pr->mu); } @@ -65,7 +62,7 @@ void grpc_free_port_using_server(int port) { grpc_httpcli_response rsp; freereq pr; char* path; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx exec_ctx; grpc_closure* shutdown_closure; grpc_init(); @@ -87,30 +84,30 @@ void grpc_free_port_using_server(int port) { grpc_httpcli_context_init(&context); grpc_resource_quota* resource_quota = grpc_resource_quota_create("port_server_client/free"); - grpc_httpcli_get(&exec_ctx, &context, &pr.pops, resource_quota, &req, - grpc_exec_ctx_now(&exec_ctx) + 30 * GPR_MS_PER_SEC, + grpc_httpcli_get(&context, &pr.pops, resource_quota, &req, + grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC, GRPC_CLOSURE_CREATE(freed_port_from_server, &pr, grpc_schedule_on_exec_ctx), &rsp); - grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); - grpc_exec_ctx_flush(&exec_ctx); + grpc_resource_quota_unref_internal(resource_quota); + grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(pr.mu); while (!pr.done) { grpc_pollset_worker* worker = nullptr; if (!GRPC_LOG_IF_ERROR( "pollset_work", - grpc_pollset_work(&exec_ctx, grpc_polling_entity_pollset(&pr.pops), - &worker, - grpc_exec_ctx_now(&exec_ctx) + GPR_MS_PER_SEC))) { + grpc_pollset_work( + grpc_polling_entity_pollset(&pr.pops), &worker, + grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) { pr.done = 1; } } gpr_mu_unlock(pr.mu); - grpc_httpcli_context_destroy(&exec_ctx, &context); - grpc_pollset_shutdown(&exec_ctx, grpc_polling_entity_pollset(&pr.pops), + grpc_httpcli_context_destroy(&context); + grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops), shutdown_closure); - grpc_exec_ctx_finish(&exec_ctx); + gpr_free(path); grpc_http_response_destroy(&rsp); @@ -127,8 +124,7 @@ typedef struct portreq { grpc_httpcli_response response; } portreq; -static void got_port_from_server(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void got_port_from_server(void* arg, grpc_error* error) { size_t i; int port = 0; portreq* pr = (portreq*)arg; @@ -154,8 +150,7 @@ static void got_port_from_server(grpc_exec_ctx* exec_ctx, void* arg, pr->port = 0; GRPC_LOG_IF_ERROR( "pollset_kick", - grpc_pollset_kick(exec_ctx, grpc_polling_entity_pollset(&pr->pops), - nullptr)); + grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr)); gpr_mu_unlock(pr->mu); return; } @@ -172,12 +167,12 @@ static void got_port_from_server(grpc_exec_ctx* exec_ctx, void* arg, memset(&pr->response, 0, sizeof(pr->response)); grpc_resource_quota* resource_quota = grpc_resource_quota_create("port_server_client/pick_retry"); - grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pops, resource_quota, &req, - grpc_exec_ctx_now(exec_ctx) + 30 * GPR_MS_PER_SEC, + grpc_httpcli_get(pr->ctx, &pr->pops, resource_quota, &req, + grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC, GRPC_CLOSURE_CREATE(got_port_from_server, pr, grpc_schedule_on_exec_ctx), &pr->response); - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); + grpc_resource_quota_unref_internal(resource_quota); return; } GPR_ASSERT(response); @@ -191,8 +186,7 @@ static void got_port_from_server(grpc_exec_ctx* exec_ctx, void* arg, pr->port = port; GRPC_LOG_IF_ERROR( "pollset_kick", - grpc_pollset_kick(exec_ctx, grpc_polling_entity_pollset(&pr->pops), - nullptr)); + grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr)); gpr_mu_unlock(pr->mu); } @@ -200,53 +194,55 @@ int grpc_pick_port_using_server(void) { grpc_httpcli_context context; grpc_httpcli_request req; portreq pr; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_closure* shutdown_closure; grpc_init(); + { + grpc_core::ExecCtx exec_ctx; + memset(&pr, 0, sizeof(pr)); + memset(&req, 0, sizeof(req)); + grpc_pollset* pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size()); + grpc_pollset_init(pollset, &pr.mu); + pr.pops = grpc_polling_entity_create_from_pollset(pollset); + shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops, + grpc_schedule_on_exec_ctx); + pr.port = -1; + pr.server = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS); + pr.ctx = &context; + + req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS); + req.http.path = const_cast<char*>("/get"); - memset(&pr, 0, sizeof(pr)); - memset(&req, 0, sizeof(req)); - grpc_pollset* pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size()); - grpc_pollset_init(pollset, &pr.mu); - pr.pops = grpc_polling_entity_create_from_pollset(pollset); - shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops, - grpc_schedule_on_exec_ctx); - pr.port = -1; - pr.server = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS); - pr.ctx = &context; + grpc_httpcli_context_init(&context); + grpc_resource_quota* resource_quota = + grpc_resource_quota_create("port_server_client/pick"); + grpc_httpcli_get(&context, &pr.pops, resource_quota, &req, + grpc_core::ExecCtx::Get()->Now() + 30 * GPR_MS_PER_SEC, + GRPC_CLOSURE_CREATE(got_port_from_server, &pr, + grpc_schedule_on_exec_ctx), + &pr.response); + grpc_resource_quota_unref_internal(resource_quota); + grpc_core::ExecCtx::Get()->Flush(); + gpr_mu_lock(pr.mu); + while (pr.port == -1) { + grpc_pollset_worker* worker = nullptr; + if (!GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work( + grpc_polling_entity_pollset(&pr.pops), &worker, + grpc_core::ExecCtx::Get()->Now() + GPR_MS_PER_SEC))) { + pr.port = 0; + } + } + gpr_mu_unlock(pr.mu); - req.host = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS); - req.http.path = const_cast<char*>("/get"); + grpc_http_response_destroy(&pr.response); + grpc_httpcli_context_destroy(&context); + grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops), + shutdown_closure); - grpc_httpcli_context_init(&context); - grpc_resource_quota* resource_quota = - grpc_resource_quota_create("port_server_client/pick"); - grpc_httpcli_get( - &exec_ctx, &context, &pr.pops, resource_quota, &req, - grpc_exec_ctx_now(&exec_ctx) + 30 * GPR_MS_PER_SEC, - GRPC_CLOSURE_CREATE(got_port_from_server, &pr, grpc_schedule_on_exec_ctx), - &pr.response); - grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); - grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(pr.mu); - while (pr.port == -1) { - grpc_pollset_worker* worker = nullptr; - if (!GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(&exec_ctx, grpc_polling_entity_pollset(&pr.pops), - &worker, - grpc_exec_ctx_now(&exec_ctx) + GPR_MS_PER_SEC))) { - pr.port = 0; - } + grpc_core::ExecCtx::Get()->Flush(); } - gpr_mu_unlock(pr.mu); - - grpc_http_response_destroy(&pr.response); - grpc_httpcli_context_destroy(&exec_ctx, &context); - grpc_pollset_shutdown(&exec_ctx, grpc_polling_entity_pollset(&pr.pops), - shutdown_closure); - grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); return pr.port; diff --git a/test/core/util/reconnect_server.cc b/test/core/util/reconnect_server.cc index 4775b074eb..bcafc4e898 100644 --- a/test/core/util/reconnect_server.cc +++ b/test/core/util/reconnect_server.cc @@ -55,7 +55,7 @@ static void pretty_print_backoffs(reconnect_server* server) { } } -static void on_connect(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp, +static void on_connect(void* arg, grpc_endpoint* tcp, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor) { gpr_free(acceptor); @@ -65,9 +65,9 @@ static void on_connect(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp, gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); timestamp_list* new_tail; peer = grpc_endpoint_get_peer(tcp); - grpc_endpoint_shutdown(exec_ctx, tcp, + grpc_endpoint_shutdown(tcp, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected")); - grpc_endpoint_destroy(exec_ctx, tcp); + grpc_endpoint_destroy(tcp); if (peer) { last_colon = strrchr(peer, ':'); if (server->peer == nullptr) { diff --git a/test/core/util/test_tcp_server.cc b/test/core/util/test_tcp_server.cc index da34da6fd0..5f6af4e707 100644 --- a/test/core/util/test_tcp_server.cc +++ b/test/core/util/test_tcp_server.cc @@ -33,8 +33,7 @@ #include "test/core/util/port.h" #include "test/core/util/test_config.h" -static void on_server_destroyed(grpc_exec_ctx* exec_ctx, void* data, - grpc_error* error) { +static void on_server_destroyed(void* data, grpc_error* error) { test_tcp_server* server = static_cast<test_tcp_server*>(data); server->shutdown = 1; } @@ -56,51 +55,46 @@ void test_tcp_server_start(test_tcp_server* server, int port) { grpc_resolved_address resolved_addr; struct sockaddr_in* addr = (struct sockaddr_in*)resolved_addr.addr; int port_added; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx exec_ctx; addr->sin_family = AF_INET; addr->sin_port = htons((uint16_t)port); memset(&addr->sin_addr, 0, sizeof(addr->sin_addr)); - grpc_error* error = grpc_tcp_server_create( - &exec_ctx, &server->shutdown_complete, nullptr, &server->tcp_server); + grpc_error* error = grpc_tcp_server_create(&server->shutdown_complete, + nullptr, &server->tcp_server); GPR_ASSERT(error == GRPC_ERROR_NONE); error = grpc_tcp_server_add_port(server->tcp_server, &resolved_addr, &port_added); GPR_ASSERT(error == GRPC_ERROR_NONE); GPR_ASSERT(port_added == port); - grpc_tcp_server_start(&exec_ctx, server->tcp_server, &server->pollset, 1, + grpc_tcp_server_start(server->tcp_server, &server->pollset, 1, server->on_connect, server->cb_data); gpr_log(GPR_INFO, "test tcp server listening on 0.0.0.0:%d", port); - - grpc_exec_ctx_finish(&exec_ctx); } void test_tcp_server_poll(test_tcp_server* server, int seconds) { grpc_pollset_worker* worker = nullptr; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx exec_ctx; grpc_millis deadline = grpc_timespec_to_millis_round_up( grpc_timeout_seconds_to_deadline(seconds)); gpr_mu_lock(server->mu); - GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(&exec_ctx, server->pollset, &worker, deadline)); + GRPC_LOG_IF_ERROR("pollset_work", + grpc_pollset_work(server->pollset, &worker, deadline)); gpr_mu_unlock(server->mu); - grpc_exec_ctx_finish(&exec_ctx); } -static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {} -static void finish_pollset(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { - grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(arg)); +static void do_nothing(void* arg, grpc_error* error) {} +static void finish_pollset(void* arg, grpc_error* error) { + grpc_pollset_destroy(static_cast<grpc_pollset*>(arg)); } void test_tcp_server_destroy(test_tcp_server* server) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_core::ExecCtx exec_ctx; gpr_timespec shutdown_deadline; grpc_closure do_nothing_cb; - grpc_tcp_server_unref(&exec_ctx, server->tcp_server); + grpc_tcp_server_unref(server->tcp_server); GRPC_CLOSURE_INIT(&do_nothing_cb, do_nothing, nullptr, grpc_schedule_on_exec_ctx); shutdown_deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), @@ -109,10 +103,10 @@ void test_tcp_server_destroy(test_tcp_server* server) { gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), shutdown_deadline) < 0) { test_tcp_server_poll(server, 1); } - grpc_pollset_shutdown(&exec_ctx, server->pollset, + grpc_pollset_shutdown(server->pollset, GRPC_CLOSURE_CREATE(finish_pollset, server->pollset, grpc_schedule_on_exec_ctx)); - grpc_exec_ctx_finish(&exec_ctx); + grpc_core::ExecCtx::Get()->Flush(); gpr_free(server->pollset); grpc_shutdown(); } diff --git a/test/core/util/trickle_endpoint.cc b/test/core/util/trickle_endpoint.cc index 4544fb7f49..f95ed62463 100644 --- a/test/core/util/trickle_endpoint.cc +++ b/test/core/util/trickle_endpoint.cc @@ -45,24 +45,23 @@ typedef struct { grpc_closure* write_cb; } trickle_endpoint; -static void te_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_slice_buffer* slices, grpc_closure* cb) { +static void te_read(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { trickle_endpoint* te = (trickle_endpoint*)ep; - grpc_endpoint_read(exec_ctx, te->wrapped, slices, cb); + grpc_endpoint_read(te->wrapped, slices, cb); } -static void maybe_call_write_cb_locked(grpc_exec_ctx* exec_ctx, - trickle_endpoint* te) { +static void maybe_call_write_cb_locked(trickle_endpoint* te) { if (te->write_cb != nullptr && (te->error != GRPC_ERROR_NONE || te->write_buffer.length <= WRITE_BUFFER_SIZE)) { - GRPC_CLOSURE_SCHED(exec_ctx, te->write_cb, GRPC_ERROR_REF(te->error)); + GRPC_CLOSURE_SCHED(te->write_cb, GRPC_ERROR_REF(te->error)); te->write_cb = nullptr; } } -static void te_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_slice_buffer* slices, grpc_closure* cb) { +static void te_write(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { trickle_endpoint* te = (trickle_endpoint*)ep; gpr_mu_lock(&te->mu); GPR_ASSERT(te->write_cb == nullptr); @@ -74,47 +73,44 @@ static void te_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, grpc_slice_copy(slices->slices[i])); } te->write_cb = cb; - maybe_call_write_cb_locked(exec_ctx, te); + maybe_call_write_cb_locked(te); gpr_mu_unlock(&te->mu); } -static void te_add_to_pollset(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_pollset* pollset) { +static void te_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) { trickle_endpoint* te = (trickle_endpoint*)ep; - grpc_endpoint_add_to_pollset(exec_ctx, te->wrapped, pollset); + grpc_endpoint_add_to_pollset(te->wrapped, pollset); } -static void te_add_to_pollset_set(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, +static void te_add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset_set) { trickle_endpoint* te = (trickle_endpoint*)ep; - grpc_endpoint_add_to_pollset_set(exec_ctx, te->wrapped, pollset_set); + grpc_endpoint_add_to_pollset_set(te->wrapped, pollset_set); } -static void te_delete_from_pollset_set(grpc_exec_ctx* exec_ctx, - grpc_endpoint* ep, +static void te_delete_from_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset_set) { trickle_endpoint* te = (trickle_endpoint*)ep; - grpc_endpoint_delete_from_pollset_set(exec_ctx, te->wrapped, pollset_set); + grpc_endpoint_delete_from_pollset_set(te->wrapped, pollset_set); } -static void te_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_error* why) { +static void te_shutdown(grpc_endpoint* ep, grpc_error* why) { trickle_endpoint* te = (trickle_endpoint*)ep; gpr_mu_lock(&te->mu); if (te->error == GRPC_ERROR_NONE) { te->error = GRPC_ERROR_REF(why); } - maybe_call_write_cb_locked(exec_ctx, te); + maybe_call_write_cb_locked(te); gpr_mu_unlock(&te->mu); - grpc_endpoint_shutdown(exec_ctx, te->wrapped, why); + grpc_endpoint_shutdown(te->wrapped, why); } -static void te_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) { +static void te_destroy(grpc_endpoint* ep) { trickle_endpoint* te = (trickle_endpoint*)ep; - grpc_endpoint_destroy(exec_ctx, te->wrapped); + grpc_endpoint_destroy(te->wrapped); gpr_mu_destroy(&te->mu); - grpc_slice_buffer_destroy_internal(exec_ctx, &te->write_buffer); - grpc_slice_buffer_destroy_internal(exec_ctx, &te->writing_buffer); + grpc_slice_buffer_destroy_internal(&te->write_buffer); + grpc_slice_buffer_destroy_internal(&te->writing_buffer); GRPC_ERROR_UNREF(te->error); gpr_free(te); } @@ -134,8 +130,7 @@ static int te_get_fd(grpc_endpoint* ep) { return grpc_endpoint_get_fd(te->wrapped); } -static void te_finish_write(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { +static void te_finish_write(void* arg, grpc_error* error) { trickle_endpoint* te = (trickle_endpoint*)arg; gpr_mu_lock(&te->mu); te->writing = false; @@ -173,8 +168,7 @@ static double ts2dbl(gpr_timespec s) { return (double)s.tv_sec + 1e-9 * (double)s.tv_nsec; } -size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx* exec_ctx, - grpc_endpoint* ep) { +size_t grpc_trickle_endpoint_trickle(grpc_endpoint* ep) { trickle_endpoint* te = (trickle_endpoint*)ep; gpr_mu_lock(&te->mu); if (!te->writing && te->write_buffer.length > 0) { @@ -189,9 +183,9 @@ size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx* exec_ctx, te->writing = true; te->last_write = now; grpc_endpoint_write( - exec_ctx, te->wrapped, &te->writing_buffer, + te->wrapped, &te->writing_buffer, GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx)); - maybe_call_write_cb_locked(exec_ctx, te); + maybe_call_write_cb_locked(te); } } size_t backlog = te->write_buffer.length; diff --git a/test/core/util/trickle_endpoint.h b/test/core/util/trickle_endpoint.h index 11c113bda8..cd07de905a 100644 --- a/test/core/util/trickle_endpoint.h +++ b/test/core/util/trickle_endpoint.h @@ -25,8 +25,7 @@ grpc_endpoint* grpc_trickle_endpoint_create(grpc_endpoint* wrap, double bytes_per_second); /* Allow up to \a bytes through the endpoint. Returns the new backlog. */ -size_t grpc_trickle_endpoint_trickle(grpc_exec_ctx* exec_ctx, - grpc_endpoint* endpoint); +size_t grpc_trickle_endpoint_trickle(grpc_endpoint* endpoint); size_t grpc_trickle_get_backlog(grpc_endpoint* endpoint); |