From 04cc8be23362025625c787ecce69514087e2fc2b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 10 Feb 2015 16:11:22 -0800 Subject: First draft registered methods --- src/core/surface/server.c | 152 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 115 insertions(+), 37 deletions(-) (limited to 'src/core/surface') diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 972ac28cd2..81eaf4fc94 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -99,7 +99,7 @@ typedef struct { struct registered_method { char *method; char *host; - call_link pending; + call_data *pending; requested_call_array requested; registered_method *next; }; @@ -118,6 +118,9 @@ struct channel_data { /* linked list of all channels on a server */ channel_data *next; channel_data *prev; + channel_registered_method *registered_methods; + gpr_uint32 registered_method_slots; + gpr_uint32 registered_method_max_probes; }; struct grpc_server { @@ -167,7 +170,7 @@ struct call_data { legacy_data *legacy; - gpr_uint8 included[CALL_LIST_COUNT]; + call_data **root[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; }; @@ -180,30 +183,30 @@ static void begin_call(grpc_server *server, call_data *calld, requested_call *rc); static void fail_call(grpc_server *server, requested_call *rc); -static int call_list_join(grpc_server *server, call_data *call, +static int call_list_join(call_data **root, call_data *call, call_list list) { - if (call->included[list]) return 0; - call->included[list] = 1; - if (!server->lists[list]) { - server->lists[list] = call; + GPR_ASSERT(!call->root[list]); + call->root[list] = root; + if (!*root) { + *root = call; call->links[list].next = call->links[list].prev = call; } else { - call->links[list].next = server->lists[list]; - call->links[list].prev = server->lists[list]->links[list].prev; + call->links[list].next = *root; + call->links[list].prev = (*root)->links[list].prev; call->links[list].next->links[list].prev = call->links[list].prev->links[list].next = call; } return 1; } -static call_data *call_list_remove_head(grpc_server *server, call_list list) { - call_data *out = server->lists[list]; +static call_data *call_list_remove_head(call_data **root, call_list list) { + call_data *out = *root; if (out) { - out->included[list] = 0; + out->root[list] = NULL; if (out->links[list].next == out) { - server->lists[list] = NULL; + *root = NULL; } else { - server->lists[list] = out->links[list].next; + *root = out->links[list].next; out->links[list].next->links[list].prev = out->links[list].prev; out->links[list].prev->links[list].next = out->links[list].next; } @@ -211,18 +214,18 @@ static call_data *call_list_remove_head(grpc_server *server, call_list list) { return out; } -static int call_list_remove(grpc_server *server, call_data *call, - call_list list) { - if (!call->included[list]) return 0; - call->included[list] = 0; - if (server->lists[list] == call) { - server->lists[list] = call->links[list].next; - if (server->lists[list] == call) { - server->lists[list] = NULL; +static int call_list_remove(call_data *call, call_list list) { + call_data **root = call->root[list]; + if (root == NULL) return 0; + call->root[list] = NULL; + if (*root == call) { + *root = call->links[list].next; + if (*root == call) { + *root = NULL; return 1; } } - GPR_ASSERT(server->lists[list] != call); + GPR_ASSERT(*root != call); call->links[list].next->links[list].prev = call->links[list].prev; call->links[list].prev->links[list].next = call->links[list].next; return 1; @@ -283,23 +286,53 @@ static void destroy_channel(channel_data *chand) { grpc_iomgr_add_callback(finish_destroy_channel, chand); } +static void finish_start_new_rpc_and_unlock(grpc_server *server, grpc_call_element *elem, call_data **pending_root, requested_call_array *array) { + requested_call rc; + call_data *calld = elem->call_data; + if (array->count == 0) { + calld->state = PENDING; + call_list_join(pending_root, calld, PENDING_START); + gpr_mu_unlock(&server->mu); + } else { + rc = server->requested_calls.calls[--server->requested_calls.count]; + calld->state = ACTIVATED; + gpr_mu_unlock(&server->mu); + begin_call(server, calld, &rc); + } +} + static void start_new_rpc(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; grpc_server *server = chand->server; + gpr_uint32 i; + gpr_uint32 hash; + channel_registered_method *rm; gpr_mu_lock(&server->mu); - if (server->requested_calls.count > 0) { - requested_call rc = - server->requested_calls.calls[--server->requested_calls.count]; - calld->state = ACTIVATED; - gpr_mu_unlock(&server->mu); - begin_call(server, calld, &rc); - } else { - calld->state = PENDING; - call_list_join(server, calld, PENDING_START); - gpr_mu_unlock(&server->mu); + if (chand->registered_methods && calld->path && calld->host) { + /* check for an exact match with host */ + hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash); + for (i = 0; i < chand->registered_method_max_probes; i++) { + rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots]; + if (!rm) break; + if (rm->host != calld->host) continue; + if (rm->method != calld->path) continue; + finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested); + return; + } + /* check for a wildcard method definition (no host set) */ + hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash); + for (i = 0; i < chand->registered_method_max_probes; i++) { + rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots]; + if (!rm) break; + if (rm->host != NULL) continue; + if (rm->method != calld->path) continue; + finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested); + return; + } } + finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], &server->requested_calls); } static void kill_zombie(void *elem, int success) { @@ -314,7 +347,7 @@ static void stream_closed(grpc_call_element *elem) { case ACTIVATED: break; case PENDING: - call_list_remove(chand->server, calld, PENDING_START); + call_list_remove(calld, PENDING_START); /* fallthrough intended */ case NOT_STARTED: calld->state = ZOMBIED; @@ -445,7 +478,7 @@ static void init_call_elem(grpc_call_element *elem, calld->call = grpc_call_from_top_element(elem); gpr_mu_lock(&chand->server->mu); - call_list_join(chand->server, calld, ALL_CALLS); + call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS); gpr_mu_unlock(&chand->server->mu); server_ref(chand->server); @@ -458,7 +491,7 @@ static void destroy_call_elem(grpc_call_element *elem) { gpr_mu_lock(&chand->server->mu); for (i = 0; i < CALL_LIST_COUNT; i++) { - call_list_remove(chand->server, elem->call_data, i); + call_list_remove(elem->call_data, i); } if (chand->server->shutdown && chand->server->have_shutdown_tag && chand->server->lists[ALL_CALLS] == NULL) { @@ -493,6 +526,7 @@ static void init_channel_elem(grpc_channel_element *elem, chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); chand->next = chand->prev = chand; + chand->registered_methods = NULL; } static void destroy_channel_elem(grpc_channel_element *elem) { @@ -600,8 +634,18 @@ grpc_transport_setup_result grpc_server_setup_transport( grpc_channel_filter const **filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_filters); size_t i; + size_t num_registered_methods; + size_t alloc; + registered_method *rm; + channel_registered_method *crm; grpc_channel *channel; channel_data *chand; + grpc_mdstr *host; + grpc_mdstr *method; + gpr_uint32 hash; + gpr_uint32 slots; + gpr_uint32 probes; + gpr_uint32 max_probes = 0; for (i = 0; i < s->channel_filter_count; i++) { filters[i] = s->channel_filters[i]; @@ -621,6 +665,32 @@ grpc_transport_setup_result grpc_server_setup_transport( server_ref(s); chand->channel = channel; + num_registered_methods = 0; + for (rm = s->registered_methods; rm; rm = rm->next) { + num_registered_methods++; + } + /* build a lookup table phrased in terms of mdstr's in this channels context + to quickly find registered methods */ + if (num_registered_methods > 0) { + slots = 2 * num_registered_methods; + alloc = sizeof(channel_registered_method) * slots; + chand->registered_methods = gpr_malloc(alloc); + memset(chand->registered_methods, 0, alloc); + for (rm = s->registered_methods; rm; rm = rm->next) { + host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL; + method = grpc_mdstr_from_string(mdctx, rm->host); + hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash); + for (probes = 0; chand->registered_methods[(hash + probes) % slots].server_registered_method != NULL; probes++); + if (probes > max_probes) max_probes = probes; + crm = &chand->registered_methods[(hash + probes) % slots]; + crm->server_registered_method = rm; + crm->host = host; + crm->method = method; + } + chand->registered_method_slots = slots; + chand->registered_method_max_probes = max_probes; + } + gpr_mu_lock(&s->mu); chand->next = &s->root_channel_data; chand->prev = chand->next->prev; @@ -752,7 +822,15 @@ static grpc_call_error queue_call_request(grpc_server *server, fail_call(server, rc); return GRPC_CALL_OK; } - calld = call_list_remove_head(server, PENDING_START); + switch (rc->type) { + case LEGACY_CALL: + case BATCH_CALL: + calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START); + break; + case REGISTERED_CALL: + calld = call_list_remove_head(&rc->data.registered.registered_method->pending, PENDING_START); + break; + } if (calld) { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; -- cgit v1.2.3