aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc/grpc.h2
-rw-r--r--src/core/lib/iomgr/buffer_pool.c19
-rw-r--r--src/core/lib/iomgr/endpoint.c4
-rw-r--r--test/core/bad_client/bad_client.c4
-rw-r--r--test/core/http/httpcli_test.c11
-rw-r--r--test/core/http/httpscli_test.c11
-rw-r--r--test/core/internal_api_canaries/iomgr.c1
-rw-r--r--test/core/iomgr/endpoint_pair_test.c5
-rw-r--r--test/core/iomgr/fd_conservation_posix_test.c5
-rw-r--r--test/core/surface/concurrent_connectivity_test.c2
-rw-r--r--test/core/util/mock_endpoint.c22
-rw-r--r--test/core/util/mock_endpoint.h3
-rw-r--r--test/core/util/passthru_endpoint.c27
-rw-r--r--test/core/util/passthru_endpoint.h3
-rw-r--r--test/core/util/port_server_client.c17
-rw-r--r--test/core/util/test_tcp_server.c4
16 files changed, 112 insertions, 28 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index a893367a9c..a6e02cd072 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -415,7 +415,7 @@ GRPCAPI void grpc_buffer_pool_resize(grpc_buffer_pool *buffer_pool,
size_t new_size);
/** Fetch a vtable for a grpc_channel_arg that points to a grpc_buffer_pool */
-GRPCAPI grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void);
+GRPCAPI const grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void);
#ifdef __cplusplus
}
diff --git a/src/core/lib/iomgr/buffer_pool.c b/src/core/lib/iomgr/buffer_pool.c
index 0153bce203..78a98027f5 100644
--- a/src/core/lib/iomgr/buffer_pool.c
+++ b/src/core/lib/iomgr/buffer_pool.c
@@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/combiner.h"
@@ -366,6 +367,10 @@ void grpc_buffer_pool_resize(grpc_buffer_pool *buffer_pool, size_t size) {
grpc_exec_ctx_finish(&exec_ctx);
}
+/*******************************************************************************
+ * grpc_buffer_user channel args api
+ */
+
grpc_buffer_pool *grpc_buffer_pool_from_channel_args(
grpc_channel_args *channel_args) {
for (size_t i = 0; i < channel_args->num_args; i++) {
@@ -381,6 +386,20 @@ grpc_buffer_pool *grpc_buffer_pool_from_channel_args(
return grpc_buffer_pool_create();
}
+static void *bp_copy(void *bp) {
+ grpc_buffer_pool_ref(bp);
+ return bp;
+}
+
+static void bp_destroy(void *bp) { grpc_buffer_pool_unref(bp); }
+
+static int bp_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
+
+const grpc_arg_pointer_vtable *grpc_buffer_pool_arg_vtable(void) {
+ static const grpc_arg_pointer_vtable vtable = {bp_copy, bp_destroy, bp_cmp};
+ return &vtable;
+}
+
/*******************************************************************************
* grpc_buffer_user api
*/
diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c
index f901fcf962..f3548a1d74 100644
--- a/src/core/lib/iomgr/endpoint.c
+++ b/src/core/lib/iomgr/endpoint.c
@@ -69,3 +69,7 @@ char* grpc_endpoint_get_peer(grpc_endpoint* ep) {
grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) {
return ep->vtable->get_workqueue(ep);
}
+
+grpc_buffer_user* grpc_endpoint_get_buffer_user(grpc_endpoint* ep) {
+ return ep->vtable->get_buffer_user(ep);
+}
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index be88d4a69a..06692fe969 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -114,7 +114,9 @@ void grpc_run_bad_client_test(
grpc_init();
/* Create endpoints */
- sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536);
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ sfd = grpc_iomgr_create_endpoint_pair("fixture", buffer_pool, 65536);
+ grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool);
/* Create server, completion events */
a.server = grpc_server_create(NULL, NULL);
diff --git a/test/core/http/httpcli_test.c b/test/core/http/httpcli_test.c
index 38b32a3867..d1c6805b03 100644
--- a/test/core/http/httpcli_test.c
+++ b/test/core/http/httpcli_test.c
@@ -89,8 +89,11 @@ static void test_get(int port) {
grpc_http_response response;
memset(&response, 0, sizeof(response));
- grpc_httpcli_get(&exec_ctx, &g_context, &g_pops, &req, n_seconds_time(15),
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_httpcli_get(&exec_ctx, &g_context, &g_pops, buffer_pool, &req,
+ n_seconds_time(15),
grpc_closure_create(on_finish, &response), &response);
+ grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool);
gpr_mu_lock(g_mu);
while (!g_done) {
grpc_pollset_worker *worker = NULL;
@@ -126,9 +129,11 @@ static void test_post(int port) {
grpc_http_response response;
memset(&response, 0, sizeof(response));
- grpc_httpcli_post(&exec_ctx, &g_context, &g_pops, &req, "hello", 5,
- n_seconds_time(15),
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_httpcli_post(&exec_ctx, &g_context, &g_pops, buffer_pool, &req, "hello",
+ 5, n_seconds_time(15),
grpc_closure_create(on_finish, &response), &response);
+ grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool);
gpr_mu_lock(g_mu);
while (!g_done) {
grpc_pollset_worker *worker = NULL;
diff --git a/test/core/http/httpscli_test.c b/test/core/http/httpscli_test.c
index 359e557689..09ceec7208 100644
--- a/test/core/http/httpscli_test.c
+++ b/test/core/http/httpscli_test.c
@@ -90,8 +90,11 @@ static void test_get(int port) {
grpc_http_response response;
memset(&response, 0, sizeof(response));
- grpc_httpcli_get(&exec_ctx, &g_context, &g_pops, &req, n_seconds_time(15),
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_httpcli_get(&exec_ctx, &g_context, &g_pops, buffer_pool, &req,
+ n_seconds_time(15),
grpc_closure_create(on_finish, &response), &response);
+ grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool);
gpr_mu_lock(g_mu);
while (!g_done) {
grpc_pollset_worker *worker = NULL;
@@ -128,9 +131,11 @@ static void test_post(int port) {
grpc_http_response response;
memset(&response, 0, sizeof(response));
- grpc_httpcli_post(&exec_ctx, &g_context, &g_pops, &req, "hello", 5,
- n_seconds_time(15),
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_httpcli_post(&exec_ctx, &g_context, &g_pops, buffer_pool, &req, "hello",
+ 5, n_seconds_time(15),
grpc_closure_create(on_finish, &response), &response);
+ grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool);
gpr_mu_lock(g_mu);
while (!g_done) {
grpc_pollset_worker *worker = NULL;
diff --git a/test/core/internal_api_canaries/iomgr.c b/test/core/internal_api_canaries/iomgr.c
index 27d630623e..4f967ce751 100644
--- a/test/core/internal_api_canaries/iomgr.c
+++ b/test/core/internal_api_canaries/iomgr.c
@@ -84,6 +84,7 @@ static void test_code(void) {
grpc_endpoint_add_to_pollset_set,
grpc_endpoint_shutdown,
grpc_endpoint_destroy,
+ grpc_endpoint_get_buffer_user,
grpc_endpoint_get_peer};
endpoint.vtable = &vtable;
diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c
index 99b86b6213..a7f00230c2 100644
--- a/test/core/iomgr/endpoint_pair_test.c
+++ b/test/core/iomgr/endpoint_pair_test.c
@@ -49,7 +49,10 @@ static grpc_endpoint_test_fixture create_fixture_endpoint_pair(
size_t slice_size) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_endpoint_test_fixture f;
- grpc_endpoint_pair p = grpc_iomgr_create_endpoint_pair("test", slice_size);
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_endpoint_pair p =
+ grpc_iomgr_create_endpoint_pair("test", buffer_pool, slice_size);
+ grpc_buffer_pool_unref(buffer_pool);
f.client_ep = p.client;
f.server_ep = p.server;
diff --git a/test/core/iomgr/fd_conservation_posix_test.c b/test/core/iomgr/fd_conservation_posix_test.c
index bbb3f46497..4612ff7dc4 100644
--- a/test/core/iomgr/fd_conservation_posix_test.c
+++ b/test/core/iomgr/fd_conservation_posix_test.c
@@ -52,15 +52,18 @@ int main(int argc, char **argv) {
of descriptors */
rlim.rlim_cur = rlim.rlim_max = 10;
GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim));
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
for (i = 0; i < 100; i++) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- p = grpc_iomgr_create_endpoint_pair("test", 1);
+ p = grpc_iomgr_create_endpoint_pair("test", buffer_pool, 1);
grpc_endpoint_destroy(&exec_ctx, p.client);
grpc_endpoint_destroy(&exec_ctx, p.server);
grpc_exec_ctx_finish(&exec_ctx);
}
+ grpc_buffer_pool_unref(buffer_pool);
+
grpc_iomgr_shutdown();
return 0;
}
diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c
index f7567f350d..c34ea2c8cb 100644
--- a/test/core/surface/concurrent_connectivity_test.c
+++ b/test/core/surface/concurrent_connectivity_test.c
@@ -113,7 +113,7 @@ void bad_server_thread(void *vargs) {
socklen_t addr_len = sizeof(addr);
int port;
grpc_tcp_server *s;
- grpc_error *error = grpc_tcp_server_create(NULL, NULL, &s);
+ grpc_error *error = grpc_tcp_server_create(&exec_ctx, NULL, NULL, &s);
GPR_ASSERT(error == GRPC_ERROR_NONE);
memset(&addr, 0, sizeof(addr));
addr.ss_family = AF_INET;
diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c
index 13e0e918fb..173a2d8963 100644
--- a/test/core/util/mock_endpoint.c
+++ b/test/core/util/mock_endpoint.c
@@ -43,6 +43,7 @@ typedef struct grpc_mock_endpoint {
gpr_slice_buffer read_buffer;
gpr_slice_buffer *on_read_out;
grpc_closure *on_read;
+ grpc_buffer_user buffer_user;
} grpc_mock_endpoint;
static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@@ -85,16 +86,28 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
gpr_mu_unlock(&m->mu);
}
-static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
- grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
+static void me_really_destroy(grpc_exec_ctx *exec_ctx, void *mp,
+ grpc_error *error) {
+ grpc_mock_endpoint *m = mp;
gpr_slice_buffer_destroy(&m->read_buffer);
gpr_free(m);
}
+static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+ grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
+ grpc_buffer_user_destroy(exec_ctx, &m->buffer_user,
+ grpc_closure_create(me_really_destroy, m));
+}
+
static char *me_get_peer(grpc_endpoint *ep) {
return gpr_strdup("fake:mock_endpoint");
}
+static grpc_buffer_user *me_get_buffer_user(grpc_endpoint *ep) {
+ grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
+ return &m->buffer_user;
+}
+
static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; }
static const grpc_endpoint_vtable vtable = {
@@ -105,12 +118,15 @@ static const grpc_endpoint_vtable vtable = {
me_add_to_pollset_set,
me_shutdown,
me_destroy,
+ me_get_buffer_user,
me_get_peer,
};
-grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice)) {
+grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice),
+ grpc_buffer_pool *buffer_pool) {
grpc_mock_endpoint *m = gpr_malloc(sizeof(*m));
m->base.vtable = &vtable;
+ grpc_buffer_user_init(&m->buffer_user, buffer_pool);
gpr_slice_buffer_init(&m->read_buffer);
gpr_mu_init(&m->mu);
m->on_write = on_write;
diff --git a/test/core/util/mock_endpoint.h b/test/core/util/mock_endpoint.h
index 051af9866b..bb59a16f7a 100644
--- a/test/core/util/mock_endpoint.h
+++ b/test/core/util/mock_endpoint.h
@@ -36,7 +36,8 @@
#include "src/core/lib/iomgr/endpoint.h"
-grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice));
+grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice),
+ grpc_buffer_pool *buffer_pool);
void grpc_mock_endpoint_put_read(grpc_exec_ctx *exec_ctx,
grpc_endpoint *mock_endpoint, gpr_slice slice);
diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c
index 7ed9e97bd6..239cd3e275 100644
--- a/test/core/util/passthru_endpoint.c
+++ b/test/core/util/passthru_endpoint.c
@@ -44,6 +44,7 @@ typedef struct {
gpr_slice_buffer read_buffer;
gpr_slice_buffer *on_read_out;
grpc_closure *on_read;
+ grpc_buffer_user buffer_user;
} half;
struct passthru_endpoint {
@@ -122,7 +123,8 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
gpr_mu_unlock(&m->parent->mu);
}
-static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+static void me_really_destroy(grpc_exec_ctx *exec_ctx, void *ep,
+ grpc_error *error) {
passthru_endpoint *p = ((half *)ep)->parent;
gpr_mu_lock(&p->mu);
if (0 == --p->halves) {
@@ -136,12 +138,23 @@ static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
}
}
+static void me_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+ half *m = (half *)ep;
+ grpc_buffer_user_destroy(exec_ctx, &m->buffer_user,
+ grpc_closure_create(me_really_destroy, m));
+}
+
static char *me_get_peer(grpc_endpoint *ep) {
return gpr_strdup("fake:mock_endpoint");
}
static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; }
+static grpc_buffer_user *me_get_buffer_user(grpc_endpoint *ep) {
+ half *m = (half *)ep;
+ return &m->buffer_user;
+}
+
static const grpc_endpoint_vtable vtable = {
me_read,
me_write,
@@ -150,23 +163,27 @@ static const grpc_endpoint_vtable vtable = {
me_add_to_pollset_set,
me_shutdown,
me_destroy,
+ me_get_buffer_user,
me_get_peer,
};
-static void half_init(half *m, passthru_endpoint *parent) {
+static void half_init(half *m, passthru_endpoint *parent,
+ grpc_buffer_pool *buffer_pool) {
m->base.vtable = &vtable;
m->parent = parent;
gpr_slice_buffer_init(&m->read_buffer);
m->on_read = NULL;
+ grpc_buffer_user_init(&m->buffer_user, buffer_pool);
}
void grpc_passthru_endpoint_create(grpc_endpoint **client,
- grpc_endpoint **server) {
+ grpc_endpoint **server,
+ grpc_buffer_pool *buffer_pool) {
passthru_endpoint *m = gpr_malloc(sizeof(*m));
m->halves = 2;
m->shutdown = 0;
- half_init(&m->client, m);
- half_init(&m->server, m);
+ half_init(&m->client, m, buffer_pool);
+ half_init(&m->server, m, buffer_pool);
gpr_mu_init(&m->mu);
*client = &m->client.base;
*server = &m->server.base;
diff --git a/test/core/util/passthru_endpoint.h b/test/core/util/passthru_endpoint.h
index aa1d3a1763..9756315084 100644
--- a/test/core/util/passthru_endpoint.h
+++ b/test/core/util/passthru_endpoint.h
@@ -37,6 +37,7 @@
#include "src/core/lib/iomgr/endpoint.h"
void grpc_passthru_endpoint_create(grpc_endpoint **client,
- grpc_endpoint **server);
+ grpc_endpoint **server,
+ grpc_buffer_pool *buffer_pool);
#endif
diff --git a/test/core/util/port_server_client.c b/test/core/util/port_server_client.c
index a5c8c49650..dd444236e9 100644
--- a/test/core/util/port_server_client.c
+++ b/test/core/util/port_server_client.c
@@ -99,9 +99,11 @@ void grpc_free_port_using_server(char *server, int port) {
req.http.path = path;
grpc_httpcli_context_init(&context);
- grpc_httpcli_get(&exec_ctx, &context, &pr.pops, &req,
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_httpcli_get(&exec_ctx, &context, &pr.pops, buffer_pool, &req,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10),
grpc_closure_create(freed_port_from_server, &pr), &rsp);
+ grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool);
gpr_mu_lock(pr.mu);
while (!pr.done) {
grpc_pollset_worker *worker = NULL;
@@ -167,10 +169,12 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg,
req.http.path = "/get";
grpc_http_response_destroy(&pr->response);
memset(&pr->response, 0, sizeof(pr->response));
- grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pops, &req,
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pops, buffer_pool, &req,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10),
grpc_closure_create(got_port_from_server, pr),
&pr->response);
+ grpc_buffer_pool_internal_unref(exec_ctx, buffer_pool);
return;
}
GPR_ASSERT(response);
@@ -211,9 +215,12 @@ int grpc_pick_port_using_server(char *server) {
req.http.path = "/get";
grpc_httpcli_context_init(&context);
- grpc_httpcli_get(
- &exec_ctx, &context, &pr.pops, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10),
- grpc_closure_create(got_port_from_server, &pr), &pr.response);
+ grpc_buffer_pool *buffer_pool = grpc_buffer_pool_create();
+ grpc_httpcli_get(&exec_ctx, &context, &pr.pops, buffer_pool, &req,
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10),
+ grpc_closure_create(got_port_from_server, &pr),
+ &pr.response);
+ grpc_buffer_pool_internal_unref(&exec_ctx, buffer_pool);
grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(pr.mu);
while (pr.port == -1) {
diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c
index 8a0b3932d8..6890da4730 100644
--- a/test/core/util/test_tcp_server.c
+++ b/test/core/util/test_tcp_server.c
@@ -72,8 +72,8 @@ void test_tcp_server_start(test_tcp_server *server, int port) {
addr.sin_port = htons((uint16_t)port);
memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
- grpc_error *error = grpc_tcp_server_create(&server->shutdown_complete, NULL,
- &server->tcp_server);
+ grpc_error *error = grpc_tcp_server_create(
+ &exec_ctx, &server->shutdown_complete, NULL, &server->tcp_server);
GPR_ASSERT(error == GRPC_ERROR_NONE);
error = grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr),
&port_added);