From 547cb4b283273a68c61da28f97b149b8c55c8daf Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 28 Mar 2017 09:50:33 -0700 Subject: Make chunk sizes configurable, push channel args down a little deeper --- test/core/iomgr/tcp_posix_test.c | 55 +++++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 23 deletions(-) (limited to 'test/core/iomgr/tcp_posix_test.c') diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 5a55be888f..2c53a003d2 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -183,10 +183,12 @@ static void read_test(size_t num_bytes, size_t slice_size) { create_sockets(sv); - grpc_resource_quota *resource_quota = grpc_resource_quota_create("read_test"); - ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), resource_quota, - slice_size, "test"); - grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); + grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE, + .type = GRPC_ARG_INTEGER, + .value.integer = (int)slice_size}}; + grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a}; + ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args, + "test"); grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); @@ -233,11 +235,12 @@ static void large_read_test(size_t slice_size) { create_sockets(sv); - grpc_resource_quota *resource_quota = - grpc_resource_quota_create("large_read_test"); - ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), resource_quota, - slice_size, "test"); - grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); + grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE, + .type = GRPC_ARG_INTEGER, + .value.integer = (int)slice_size}}; + grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a}; + ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "large_read_test"), + &args, "test"); grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket(sv[0]); @@ -372,11 +375,12 @@ static void write_test(size_t num_bytes, size_t slice_size) { create_sockets(sv); - grpc_resource_quota *resource_quota = - grpc_resource_quota_create("write_test"); - ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), resource_quota, - GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); - grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); + grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE, + .type = GRPC_ARG_INTEGER, + .value.integer = (int)slice_size}}; + grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a}; + ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "write_test"), &args, + "test"); grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); state.ep = ep; @@ -441,12 +445,13 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { create_sockets(sv); - grpc_resource_quota *resource_quota = - grpc_resource_quota_create("release_fd_test"); - ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), resource_quota, - slice_size, "test"); + grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE, + .type = GRPC_ARG_INTEGER, + .value.integer = (int)slice_size}}; + grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a}; + ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args, + "test"); GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0); - grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); @@ -534,10 +539,14 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( create_sockets(sv); grpc_resource_quota *resource_quota = grpc_resource_quota_create("tcp_posix_test_socketpair"); - f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), - resource_quota, slice_size, "test"); - f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), - resource_quota, slice_size, "test"); + grpc_arg a[] = {{.key = GRPC_ARG_TCP_READ_CHUNK_SIZE, + .type = GRPC_ARG_INTEGER, + .value.integer = (int)slice_size}}; + grpc_channel_args args = {.num_args = GPR_ARRAY_SIZE(a), .args = a}; + f.client_ep = grpc_tcp_create( + &exec_ctx, grpc_fd_create(sv[0], "fixture:client"), &args, "test"); + f.server_ep = grpc_tcp_create( + &exec_ctx, grpc_fd_create(sv[1], "fixture:server"), &args, "test"); grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset); -- cgit v1.2.3 From f840110de243ae225f5825c0dda47e9fe89b582c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 17 Apr 2017 09:47:28 -0700 Subject: Fixup tests --- src/core/lib/iomgr/ev_epoll_linux.c | 2 +- src/core/lib/iomgr/ev_epollex_linux.c | 65 +++++++++++++--------- src/core/lib/iomgr/ev_poll_posix.c | 2 +- src/core/lib/iomgr/ev_posix.c | 4 +- src/core/lib/iomgr/ev_posix.h | 2 +- .../google_default/google_default_credentials.c | 2 +- src/core/lib/surface/alarm.c | 4 +- src/core/lib/surface/call.c | 2 +- src/core/lib/surface/completion_queue.c | 19 ++++--- src/core/lib/surface/completion_queue.h | 12 ++-- src/core/lib/surface/server.c | 2 +- test/core/end2end/fixtures/http_proxy_fixture.c | 2 +- test/core/http/httpcli_test.c | 2 +- test/core/http/httpscli_test.c | 2 +- test/core/iomgr/endpoint_pair_test.c | 2 +- test/core/iomgr/ev_epoll_linux_test.c | 2 +- test/core/iomgr/fd_posix_test.c | 2 +- test/core/iomgr/pollset_set_test.c | 2 +- test/core/iomgr/resolve_address_posix_test.c | 2 +- test/core/iomgr/resolve_address_test.c | 2 +- test/core/iomgr/tcp_client_posix_test.c | 2 +- test/core/iomgr/tcp_posix_test.c | 2 +- test/core/iomgr/tcp_server_posix_test.c | 2 +- test/core/iomgr/udp_server_test.c | 2 +- test/core/security/secure_endpoint_test.c | 2 +- test/core/surface/concurrent_connectivity_test.c | 2 +- test/core/util/port_server_client.c | 2 +- test/core/util/test_tcp_server.c | 9 ++- 28 files changed, 90 insertions(+), 67 deletions(-) (limited to 'test/core/iomgr/tcp_posix_test.c') diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 766891169f..0f1bd833f7 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -1333,7 +1333,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* pollset_shutdown is guaranteed to be called before pollset_destroy. So other * than destroying the mutexes, there is nothing special that needs to be done * here */ -static void pollset_destroy(grpc_pollset *pollset) { +static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GPR_ASSERT(!pollset_has_workers(pollset)); gpr_mu_destroy(&pollset->po.mu); } diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index fdde21756b..f0b9ee39f6 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -265,7 +265,8 @@ static gpr_mu fd_freelist_mu; #ifdef GRPC_FD_REF_COUNT_DEBUG #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__) -#define UNREF_BY(ec, fd, n, reason) unref_by(ec, fd, n, reason, __FILE__, __LINE__) +#define UNREF_BY(ec, fd, n, reason) \ + unref_by(ec, fd, n, reason, __FILE__, __LINE__) static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd, @@ -280,23 +281,23 @@ static void ref_by(grpc_fd *fd, int n) { } static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { -grpc_fd *fd = arg; - /* Add the fd to the freelist */ - grpc_iomgr_unregister_object(&fd->iomgr_object); - pollable_destroy(&fd->pollable); - gpr_mu_lock(&fd_freelist_mu); - fd->freelist_next = fd_freelist; - fd_freelist = fd; + grpc_fd *fd = arg; + /* Add the fd to the freelist */ + grpc_iomgr_unregister_object(&fd->iomgr_object); + pollable_destroy(&fd->pollable); + gpr_mu_lock(&fd_freelist_mu); + fd->freelist_next = fd_freelist; + fd_freelist = fd; - grpc_lfev_destroy(&fd->read_closure); - grpc_lfev_destroy(&fd->write_closure); + grpc_lfev_destroy(&fd->read_closure); + grpc_lfev_destroy(&fd->write_closure); - gpr_mu_unlock(&fd_freelist_mu); + gpr_mu_unlock(&fd_freelist_mu); } #ifdef GRPC_FD_REF_COUNT_DEBUG -static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n, const char *reason, const char *file, - int line) { +static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n, + const char *reason, const char *file, int line) { gpr_atm old; gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd, (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst), @@ -307,7 +308,9 @@ static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n) { #endif old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { - grpc_closure_sched(exec_ctx, grpc_closure_create(fd_destroy, fd, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, grpc_closure_create(fd_destroy, fd, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } else { GPR_ASSERT(old > n); } @@ -660,7 +663,8 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) { return error; } -static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, grpc_pollset_worker *specific_worker) { +static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, + grpc_pollset_worker *specific_worker) { if (grpc_polling_trace) { gpr_log(GPR_DEBUG, "PS:%p kick %p tls_pollset=%p tls_worker=%p " @@ -711,7 +715,6 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, grpc_p gpr_cv_signal(&specific_worker->cv); return GRPC_ERROR_NONE; } - } /* p->po.mu must be held before calling this function */ @@ -816,7 +819,8 @@ static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) { static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { pollable_destroy(&pollset->pollable); if (pollset_is_pollable_fd(pollset, pollset->current_pollable)) { - UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2, "pollset_pollable"); + UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2, + "pollset_pollable"); } } @@ -967,7 +971,8 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, return pollset->shutdown_closure == NULL; } -static void end_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, +static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { worker_remove(&pollset->root_worker, PWL_POLLSET, worker); if (NEW_ROOT == @@ -1027,21 +1032,23 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_tls_set(&g_current_thread_worker, 0); pollset_maybe_finish_shutdown(exec_ctx, pollset); } - end_worker(pollset, &worker, worker_hdl); + end_worker(exec_ctx, pollset, &worker, worker_hdl); if (worker.pollable != &pollset->pollable) { gpr_mu_unlock(&worker.pollable->po.mu); } return error; } -static void unref_fd_no_longer_poller(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { +static void unref_fd_no_longer_poller(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_fd *fd = arg; UNREF_BY(exec_ctx, fd, 2, "pollset_pollable"); } /* expects pollsets locked, flag whether fd is locked or not */ -static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd, bool fd_locked) { +static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, grpc_fd *fd, + bool fd_locked) { static const char *err_desc = "pollset_add_fd"; grpc_error *error = GRPC_ERROR_NONE; if (pollset->current_pollable == &g_empty_pollable) { @@ -1063,7 +1070,10 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, grpc_pollset * pollable_add_fd(&pollset->pollable, had_fd); pollable_add_fd(&pollset->pollable, fd); } - grpc_closure_sched(exec_ctx, grpc_closure_create(unref_fd_no_longer_poller, had_fd, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, + grpc_closure_create(unref_fd_no_longer_poller, had_fd, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } return error; } @@ -1226,13 +1236,16 @@ static void pg_broadcast(grpc_exec_ctx *exec_ctx, polling_group *from, for (polling_obj *a = from->po.next; a != &from->po; a = a->next) { for (polling_obj *b = to->po.next; b != &to->po; b = b->next) { if (po_cmp(a, b) < 0) { - gpr_mu_lock(&a->mu); gpr_mu_lock(&b->mu); + gpr_mu_lock(&a->mu); + gpr_mu_lock(&b->mu); } else { GPR_ASSERT(po_cmp(a, b) != 0); - gpr_mu_lock(&b->mu); gpr_mu_lock(&a->mu); + gpr_mu_lock(&b->mu); + gpr_mu_lock(&a->mu); } pg_notify(exec_ctx, a, b); - gpr_mu_unlock(&a->mu); gpr_mu_unlock(&b->mu); + gpr_mu_unlock(&a->mu); + gpr_mu_unlock(&b->mu); } } } diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index b13ec2cbc1..a83bad319a 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -817,7 +817,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->pollset_set_count = 0; } -static void pollset_destroy(grpc_pollset *pollset) { +static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GPR_ASSERT(!pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); while (pollset->local_wakeup_cache) { diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 620bbde65d..a01de93e9b 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -204,8 +204,8 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, g_event_engine->pollset_shutdown(exec_ctx, pollset, closure); } -void grpc_pollset_destroy(grpc_pollset *pollset) { - g_event_engine->pollset_destroy(pollset); +void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + g_event_engine->pollset_destroy(exec_ctx, pollset); } grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index e84a86082a..59832aaf24 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -66,7 +66,7 @@ typedef struct grpc_event_engine_vtable { void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu); void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure); - void (*pollset_destroy)(grpc_pollset *pollset); + void (*pollset_destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker, gpr_timespec now, gpr_timespec deadline); diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index 97501e6788..4d8c451ea8 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -99,7 +99,7 @@ static void on_compute_engine_detection_http_response(grpc_exec_ctx *exec_ctx, } static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *e) { - grpc_pollset_destroy(p); + grpc_pollset_destroy(exec_ctx, p); } static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index e71c0ebfc5..b72d534b7e 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -81,7 +81,9 @@ void grpc_alarm_cancel(grpc_alarm *alarm) { } void grpc_alarm_destroy(grpc_alarm *alarm) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_alarm_cancel(alarm); - GRPC_CQ_INTERNAL_UNREF(alarm->cq, "alarm"); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); gpr_free(alarm); + grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 3e96d09798..6bd481d3bc 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -509,7 +509,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, } } if (c->cq) { - GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, c->cq, "bind"); } get_final_status(call, set_status_value_directly, &c->final_info.final_status, diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 35e9f7eb30..366842d121 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -182,20 +182,21 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) { static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_completion_queue *cc = arg; - GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cc, "pollset_destroy"); } #ifdef GRPC_CQ_REF_COUNT_DEBUG -void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, - const char *file, int line) { +void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, + const char *reason, const char *file, int line) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason); #else -void grpc_cq_internal_unref(grpc_completion_queue *cc) { +void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); - grpc_pollset_destroy(POLLSET_FROM_CQ(cc)); + grpc_pollset_destroy(exec_ctx, POLLSET_FROM_CQ(cc)); #ifndef NDEBUG gpr_free(cc->outstanding_tags); #endif @@ -469,7 +470,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, is_finished_arg.first_loop = false; } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(cc, "next"); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "next"); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(is_finished_arg.stolen_completion == NULL); @@ -664,7 +665,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, } done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck"); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(is_finished_arg.stolen_completion == NULL); @@ -701,7 +702,9 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc)); GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); grpc_completion_queue_shutdown(cc); - GRPC_CQ_INTERNAL_UNREF(cc, "destroy"); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy"); + grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_destroy", 0); } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 1ff3d64293..a2e7b2d6a4 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -65,17 +65,17 @@ typedef struct grpc_cq_completion { #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line); -void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, - const char *file, int line); +void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, + const char *reason, const char *file, int line); #define GRPC_CQ_INTERNAL_REF(cc, reason) \ grpc_cq_internal_ref(cc, reason, __FILE__, __LINE__) -#define GRPC_CQ_INTERNAL_UNREF(cc, reason) \ - grpc_cq_internal_unref(cc, reason, __FILE__, __LINE__) +#define GRPC_CQ_INTERNAL_UNREF(ec, cc, reason) \ + grpc_cq_internal_unref(ec, cc, reason, __FILE__, __LINE__) #else void grpc_cq_internal_ref(grpc_completion_queue *cc); -void grpc_cq_internal_unref(grpc_completion_queue *cc); +void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc); #define GRPC_CQ_INTERNAL_REF(cc, reason) grpc_cq_internal_ref(cc) -#define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc) +#define GRPC_CQ_INTERNAL_UNREF(ec, cc, reason) grpc_cq_internal_unref(ec, cc) #endif /* Flag that an operation is beginning: the completion channel will not finish diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 38800ea37b..b0328cec98 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -408,7 +408,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { request_matcher_destroy(&server->unregistered_request_matcher); } for (i = 0; i < server->cq_count; i++) { - GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); if (server->started) { gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); gpr_free(server->requested_calls_per_cq[i]); diff --git a/test/core/end2end/fixtures/http_proxy_fixture.c b/test/core/end2end/fixtures/http_proxy_fixture.c index 451ed268d3..259abfd151 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.c +++ b/test/core/end2end/fixtures/http_proxy_fixture.c @@ -469,7 +469,7 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(void) { static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_pollset* pollset = arg; - grpc_pollset_destroy(pollset); + grpc_pollset_destroy(exec_ctx, pollset); gpr_free(pollset); } diff --git a/test/core/http/httpcli_test.c b/test/core/http/httpcli_test.c index d3b45c4505..21135ddf6b 100644 --- a/test/core/http/httpcli_test.c +++ b/test/core/http/httpcli_test.c @@ -155,7 +155,7 @@ static void test_post(int port) { } static void destroy_pops(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(grpc_polling_entity_pollset(p)); + grpc_pollset_destroy(exec_ctx, grpc_polling_entity_pollset(p)); } int main(int argc, char **argv) { diff --git a/test/core/http/httpscli_test.c b/test/core/http/httpscli_test.c index acc94091f4..73eaae87d7 100644 --- a/test/core/http/httpscli_test.c +++ b/test/core/http/httpscli_test.c @@ -157,7 +157,7 @@ static void test_post(int port) { } static void destroy_pops(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(grpc_polling_entity_pollset(p)); + grpc_pollset_destroy(exec_ctx, grpc_polling_entity_pollset(p)); } int main(int argc, char **argv) { diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c index c8a60776b9..4561c3846e 100644 --- a/test/core/iomgr/endpoint_pair_test.c +++ b/test/core/iomgr/endpoint_pair_test.c @@ -70,7 +70,7 @@ static grpc_endpoint_test_config configs[] = { static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(p); + grpc_pollset_destroy(exec_ctx, p); } int main(int argc, char **argv) { diff --git a/test/core/iomgr/ev_epoll_linux_test.c b/test/core/iomgr/ev_epoll_linux_test.c index 0856023b14..7693b5a398 100644 --- a/test/core/iomgr/ev_epoll_linux_test.c +++ b/test/core/iomgr/ev_epoll_linux_test.c @@ -113,7 +113,7 @@ static void test_pollset_init(test_pollset *pollsets, int num_pollsets) { static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(p); + grpc_pollset_destroy(exec_ctx, p); } static void test_pollset_cleanup(grpc_exec_ctx *exec_ctx, diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 81d2692a08..d6cee8e43a 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -535,7 +535,7 @@ static void test_grpc_fd_change(void) { static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(p); + grpc_pollset_destroy(exec_ctx, p); } int main(int argc, char **argv) { diff --git a/test/core/iomgr/pollset_set_test.c b/test/core/iomgr/pollset_set_test.c index 3a9d459579..469551c827 100644 --- a/test/core/iomgr/pollset_set_test.c +++ b/test/core/iomgr/pollset_set_test.c @@ -86,7 +86,7 @@ static void init_test_pollsets(test_pollset *pollsets, const int num_pollsets) { static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(p); + grpc_pollset_destroy(exec_ctx, p); } static void cleanup_test_pollsets(grpc_exec_ctx *exec_ctx, diff --git a/test/core/iomgr/resolve_address_posix_test.c b/test/core/iomgr/resolve_address_posix_test.c index fa88aca431..7fba0b92be 100644 --- a/test/core/iomgr/resolve_address_posix_test.c +++ b/test/core/iomgr/resolve_address_posix_test.c @@ -81,7 +81,7 @@ void args_finish(grpc_exec_ctx *exec_ctx, args_struct *args) { grpc_pollset_shutdown(exec_ctx, args->pollset, &do_nothing_cb); // exec_ctx needs to be flushed before calling grpc_pollset_destroy() grpc_exec_ctx_flush(exec_ctx); - grpc_pollset_destroy(args->pollset); + grpc_pollset_destroy(exec_ctx, args->pollset); gpr_free(args->pollset); } diff --git a/test/core/iomgr/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c index ea79adc090..0425c3b3f5 100644 --- a/test/core/iomgr/resolve_address_test.c +++ b/test/core/iomgr/resolve_address_test.c @@ -76,7 +76,7 @@ void args_finish(grpc_exec_ctx *exec_ctx, args_struct *args) { grpc_pollset_shutdown(exec_ctx, args->pollset, &do_nothing_cb); // exec_ctx needs to be flushed before calling grpc_pollset_destroy() grpc_exec_ctx_flush(exec_ctx); - grpc_pollset_destroy(args->pollset); + grpc_pollset_destroy(exec_ctx, args->pollset); gpr_free(args->pollset); } diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 2fae6774e8..6e1bb43eb5 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -197,7 +197,7 @@ void test_fails(void) { static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(p); + grpc_pollset_destroy(exec_ctx, p); } int main(int argc, char **argv) { diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 2c53a003d2..a1c54e19c1 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -562,7 +562,7 @@ static grpc_endpoint_test_config configs[] = { static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(p); + grpc_pollset_destroy(exec_ctx, p); } int main(int argc, char **argv) { diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 112743b95b..88aead1dd0 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -444,7 +444,7 @@ static void test_connect(size_t num_connects, static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(p); + grpc_pollset_destroy(exec_ctx, p); } int main(int argc, char **argv) { diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c index 1f1696a7a7..ee78d6b4ad 100644 --- a/test/core/iomgr/udp_server_test.c +++ b/test/core/iomgr/udp_server_test.c @@ -307,7 +307,7 @@ static void test_receive(int number_of_clients) { static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(p); + grpc_pollset_destroy(exec_ctx, p); } int main(int argc, char **argv) { diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index 71d8057ac3..cd6ff2ceac 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -185,7 +185,7 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - grpc_pollset_destroy(p); + grpc_pollset_destroy(exec_ctx, p); } int main(int argc, char **argv) { diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c index d6841ea1f8..f0e3394b2e 100644 --- a/test/core/surface/concurrent_connectivity_test.c +++ b/test/core/surface/concurrent_connectivity_test.c @@ -162,7 +162,7 @@ void bad_server_thread(void *vargs) { static void done_pollset_shutdown(grpc_exec_ctx *exec_ctx, void *pollset, grpc_error *error) { - grpc_pollset_destroy(pollset); + grpc_pollset_destroy(exec_ctx, pollset); gpr_free(pollset); } diff --git a/test/core/util/port_server_client.c b/test/core/util/port_server_client.c index 38054dd1e7..2cd50a4ff7 100644 --- a/test/core/util/port_server_client.c +++ b/test/core/util/port_server_client.c @@ -58,7 +58,7 @@ typedef struct freereq { static void destroy_pops_and_shutdown(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { grpc_pollset *pollset = grpc_polling_entity_pollset(p); - grpc_pollset_destroy(pollset); + grpc_pollset_destroy(exec_ctx, pollset); gpr_free(pollset); grpc_shutdown(); } diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c index 496e579bc3..1908698009 100644 --- a/test/core/util/test_tcp_server.c +++ b/test/core/util/test_tcp_server.c @@ -106,6 +106,10 @@ void test_tcp_server_poll(test_tcp_server *server, int seconds) { } static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} +static void finish_pollset(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_pollset_destroy(exec_ctx, arg); +} void test_tcp_server_destroy(test_tcp_server *server) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -120,9 +124,10 @@ void test_tcp_server_destroy(test_tcp_server *server) { gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), shutdown_deadline) < 0) { test_tcp_server_poll(server, 1); } - grpc_pollset_shutdown(&exec_ctx, server->pollset, &do_nothing_cb); + grpc_pollset_shutdown(&exec_ctx, server->pollset, + grpc_closure_create(finish_pollset, server->pollset, + grpc_schedule_on_exec_ctx)); grpc_exec_ctx_finish(&exec_ctx); - grpc_pollset_destroy(server->pollset); gpr_free(server->pollset); grpc_shutdown(); } -- cgit v1.2.3