aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/surface/server.c152
1 files changed, 115 insertions, 37 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 972ac28cd2..81eaf4fc94 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -99,7 +99,7 @@ typedef struct {
struct registered_method {
char *method;
char *host;
- call_link pending;
+ call_data *pending;
requested_call_array requested;
registered_method *next;
};
@@ -118,6 +118,9 @@ struct channel_data {
/* linked list of all channels on a server */
channel_data *next;
channel_data *prev;
+ channel_registered_method *registered_methods;
+ gpr_uint32 registered_method_slots;
+ gpr_uint32 registered_method_max_probes;
};
struct grpc_server {
@@ -167,7 +170,7 @@ struct call_data {
legacy_data *legacy;
- gpr_uint8 included[CALL_LIST_COUNT];
+ call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
};
@@ -180,30 +183,30 @@ static void begin_call(grpc_server *server, call_data *calld,
requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc);
-static int call_list_join(grpc_server *server, call_data *call,
+static int call_list_join(call_data **root, call_data *call,
call_list list) {
- if (call->included[list]) return 0;
- call->included[list] = 1;
- if (!server->lists[list]) {
- server->lists[list] = call;
+ GPR_ASSERT(!call->root[list]);
+ call->root[list] = root;
+ if (!*root) {
+ *root = call;
call->links[list].next = call->links[list].prev = call;
} else {
- call->links[list].next = server->lists[list];
- call->links[list].prev = server->lists[list]->links[list].prev;
+ call->links[list].next = *root;
+ call->links[list].prev = (*root)->links[list].prev;
call->links[list].next->links[list].prev =
call->links[list].prev->links[list].next = call;
}
return 1;
}
-static call_data *call_list_remove_head(grpc_server *server, call_list list) {
- call_data *out = server->lists[list];
+static call_data *call_list_remove_head(call_data **root, call_list list) {
+ call_data *out = *root;
if (out) {
- out->included[list] = 0;
+ out->root[list] = NULL;
if (out->links[list].next == out) {
- server->lists[list] = NULL;
+ *root = NULL;
} else {
- server->lists[list] = out->links[list].next;
+ *root = out->links[list].next;
out->links[list].next->links[list].prev = out->links[list].prev;
out->links[list].prev->links[list].next = out->links[list].next;
}
@@ -211,18 +214,18 @@ static call_data *call_list_remove_head(grpc_server *server, call_list list) {
return out;
}
-static int call_list_remove(grpc_server *server, call_data *call,
- call_list list) {
- if (!call->included[list]) return 0;
- call->included[list] = 0;
- if (server->lists[list] == call) {
- server->lists[list] = call->links[list].next;
- if (server->lists[list] == call) {
- server->lists[list] = NULL;
+static int call_list_remove(call_data *call, call_list list) {
+ call_data **root = call->root[list];
+ if (root == NULL) return 0;
+ call->root[list] = NULL;
+ if (*root == call) {
+ *root = call->links[list].next;
+ if (*root == call) {
+ *root = NULL;
return 1;
}
}
- GPR_ASSERT(server->lists[list] != call);
+ GPR_ASSERT(*root != call);
call->links[list].next->links[list].prev = call->links[list].prev;
call->links[list].prev->links[list].next = call->links[list].next;
return 1;
@@ -283,23 +286,53 @@ static void destroy_channel(channel_data *chand) {
grpc_iomgr_add_callback(finish_destroy_channel, chand);
}
+static void finish_start_new_rpc_and_unlock(grpc_server *server, grpc_call_element *elem, call_data **pending_root, requested_call_array *array) {
+ requested_call rc;
+ call_data *calld = elem->call_data;
+ if (array->count == 0) {
+ calld->state = PENDING;
+ call_list_join(pending_root, calld, PENDING_START);
+ gpr_mu_unlock(&server->mu);
+ } else {
+ rc = server->requested_calls.calls[--server->requested_calls.count];
+ calld->state = ACTIVATED;
+ gpr_mu_unlock(&server->mu);
+ begin_call(server, calld, &rc);
+ }
+}
+
static void start_new_rpc(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_server *server = chand->server;
+ gpr_uint32 i;
+ gpr_uint32 hash;
+ channel_registered_method *rm;
gpr_mu_lock(&server->mu);
- if (server->requested_calls.count > 0) {
- requested_call rc =
- server->requested_calls.calls[--server->requested_calls.count];
- calld->state = ACTIVATED;
- gpr_mu_unlock(&server->mu);
- begin_call(server, calld, &rc);
- } else {
- calld->state = PENDING;
- call_list_join(server, calld, PENDING_START);
- gpr_mu_unlock(&server->mu);
+ if (chand->registered_methods && calld->path && calld->host) {
+ /* check for an exact match with host */
+ hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
+ for (i = 0; i < chand->registered_method_max_probes; i++) {
+ rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots];
+ if (!rm) break;
+ if (rm->host != calld->host) continue;
+ if (rm->method != calld->path) continue;
+ finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested);
+ return;
+ }
+ /* check for a wildcard method definition (no host set) */
+ hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
+ for (i = 0; i < chand->registered_method_max_probes; i++) {
+ rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots];
+ if (!rm) break;
+ if (rm->host != NULL) continue;
+ if (rm->method != calld->path) continue;
+ finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested);
+ return;
+ }
}
+ finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], &server->requested_calls);
}
static void kill_zombie(void *elem, int success) {
@@ -314,7 +347,7 @@ static void stream_closed(grpc_call_element *elem) {
case ACTIVATED:
break;
case PENDING:
- call_list_remove(chand->server, calld, PENDING_START);
+ call_list_remove(calld, PENDING_START);
/* fallthrough intended */
case NOT_STARTED:
calld->state = ZOMBIED;
@@ -445,7 +478,7 @@ static void init_call_elem(grpc_call_element *elem,
calld->call = grpc_call_from_top_element(elem);
gpr_mu_lock(&chand->server->mu);
- call_list_join(chand->server, calld, ALL_CALLS);
+ call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
gpr_mu_unlock(&chand->server->mu);
server_ref(chand->server);
@@ -458,7 +491,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
gpr_mu_lock(&chand->server->mu);
for (i = 0; i < CALL_LIST_COUNT; i++) {
- call_list_remove(chand->server, elem->call_data, i);
+ call_list_remove(elem->call_data, i);
}
if (chand->server->shutdown && chand->server->have_shutdown_tag &&
chand->server->lists[ALL_CALLS] == NULL) {
@@ -493,6 +526,7 @@ static void init_channel_elem(grpc_channel_element *elem,
chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
chand->next = chand->prev = chand;
+ chand->registered_methods = NULL;
}
static void destroy_channel_elem(grpc_channel_element *elem) {
@@ -600,8 +634,18 @@ grpc_transport_setup_result grpc_server_setup_transport(
grpc_channel_filter const **filters =
gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
size_t i;
+ size_t num_registered_methods;
+ size_t alloc;
+ registered_method *rm;
+ channel_registered_method *crm;
grpc_channel *channel;
channel_data *chand;
+ grpc_mdstr *host;
+ grpc_mdstr *method;
+ gpr_uint32 hash;
+ gpr_uint32 slots;
+ gpr_uint32 probes;
+ gpr_uint32 max_probes = 0;
for (i = 0; i < s->channel_filter_count; i++) {
filters[i] = s->channel_filters[i];
@@ -621,6 +665,32 @@ grpc_transport_setup_result grpc_server_setup_transport(
server_ref(s);
chand->channel = channel;
+ num_registered_methods = 0;
+ for (rm = s->registered_methods; rm; rm = rm->next) {
+ num_registered_methods++;
+ }
+ /* build a lookup table phrased in terms of mdstr's in this channels context
+ to quickly find registered methods */
+ if (num_registered_methods > 0) {
+ slots = 2 * num_registered_methods;
+ alloc = sizeof(channel_registered_method) * slots;
+ chand->registered_methods = gpr_malloc(alloc);
+ memset(chand->registered_methods, 0, alloc);
+ for (rm = s->registered_methods; rm; rm = rm->next) {
+ host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
+ method = grpc_mdstr_from_string(mdctx, rm->host);
+ hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
+ for (probes = 0; chand->registered_methods[(hash + probes) % slots].server_registered_method != NULL; probes++);
+ if (probes > max_probes) max_probes = probes;
+ crm = &chand->registered_methods[(hash + probes) % slots];
+ crm->server_registered_method = rm;
+ crm->host = host;
+ crm->method = method;
+ }
+ chand->registered_method_slots = slots;
+ chand->registered_method_max_probes = max_probes;
+ }
+
gpr_mu_lock(&s->mu);
chand->next = &s->root_channel_data;
chand->prev = chand->next->prev;
@@ -752,7 +822,15 @@ static grpc_call_error queue_call_request(grpc_server *server,
fail_call(server, rc);
return GRPC_CALL_OK;
}
- calld = call_list_remove_head(server, PENDING_START);
+ switch (rc->type) {
+ case LEGACY_CALL:
+ case BATCH_CALL:
+ calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
+ break;
+ case REGISTERED_CALL:
+ calld = call_list_remove_head(&rc->data.registered.registered_method->pending, PENDING_START);
+ break;
+ }
if (calld) {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;