diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 3 | ||||
-rw-r--r-- | src/core/surface/channel.c | 7 | ||||
-rw-r--r-- | src/core/surface/channel.h | 3 | ||||
-rw-r--r-- | src/core/surface/channel_connectivity.c | 3 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 17 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 3 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 21 | ||||
-rw-r--r-- | src/core/surface/server.c | 31 | ||||
-rw-r--r-- | src/core/surface/server.h | 1 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 9 |
10 files changed, 60 insertions, 38 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 4168c2ef0c..c2b3040319 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -499,7 +499,8 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) { } else { c->destroy_closure.cb = destroy_call; c->destroy_closure.cb_arg = c; - grpc_iomgr_add_callback(&c->destroy_closure); + grpc_workqueue_push(grpc_channel_get_workqueue(c->channel), + &c->destroy_closure, 1); } } } diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index a89523b3ab..bf4aee190f 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -79,6 +79,7 @@ struct grpc_channel { registered_call *registered_calls; grpc_iomgr_closure destroy_closure; char *target; + grpc_workqueue *workqueue; }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) @@ -92,7 +93,8 @@ struct grpc_channel { grpc_channel *grpc_channel_create_from_filters( const char *target, const grpc_channel_filter **filters, size_t num_filters, - const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) { + const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue, + int is_client) { size_t i; size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); @@ -104,6 +106,7 @@ grpc_channel *grpc_channel_create_from_filters( /* decremented by grpc_channel_destroy */ gpr_ref_init(&channel->refs, 1); channel->metadata_context = mdctx; + channel->workqueue = workqueue; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0); channel->grpc_compression_algorithm_string = grpc_mdstr_from_string(mdctx, "grpc-encoding", 0); @@ -311,7 +314,7 @@ void grpc_channel_internal_unref(grpc_channel *channel) { if (gpr_unref(&channel->refs)) { channel->destroy_closure.cb = destroy_channel; channel->destroy_closure.cb_arg = channel; - grpc_iomgr_add_callback(&channel->destroy_closure); + grpc_workqueue_push(channel->workqueue, &channel->destroy_closure, 1); } } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 593faec7df..9fc821d64b 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -40,7 +40,8 @@ grpc_channel *grpc_channel_create_from_filters( const char *target, const grpc_channel_filter **filters, size_t count, - const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client); + const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue, + int is_client); /** Get a (borrowed) pointer to this channels underlying channel stack */ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 88a7c16598..12b15f353f 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -176,7 +176,8 @@ void grpc_channel_watch_connectivity_state( "grpc_channel_watch_connectivity_state called on something that is " "not a client channel, but '%s'", client_channel_elem->filter->name); - grpc_iomgr_add_delayed_callback(&w->on_complete, 1); + grpc_workqueue_push(grpc_channel_get_workqueue(channel), &w->on_complete, + 1); } else { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); grpc_client_channel_add_interested_party(client_channel_elem, diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 9e2cf1cf66..7a4ec00abb 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -74,7 +74,8 @@ static void connected(void *arg, grpc_endpoint *tcp) { grpc_iomgr_closure *notify; if (tcp != NULL) { c->result->transport = grpc_create_chttp2_transport( - c->args.channel_args, tcp, c->args.metadata_context, 1); + c->args.channel_args, tcp, c->args.metadata_context, c->args.workqueue, + 1); grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); GPR_ASSERT(c->result->transport); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *)); @@ -85,7 +86,7 @@ static void connected(void *arg, grpc_endpoint *tcp) { } notify = c->notify; c->notify = NULL; - grpc_iomgr_add_callback(notify); + notify->cb(notify->cb_arg, 1); } static void connector_connect(grpc_connector *con, @@ -98,8 +99,9 @@ static void connector_connect(grpc_connector *con, c->notify = notify; c->args = *args; c->result = result; - grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, - args->addr_len, args->deadline); + grpc_tcp_client_connect(connected, c, args->interested_parties, + args->workqueue, args->addr, args->addr_len, + args->deadline); } static const grpc_connector_vtable connector_vtable = { @@ -164,6 +166,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, grpc_resolver *resolver; subchannel_factory *f; grpc_mdctx *mdctx = grpc_mdctx_create(); + grpc_workqueue *workqueue = grpc_workqueue_create(); size_t n = 0; GPR_ASSERT(!reserved); if (grpc_channel_args_is_census_enabled(args)) { @@ -173,8 +176,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); - channel = - grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1); + channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx, + workqueue, 1); f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; @@ -184,7 +187,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, f->merge_args = grpc_channel_args_copy(args); f->master = channel; GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory"); - resolver = grpc_resolver_create(target, &f->base); + resolver = grpc_resolver_create(target, &f->base, workqueue); if (!resolver) { return NULL; } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 80704cbf67..a5de900eff 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -150,7 +150,8 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, channel_data *chand; static const grpc_channel_filter *filters[] = {&lame_filter}; channel = grpc_channel_create_from_filters(target, filters, 1, NULL, - grpc_mdctx_create(), 1); + grpc_mdctx_create(), + grpc_workqueue_create(), 1); elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); GPR_ASSERT(elem->filter == &lame_filter); chand = (channel_data *)elem->channel_data; diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index d141260421..ec077af8dd 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -57,7 +57,6 @@ typedef struct { gpr_refcount refs; grpc_channel_security_connector *security_connector; - grpc_workqueue *workqueue; grpc_iomgr_closure *notify; grpc_connect_in_args args; @@ -72,7 +71,6 @@ static void connector_ref(grpc_connector *con) { static void connector_unref(grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { - grpc_workqueue_unref(c->workqueue); gpr_free(c); } } @@ -88,7 +86,8 @@ static void on_secure_transport_setup_done(void *arg, memset(c->result, 0, sizeof(*c->result)); } else { c->result->transport = grpc_create_chttp2_transport( - c->args.channel_args, secure_endpoint, c->args.metadata_context, 1); + c->args.channel_args, secure_endpoint, c->args.metadata_context, + c->args.workqueue, 1); grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2); c->result->filters[0] = &grpc_http_client_filter; @@ -124,8 +123,9 @@ static void connector_connect(grpc_connector *con, c->notify = notify; c->args = *args; c->result = result; - grpc_tcp_client_connect(connected, c, args->interested_parties, c->workqueue, - args->addr, args->addr_len, args->deadline); + grpc_tcp_client_connect(connected, c, args->interested_parties, + args->workqueue, args->addr, args->addr_len, + args->deadline); } static const grpc_connector_vtable connector_vtable = { @@ -167,8 +167,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel( memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; c->security_connector = f->security_connector; - c->workqueue = grpc_channel_get_workqueue(f->master); - grpc_workqueue_ref(c->workqueue); gpr_ref_init(&c->refs, 1); args->mdctx = f->mdctx; args->args = final_args; @@ -197,6 +195,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, grpc_channel_args *new_args_from_connector; grpc_channel_security_connector *connector; grpc_mdctx *mdctx; + grpc_workqueue *workqueue; grpc_resolver *resolver; subchannel_factory *f; #define MAX_FILTERS 3 @@ -219,6 +218,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, "Failed to create security connector."); } mdctx = grpc_mdctx_create(); + workqueue = grpc_workqueue_create(); connector_arg = grpc_security_connector_to_arg(&connector->base); args_copy = grpc_channel_args_copy_and_add( @@ -231,8 +231,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); - channel = - grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1); + channel = grpc_channel_create_from_filters(target, filters, n, args_copy, + mdctx, workqueue, 1); f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; @@ -244,8 +244,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, f->merge_args = grpc_channel_args_copy(args_copy); f->master = channel; GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory"); - resolver = grpc_resolver_create(target, &f->base, - grpc_channel_get_workqueue(channel)); + resolver = grpc_resolver_create(target, &f->base, workqueue); if (!resolver) { return NULL; } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 3d404f78a4..aba0f94fd4 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -219,6 +219,8 @@ struct grpc_server { /** when did we print the last shutdown progress message */ gpr_timespec last_shutdown_message_time; + + grpc_workqueue *workqueue; }; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -314,7 +316,7 @@ static void kill_zombie(void *elem, int success) { } static void request_matcher_zombify_all_pending_calls( - request_matcher *request_matcher) { + request_matcher *request_matcher, grpc_workqueue *workqueue) { while (request_matcher->pending_head) { call_data *calld = request_matcher->pending_head; request_matcher->pending_head = calld->pending_next; @@ -324,7 +326,7 @@ static void request_matcher_zombify_all_pending_calls( grpc_iomgr_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); + grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1); } } @@ -363,6 +365,7 @@ static void server_delete(grpc_server *server) { } request_matcher_destroy(&server->unregistered_request_matcher); gpr_stack_lockfree_destroy(server->request_freelist); + grpc_workqueue_unref(server->workqueue); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -401,7 +404,8 @@ static void destroy_channel(channel_data *chand) { maybe_finish_shutdown(chand->server); chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure); + grpc_workqueue_push(chand->server->workqueue, + &chand->finish_destroy_channel_closure, 1); } static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, @@ -414,7 +418,7 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); + grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1); return; } @@ -505,10 +509,11 @@ static void kill_pending_work_locked(grpc_server *server) { registered_method *rm; request_matcher_kill_requests(server, &server->unregistered_request_matcher); request_matcher_zombify_all_pending_calls( - &server->unregistered_request_matcher); + &server->unregistered_request_matcher, server->workqueue); for (rm = server->registered_methods; rm; rm = rm->next) { request_matcher_kill_requests(server, &rm->request_matcher); - request_matcher_zombify_all_pending_calls(&rm->request_matcher); + request_matcher_zombify_all_pending_calls(&rm->request_matcher, + server->workqueue); } } @@ -561,6 +566,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { static void server_on_recv(void *ptr, int success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; gpr_timespec op_deadline; if (success && !calld->got_initial_metadata) { @@ -595,7 +601,8 @@ static void server_on_recv(void *ptr, int success) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); + grpc_workqueue_push(chand->server->workqueue, + &calld->kill_zombie_closure, 1); } else { gpr_mu_unlock(&calld->mu_state); } @@ -606,7 +613,8 @@ static void server_on_recv(void *ptr, int success) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); + grpc_workqueue_push(chand->server->workqueue, + &calld->kill_zombie_closure, 1); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -799,6 +807,7 @@ grpc_server *grpc_server_create_from_filters( gpr_ref_init(&server->internal_refcount, 1); server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; + server->workqueue = grpc_workqueue_create(); /* TODO(ctiller): expose a channel_arg for this */ server->max_requested_calls = 32768; @@ -873,6 +882,7 @@ void grpc_server_start(grpc_server *server) { server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); for (i = 0; i < server->cq_count; i++) { server->pollsets[i] = grpc_cq_pollset(server->cqs[i]); + grpc_workqueue_add_to_pollset(server->workqueue, server->pollsets[i]); } for (l = server->listeners; l; l = l->next) { @@ -883,6 +893,7 @@ void grpc_server_start(grpc_server *server) { void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_channel_filter const **extra_filters, size_t num_extra_filters, grpc_mdctx *mdctx, + grpc_workqueue *workqueue, const grpc_channel_args *args) { size_t num_filters = s->channel_filter_count + num_extra_filters + 1; grpc_channel_filter const **filters = @@ -917,7 +928,7 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, } channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args, - mdctx, 0); + mdctx, workqueue, 0); chand = (channel_data *)grpc_channel_stack_element( grpc_channel_get_channel_stack(channel), 0) ->channel_data; @@ -1119,7 +1130,7 @@ static grpc_call_error queue_call_request(grpc_server *server, grpc_iomgr_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); + grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; diff --git a/src/core/surface/server.h b/src/core/surface/server.h index c638d682bb..1d82d07ced 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -58,6 +58,7 @@ void grpc_server_listener_destroy_done(void *server); void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport, grpc_channel_filter const **extra_filters, size_t num_extra_filters, grpc_mdctx *mdctx, + grpc_workqueue *workqueue, const grpc_channel_args *args); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 4ab845bc00..91cf6ece9c 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -43,11 +43,11 @@ #include <grpc/support/useful.h> static void setup_transport(void *server, grpc_transport *transport, - grpc_mdctx *mdctx) { + grpc_mdctx *mdctx, grpc_workqueue *workqueue) { static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; grpc_server_setup_transport(server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, + GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue, grpc_server_get_channel_args(server)); } @@ -60,9 +60,10 @@ static void new_transport(void *server, grpc_endpoint *tcp) { * case. */ grpc_mdctx *mdctx = grpc_mdctx_create(); + grpc_workqueue *workqueue = grpc_workqueue_create(); grpc_transport *transport = grpc_create_chttp2_transport( - grpc_server_get_channel_args(server), tcp, mdctx, 0); - setup_transport(server, transport, mdctx); + grpc_server_get_channel_args(server), tcp, mdctx, workqueue, 0); + setup_transport(server, transport, mdctx, workqueue); grpc_chttp2_transport_start_reading(transport, NULL, 0); } |