diff options
-rw-r--r-- | include/grpc/grpc.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/buffer_pool.c | 19 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint.c | 4 | ||||
-rw-r--r-- | test/core/bad_client/bad_client.c | 4 | ||||
-rw-r--r-- | test/core/http/httpcli_test.c | 11 | ||||
-rw-r--r-- | test/core/http/httpscli_test.c | 11 | ||||
-rw-r--r-- | test/core/internal_api_canaries/iomgr.c | 1 | ||||
-rw-r--r-- | test/core/iomgr/endpoint_pair_test.c | 5 | ||||
-rw-r--r-- | test/core/iomgr/fd_conservation_posix_test.c | 5 | ||||
-rw-r--r-- | test/core/surface/concurrent_connectivity_test.c | 2 | ||||
-rw-r--r-- | test/core/util/mock_endpoint.c | 22 | ||||
-rw-r--r-- | test/core/util/mock_endpoint.h | 3 | ||||
-rw-r--r-- | test/core/util/passthru_endpoint.c | 27 | ||||
-rw-r--r-- | test/core/util/passthru_endpoint.h | 3 | ||||
-rw-r--r-- | test/core/util/port_server_client.c | 17 | ||||
-rw-r--r-- | test/core/util/test_tcp_server.c | 4 |
16 files changed, 112 insertions, 28 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index a893367a9c..a6e02cd072 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -415,7 +415,7 @@ GRPCAPI void grpc_buffer_pool_resize(grpc_buffer_pool *buffer_pool, size_t new_size); /** Fetch a vtable for a grpc_channel_arg that points to a grpc_buffer_pool */ -GRPCAPI grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void); +GRPCAPI const grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void); #ifdef __cplusplus } diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c index 0153bce203..78a98027f5 100644 --- a/src/core/lib/iomgr/buffer_pool.c +++ b/src/core/lib/iomgr/buffer_pool.c @@ -37,6 +37,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/useful.h> #include "src/core/lib/iomgr/combiner.h" @@ -366,6 +367,10 @@ void grpc_buffer_pool_resize(grpc_buffer_pool *buffer_pool, size_t size) { grpc_exec_ctx_finish(&exec_ctx); } +/******************************************************************************* + * grpc_buffer_user channel args api + */ + grpc_buffer_pool *grpc_buffer_pool_from_channel_args( grpc_channel_args *channel_args) { for (size_t i = 0; i < channel_args->num_args; i++) { @@ -381,6 +386,20 @@ grpc_buffer_pool *grpc_buffer_pool_from_channel_args( return grpc_buffer_pool_create(); } +static void *bp_copy(void *bp) { + grpc_buffer_pool_ref(bp); + return bp; +} + +static void bp_destroy(void *bp) { grpc_buffer_pool_unref(bp); } + +static int bp_cmp(void *a, void *b) { return GPR_ICMP(a, b); } + +const grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void) { + static const grpc_arg_pointer_vtable vtable = {bp_copy, bp_destroy, bp_cmp}; + return &vtable; +} + /******************************************************************************* * grpc_buffer_user api */ diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index f901fcf962..f3548a1d74 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -69,3 +69,7 @@ char* grpc_endpoint_get_peer(grpc_endpoint* ep) { grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) { return ep->vtable->get_workqueue(ep); } + +grpc_buffer_user* grpc_endpoint_get_buffer_user(grpc_endpoint* ep) { + return ep->vtable->get_buffer_user(ep); +} diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index be88d4a69a..06692fe969 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -114,7 +114,9 @@ void grpc_run_bad_client_test( grpc_init(); /* Create endpoints */ - sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536); + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + sfd = grpc_iomgr_create_endpoint_pair("fixture", buffer_pool, 65536); + grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool); /* Create server, completion events */ a.server = grpc_server_create(NULL, NULL); diff --git a/test/core/http/httpcli_test.c b/test/core/http/httpcli_test.c index 38b32a3867..d1c6805b03 100644 --- a/test/core/http/httpcli_test.c +++ b/test/core/http/httpcli_test.c @@ -89,8 +89,11 @@ static void test_get(int port) { grpc_http_response response; memset(&response, 0, sizeof(response)); - grpc_httpcli_get(&exec_ctx, &g_context, &g_pops, &req, n_seconds_time(15), + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_httpcli_get(&exec_ctx, &g_context, &g_pops, buffer_pool, &req, + n_seconds_time(15), grpc_closure_create(on_finish, &response), &response); + grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool); gpr_mu_lock(g_mu); while (!g_done) { grpc_pollset_worker *worker = NULL; @@ -126,9 +129,11 @@ static void test_post(int port) { grpc_http_response response; memset(&response, 0, sizeof(response)); - grpc_httpcli_post(&exec_ctx, &g_context, &g_pops, &req, "hello", 5, - n_seconds_time(15), + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_httpcli_post(&exec_ctx, &g_context, &g_pops, buffer_pool, &req, "hello", + 5, n_seconds_time(15), grpc_closure_create(on_finish, &response), &response); + grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool); gpr_mu_lock(g_mu); while (!g_done) { grpc_pollset_worker *worker = NULL; diff --git a/test/core/http/httpscli_test.c b/test/core/http/httpscli_test.c index 359e557689..09ceec7208 100644 --- a/test/core/http/httpscli_test.c +++ b/test/core/http/httpscli_test.c @@ -90,8 +90,11 @@ static void test_get(int port) { grpc_http_response response; memset(&response, 0, sizeof(response)); - grpc_httpcli_get(&exec_ctx, &g_context, &g_pops, &req, n_seconds_time(15), + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_httpcli_get(&exec_ctx, &g_context, &g_pops, buffer_pool, &req, + n_seconds_time(15), grpc_closure_create(on_finish, &response), &response); + grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool); gpr_mu_lock(g_mu); while (!g_done) { grpc_pollset_worker *worker = NULL; @@ -128,9 +131,11 @@ static void test_post(int port) { grpc_http_response response; memset(&response, 0, sizeof(response)); - grpc_httpcli_post(&exec_ctx, &g_context, &g_pops, &req, "hello", 5, - n_seconds_time(15), + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_httpcli_post(&exec_ctx, &g_context, &g_pops, buffer_pool, &req, "hello", + 5, n_seconds_time(15), grpc_closure_create(on_finish, &response), &response); + grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool); gpr_mu_lock(g_mu); while (!g_done) { grpc_pollset_worker *worker = NULL; diff --git a/test/core/internal_api_canaries/iomgr.c b/test/core/internal_api_canaries/iomgr.c index 27d630623e..4f967ce751 100644 --- a/test/core/internal_api_canaries/iomgr.c +++ b/test/core/internal_api_canaries/iomgr.c @@ -84,6 +84,7 @@ static void test_code(void) { grpc_endpoint_add_to_pollset_set, grpc_endpoint_shutdown, grpc_endpoint_destroy, + grpc_endpoint_get_buffer_user, grpc_endpoint_get_peer}; endpoint.vtable = &vtable; diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c index 99b86b6213..a7f00230c2 100644 --- a/test/core/iomgr/endpoint_pair_test.c +++ b/test/core/iomgr/endpoint_pair_test.c @@ -49,7 +49,10 @@ static grpc_endpoint_test_fixture create_fixture_endpoint_pair( size_t slice_size) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_endpoint_test_fixture f; - grpc_endpoint_pair p = grpc_iomgr_create_endpoint_pair("test", slice_size); + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_endpoint_pair p = + grpc_iomgr_create_endpoint_pair("test", buffer_pool, slice_size); + grpc_buffer_pool_unref(buffer_pool); f.client_ep = p.client; f.server_ep = p.server; diff --git a/test/core/iomgr/fd_conservation_posix_test.c b/test/core/iomgr/fd_conservation_posix_test.c index bbb3f46497..4612ff7dc4 100644 --- a/test/core/iomgr/fd_conservation_posix_test.c +++ b/test/core/iomgr/fd_conservation_posix_test.c @@ -52,15 +52,18 @@ int main(int argc, char **argv) { of descriptors */ rlim.rlim_cur = rlim.rlim_max = 10; GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim)); + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); for (i = 0; i < 100; i++) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - p = grpc_iomgr_create_endpoint_pair("test", 1); + p = grpc_iomgr_create_endpoint_pair("test", buffer_pool, 1); grpc_endpoint_destroy(&exec_ctx, p.client); grpc_endpoint_destroy(&exec_ctx, p.server); grpc_exec_ctx_finish(&exec_ctx); } + grpc_buffer_pool_unref(buffer_pool); + grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c index f7567f350d..c34ea2c8cb 100644 --- a/test/core/surface/concurrent_connectivity_test.c +++ b/test/core/surface/concurrent_connectivity_test.c @@ -113,7 +113,7 @@ void bad_server_thread(void *vargs) { socklen_t addr_len = sizeof(addr); int port; grpc_tcp_server *s; - grpc_error *error = grpc_tcp_server_create(NULL, NULL, &s); + grpc_error *error = grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s); GPR_ASSERT(error == GRPC_ERROR_NONE); memset(&addr, 0, sizeof(addr)); addr.ss_family = AF_INET; diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index 13e0e918fb..173a2d8963 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -43,6 +43,7 @@ typedef struct grpc_mock_endpoint { gpr_slice_buffer read_buffer; gpr_slice_buffer *on_read_out; grpc_closure *on_read; + grpc_buffer_user buffer_user; } grpc_mock_endpoint; static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -85,16 +86,28 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { gpr_mu_unlock(&m->mu); } -static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { - grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; +static void me_really_destroy(grpc_exec_ctx *exec_ctx, void *mp, + grpc_error *error) { + grpc_mock_endpoint *m = mp; gpr_slice_buffer_destroy(&m->read_buffer); gpr_free(m); } +static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { + grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; + grpc_buffer_user_destroy(exec_ctx, &m->buffer_user, + grpc_closure_create(me_really_destroy, m)); +} + static char *me_get_peer(grpc_endpoint *ep) { return gpr_strdup("fake:mock_endpoint"); } +static grpc_buffer_user *me_get_buffer_user(grpc_endpoint *ep) { + grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; + return &m->buffer_user; +} + static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; } static const grpc_endpoint_vtable vtable = { @@ -105,12 +118,15 @@ static const grpc_endpoint_vtable vtable = { me_add_to_pollset_set, me_shutdown, me_destroy, + me_get_buffer_user, me_get_peer, }; -grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice)) { +grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice), + grpc_buffer_pool *buffer_pool) { grpc_mock_endpoint *m = gpr_malloc(sizeof(*m)); m->base.vtable = &vtable; + grpc_buffer_user_init(&m->buffer_user, buffer_pool); gpr_slice_buffer_init(&m->read_buffer); gpr_mu_init(&m->mu); m->on_write = on_write; diff --git a/test/core/util/mock_endpoint.h b/test/core/util/mock_endpoint.h index 051af9866b..bb59a16f7a 100644 --- a/test/core/util/mock_endpoint.h +++ b/test/core/util/mock_endpoint.h @@ -36,7 +36,8 @@ #include "src/core/lib/iomgr/endpoint.h" -grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice)); +grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice), + grpc_buffer_pool *buffer_pool); void grpc_mock_endpoint_put_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *mock_endpoint, gpr_slice slice); diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index 7ed9e97bd6..239cd3e275 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -44,6 +44,7 @@ typedef struct { gpr_slice_buffer read_buffer; gpr_slice_buffer *on_read_out; grpc_closure *on_read; + grpc_buffer_user buffer_user; } half; struct passthru_endpoint { @@ -122,7 +123,8 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { gpr_mu_unlock(&m->parent->mu); } -static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { +static void me_really_destroy(grpc_exec_ctx *exec_ctx, void *ep, + grpc_error *error) { passthru_endpoint *p = ((half *)ep)->parent; gpr_mu_lock(&p->mu); if (0 == --p->halves) { @@ -136,12 +138,23 @@ static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { } } +static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { + half *m = (half *)ep; + grpc_buffer_user_destroy(exec_ctx, &m->buffer_user, + grpc_closure_create(me_really_destroy, m)); +} + static char *me_get_peer(grpc_endpoint *ep) { return gpr_strdup("fake:mock_endpoint"); } static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; } +static grpc_buffer_user *me_get_buffer_user(grpc_endpoint *ep) { + half *m = (half *)ep; + return &m->buffer_user; +} + static const grpc_endpoint_vtable vtable = { me_read, me_write, @@ -150,23 +163,27 @@ static const grpc_endpoint_vtable vtable = { me_add_to_pollset_set, me_shutdown, me_destroy, + me_get_buffer_user, me_get_peer, }; -static void half_init(half *m, passthru_endpoint *parent) { +static void half_init(half *m, passthru_endpoint *parent, + grpc_buffer_pool *buffer_pool) { m->base.vtable = &vtable; m->parent = parent; gpr_slice_buffer_init(&m->read_buffer); m->on_read = NULL; + grpc_buffer_user_init(&m->buffer_user, buffer_pool); } void grpc_passthru_endpoint_create(grpc_endpoint **client, - grpc_endpoint **server) { + grpc_endpoint **server, + grpc_buffer_pool *buffer_pool) { passthru_endpoint *m = gpr_malloc(sizeof(*m)); m->halves = 2; m->shutdown = 0; - half_init(&m->client, m); - half_init(&m->server, m); + half_init(&m->client, m, buffer_pool); + half_init(&m->server, m, buffer_pool); gpr_mu_init(&m->mu); *client = &m->client.base; *server = &m->server.base; diff --git a/test/core/util/passthru_endpoint.h b/test/core/util/passthru_endpoint.h index aa1d3a1763..9756315084 100644 --- a/test/core/util/passthru_endpoint.h +++ b/test/core/util/passthru_endpoint.h @@ -37,6 +37,7 @@ #include "src/core/lib/iomgr/endpoint.h" void grpc_passthru_endpoint_create(grpc_endpoint **client, - grpc_endpoint **server); + grpc_endpoint **server, + grpc_buffer_pool *buffer_pool); #endif diff --git a/test/core/util/port_server_client.c b/test/core/util/port_server_client.c index a5c8c49650..dd444236e9 100644 --- a/test/core/util/port_server_client.c +++ b/test/core/util/port_server_client.c @@ -99,9 +99,11 @@ void grpc_free_port_using_server(char *server, int port) { req.http.path = path; grpc_httpcli_context_init(&context); - grpc_httpcli_get(&exec_ctx, &context, &pr.pops, &req, + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_httpcli_get(&exec_ctx, &context, &pr.pops, buffer_pool, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), grpc_closure_create(freed_port_from_server, &pr), &rsp); + grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool); gpr_mu_lock(pr.mu); while (!pr.done) { grpc_pollset_worker *worker = NULL; @@ -167,10 +169,12 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, req.http.path = "/get"; grpc_http_response_destroy(&pr->response); memset(&pr->response, 0, sizeof(pr->response)); - grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pops, &req, + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pops, buffer_pool, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), grpc_closure_create(got_port_from_server, pr), &pr->response); + grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool); return; } GPR_ASSERT(response); @@ -211,9 +215,12 @@ int grpc_pick_port_using_server(char *server) { req.http.path = "/get"; grpc_httpcli_context_init(&context); - grpc_httpcli_get( - &exec_ctx, &context, &pr.pops, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), - grpc_closure_create(got_port_from_server, &pr), &pr.response); + grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create(); + grpc_httpcli_get(&exec_ctx, &context, &pr.pops, buffer_pool, &req, + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), + grpc_closure_create(got_port_from_server, &pr), + &pr.response); + grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(pr.mu); while (pr.port == -1) { diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c index 8a0b3932d8..6890da4730 100644 --- a/test/core/util/test_tcp_server.c +++ b/test/core/util/test_tcp_server.c @@ -72,8 +72,8 @@ void test_tcp_server_start(test_tcp_server *server, int port) { addr.sin_port = htons((uint16_t)port); memset(&addr.sin_addr, 0, sizeof(addr.sin_addr)); - grpc_error *error = grpc_tcp_server_create(&server->shutdown_complete, NULL, - &server->tcp_server); + grpc_error *error = grpc_tcp_server_create( + &exec_ctx, &server->shutdown_complete, NULL, &server->tcp_server); GPR_ASSERT(error == GRPC_ERROR_NONE); error = grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr), &port_added); |