aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c3
-rw-r--r--src/core/surface/channel.c7
-rw-r--r--src/core/surface/channel.h3
-rw-r--r--src/core/surface/channel_connectivity.c3
-rw-r--r--src/core/surface/channel_create.c17
-rw-r--r--src/core/surface/lame_client.c3
-rw-r--r--src/core/surface/secure_channel_create.c21
-rw-r--r--src/core/surface/server.c31
-rw-r--r--src/core/surface/server.h1
-rw-r--r--src/core/surface/server_chttp2.c9
10 files changed, 60 insertions, 38 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 4168c2ef0c..c2b3040319 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -499,7 +499,8 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
} else {
c->destroy_closure.cb = destroy_call;
c->destroy_closure.cb_arg = c;
- grpc_iomgr_add_callback(&c->destroy_closure);
+ grpc_workqueue_push(grpc_channel_get_workqueue(c->channel),
+ &c->destroy_closure, 1);
}
}
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index a89523b3ab..bf4aee190f 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -79,6 +79,7 @@ struct grpc_channel {
registered_call *registered_calls;
grpc_iomgr_closure destroy_closure;
char *target;
+ grpc_workqueue *workqueue;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
@@ -92,7 +93,8 @@ struct grpc_channel {
grpc_channel *grpc_channel_create_from_filters(
const char *target, const grpc_channel_filter **filters, size_t num_filters,
- const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) {
+ const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue,
+ int is_client) {
size_t i;
size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
@@ -104,6 +106,7 @@ grpc_channel *grpc_channel_create_from_filters(
/* decremented by grpc_channel_destroy */
gpr_ref_init(&channel->refs, 1);
channel->metadata_context = mdctx;
+ channel->workqueue = workqueue;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0);
channel->grpc_compression_algorithm_string =
grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
@@ -311,7 +314,7 @@ void grpc_channel_internal_unref(grpc_channel *channel) {
if (gpr_unref(&channel->refs)) {
channel->destroy_closure.cb = destroy_channel;
channel->destroy_closure.cb_arg = channel;
- grpc_iomgr_add_callback(&channel->destroy_closure);
+ grpc_workqueue_push(channel->workqueue, &channel->destroy_closure, 1);
}
}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 593faec7df..9fc821d64b 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -40,7 +40,8 @@
grpc_channel *grpc_channel_create_from_filters(
const char *target, const grpc_channel_filter **filters, size_t count,
- const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client);
+ const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue,
+ int is_client);
/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 88a7c16598..12b15f353f 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -176,7 +176,8 @@ void grpc_channel_watch_connectivity_state(
"grpc_channel_watch_connectivity_state called on something that is "
"not a client channel, but '%s'",
client_channel_elem->filter->name);
- grpc_iomgr_add_delayed_callback(&w->on_complete, 1);
+ grpc_workqueue_push(grpc_channel_get_workqueue(channel), &w->on_complete,
+ 1);
} else {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
grpc_client_channel_add_interested_party(client_channel_elem,
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 9e2cf1cf66..7a4ec00abb 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -74,7 +74,8 @@ static void connected(void *arg, grpc_endpoint *tcp) {
grpc_iomgr_closure *notify;
if (tcp != NULL) {
c->result->transport = grpc_create_chttp2_transport(
- c->args.channel_args, tcp, c->args.metadata_context, 1);
+ c->args.channel_args, tcp, c->args.metadata_context, c->args.workqueue,
+ 1);
grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
GPR_ASSERT(c->result->transport);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *));
@@ -85,7 +86,7 @@ static void connected(void *arg, grpc_endpoint *tcp) {
}
notify = c->notify;
c->notify = NULL;
- grpc_iomgr_add_callback(notify);
+ notify->cb(notify->cb_arg, 1);
}
static void connector_connect(grpc_connector *con,
@@ -98,8 +99,9 @@ static void connector_connect(grpc_connector *con,
c->notify = notify;
c->args = *args;
c->result = result;
- grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr,
- args->addr_len, args->deadline);
+ grpc_tcp_client_connect(connected, c, args->interested_parties,
+ args->workqueue, args->addr, args->addr_len,
+ args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
@@ -164,6 +166,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
grpc_resolver *resolver;
subchannel_factory *f;
grpc_mdctx *mdctx = grpc_mdctx_create();
+ grpc_workqueue *workqueue = grpc_workqueue_create();
size_t n = 0;
GPR_ASSERT(!reserved);
if (grpc_channel_args_is_census_enabled(args)) {
@@ -173,8 +176,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
- channel =
- grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1);
+ channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx,
+ workqueue, 1);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
@@ -184,7 +187,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
f->merge_args = grpc_channel_args_copy(args);
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
- resolver = grpc_resolver_create(target, &f->base);
+ resolver = grpc_resolver_create(target, &f->base, workqueue);
if (!resolver) {
return NULL;
}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 80704cbf67..a5de900eff 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -150,7 +150,8 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
channel_data *chand;
static const grpc_channel_filter *filters[] = {&lame_filter};
channel = grpc_channel_create_from_filters(target, filters, 1, NULL,
- grpc_mdctx_create(), 1);
+ grpc_mdctx_create(),
+ grpc_workqueue_create(), 1);
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
GPR_ASSERT(elem->filter == &lame_filter);
chand = (channel_data *)elem->channel_data;
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index d141260421..ec077af8dd 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -57,7 +57,6 @@ typedef struct {
gpr_refcount refs;
grpc_channel_security_connector *security_connector;
- grpc_workqueue *workqueue;
grpc_iomgr_closure *notify;
grpc_connect_in_args args;
@@ -72,7 +71,6 @@ static void connector_ref(grpc_connector *con) {
static void connector_unref(grpc_connector *con) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
- grpc_workqueue_unref(c->workqueue);
gpr_free(c);
}
}
@@ -88,7 +86,8 @@ static void on_secure_transport_setup_done(void *arg,
memset(c->result, 0, sizeof(*c->result));
} else {
c->result->transport = grpc_create_chttp2_transport(
- c->args.channel_args, secure_endpoint, c->args.metadata_context, 1);
+ c->args.channel_args, secure_endpoint, c->args.metadata_context,
+ c->args.workqueue, 1);
grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2);
c->result->filters[0] = &grpc_http_client_filter;
@@ -124,8 +123,9 @@ static void connector_connect(grpc_connector *con,
c->notify = notify;
c->args = *args;
c->result = result;
- grpc_tcp_client_connect(connected, c, args->interested_parties, c->workqueue,
- args->addr, args->addr_len, args->deadline);
+ grpc_tcp_client_connect(connected, c, args->interested_parties,
+ args->workqueue, args->addr, args->addr_len,
+ args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
@@ -167,8 +167,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
c->security_connector = f->security_connector;
- c->workqueue = grpc_channel_get_workqueue(f->master);
- grpc_workqueue_ref(c->workqueue);
gpr_ref_init(&c->refs, 1);
args->mdctx = f->mdctx;
args->args = final_args;
@@ -197,6 +195,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
grpc_channel_args *new_args_from_connector;
grpc_channel_security_connector *connector;
grpc_mdctx *mdctx;
+ grpc_workqueue *workqueue;
grpc_resolver *resolver;
subchannel_factory *f;
#define MAX_FILTERS 3
@@ -219,6 +218,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
"Failed to create security connector.");
}
mdctx = grpc_mdctx_create();
+ workqueue = grpc_workqueue_create();
connector_arg = grpc_security_connector_to_arg(&connector->base);
args_copy = grpc_channel_args_copy_and_add(
@@ -231,8 +231,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
- channel =
- grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1);
+ channel = grpc_channel_create_from_filters(target, filters, n, args_copy,
+ mdctx, workqueue, 1);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
@@ -244,8 +244,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
f->merge_args = grpc_channel_args_copy(args_copy);
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
- resolver = grpc_resolver_create(target, &f->base,
- grpc_channel_get_workqueue(channel));
+ resolver = grpc_resolver_create(target, &f->base, workqueue);
if (!resolver) {
return NULL;
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 3d404f78a4..aba0f94fd4 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -219,6 +219,8 @@ struct grpc_server {
/** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time;
+
+ grpc_workqueue *workqueue;
};
#define SERVER_FROM_CALL_ELEM(elem) \
@@ -314,7 +316,7 @@ static void kill_zombie(void *elem, int success) {
}
static void request_matcher_zombify_all_pending_calls(
- request_matcher *request_matcher) {
+ request_matcher *request_matcher, grpc_workqueue *workqueue) {
while (request_matcher->pending_head) {
call_data *calld = request_matcher->pending_head;
request_matcher->pending_head = calld->pending_next;
@@ -324,7 +326,7 @@ static void request_matcher_zombify_all_pending_calls(
grpc_iomgr_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1);
}
}
@@ -363,6 +365,7 @@ static void server_delete(grpc_server *server) {
}
request_matcher_destroy(&server->unregistered_request_matcher);
gpr_stack_lockfree_destroy(server->request_freelist);
+ grpc_workqueue_unref(server->workqueue);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
@@ -401,7 +404,8 @@ static void destroy_channel(channel_data *chand) {
maybe_finish_shutdown(chand->server);
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
- grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
+ grpc_workqueue_push(chand->server->workqueue,
+ &chand->finish_destroy_channel_closure, 1);
}
static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
@@ -414,7 +418,7 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
return;
}
@@ -505,10 +509,11 @@ static void kill_pending_work_locked(grpc_server *server) {
registered_method *rm;
request_matcher_kill_requests(server, &server->unregistered_request_matcher);
request_matcher_zombify_all_pending_calls(
- &server->unregistered_request_matcher);
+ &server->unregistered_request_matcher, server->workqueue);
for (rm = server->registered_methods; rm; rm = rm->next) {
request_matcher_kill_requests(server, &rm->request_matcher);
- request_matcher_zombify_all_pending_calls(&rm->request_matcher);
+ request_matcher_zombify_all_pending_calls(&rm->request_matcher,
+ server->workqueue);
}
}
@@ -561,6 +566,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
static void server_on_recv(void *ptr, int success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
gpr_timespec op_deadline;
if (success && !calld->got_initial_metadata) {
@@ -595,7 +601,8 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ grpc_workqueue_push(chand->server->workqueue,
+ &calld->kill_zombie_closure, 1);
} else {
gpr_mu_unlock(&calld->mu_state);
}
@@ -606,7 +613,8 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ grpc_workqueue_push(chand->server->workqueue,
+ &calld->kill_zombie_closure, 1);
} else if (calld->state == PENDING) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
@@ -799,6 +807,7 @@ grpc_server *grpc_server_create_from_filters(
gpr_ref_init(&server->internal_refcount, 1);
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;
+ server->workqueue = grpc_workqueue_create();
/* TODO(ctiller): expose a channel_arg for this */
server->max_requested_calls = 32768;
@@ -873,6 +882,7 @@ void grpc_server_start(grpc_server *server) {
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
+ grpc_workqueue_add_to_pollset(server->workqueue, server->pollsets[i]);
}
for (l = server->listeners; l; l = l->next) {
@@ -883,6 +893,7 @@ void grpc_server_start(grpc_server *server) {
void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_channel_filter const **extra_filters,
size_t num_extra_filters, grpc_mdctx *mdctx,
+ grpc_workqueue *workqueue,
const grpc_channel_args *args) {
size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
grpc_channel_filter const **filters =
@@ -917,7 +928,7 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
}
channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args,
- mdctx, 0);
+ mdctx, workqueue, 0);
chand = (channel_data *)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0)
->channel_data;
@@ -1119,7 +1130,7 @@ static grpc_call_error queue_call_request(grpc_server *server,
grpc_iomgr_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+ grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
} else {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index c638d682bb..1d82d07ced 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -58,6 +58,7 @@ void grpc_server_listener_destroy_done(void *server);
void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport,
grpc_channel_filter const **extra_filters,
size_t num_extra_filters, grpc_mdctx *mdctx,
+ grpc_workqueue *workqueue,
const grpc_channel_args *args);
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 4ab845bc00..91cf6ece9c 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -43,11 +43,11 @@
#include <grpc/support/useful.h>
static void setup_transport(void *server, grpc_transport *transport,
- grpc_mdctx *mdctx) {
+ grpc_mdctx *mdctx, grpc_workqueue *workqueue) {
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
grpc_server_setup_transport(server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), mdctx,
+ GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
grpc_server_get_channel_args(server));
}
@@ -60,9 +60,10 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
* case.
*/
grpc_mdctx *mdctx = grpc_mdctx_create();
+ grpc_workqueue *workqueue = grpc_workqueue_create();
grpc_transport *transport = grpc_create_chttp2_transport(
- grpc_server_get_channel_args(server), tcp, mdctx, 0);
- setup_transport(server, transport, mdctx);
+ grpc_server_get_channel_args(server), tcp, mdctx, workqueue, 0);
+ setup_transport(server, transport, mdctx, workqueue);
grpc_chttp2_transport_start_reading(transport, NULL, 0);
}