aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c4
-rw-r--r--src/core/surface/call.h1
-rw-r--r--src/core/surface/lame_client.c6
-rw-r--r--src/core/surface/server.c628
-rw-r--r--src/core/surface/server.h2
-rw-r--r--src/core/surface/server_chttp2.c4
6 files changed, 449 insertions, 196 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 58a2436937..743ef0c65b 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -258,6 +258,10 @@ void grpc_call_set_completion_queue(grpc_call *call,
call->cq = cq;
}
+grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
+ return call->cq;
+}
+
void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
static void destroy_call(void *call, int ignored_success) {
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 05014c631c..55e434433d 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -89,6 +89,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data);
void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq);
+grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
void grpc_call_internal_ref(grpc_call *call);
void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 411dbabfd3..a8fdeed87f 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -47,6 +47,7 @@ typedef struct {
} call_data;
typedef struct {
+ grpc_mdelem *status;
grpc_mdelem *message;
} channel_data;
@@ -57,6 +58,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
switch (op->type) {
case GRPC_SEND_START:
+ grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->status));
grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message));
grpc_call_stream_closed(elem);
break;
@@ -93,18 +95,22 @@ static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *channeld = elem->channel_data;
+ char status[12];
GPR_ASSERT(is_first);
GPR_ASSERT(is_last);
channeld->message = grpc_mdelem_from_strings(mdctx, "grpc-message",
"Rpc sent on a lame channel.");
+ gpr_ltoa(GRPC_STATUS_UNKNOWN, status);
+ channeld->status = grpc_mdelem_from_strings(mdctx, "grpc-status", status);
}
static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *channeld = elem->channel_data;
grpc_mdelem_unref(channeld->message);
+ grpc_mdelem_unref(channeld->status);
}
static const grpc_channel_filter lame_filter = {
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index ee0f96a580..22588194ea 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -53,13 +53,63 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
typedef struct listener {
void *arg;
- void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset);
+ void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, size_t pollset_count);
void (*destroy)(grpc_server *server, void *arg);
struct listener *next;
} listener;
typedef struct call_data call_data;
typedef struct channel_data channel_data;
+typedef struct registered_method registered_method;
+
+typedef struct {
+ call_data *next;
+ call_data *prev;
+} call_link;
+
+typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type;
+
+typedef struct {
+ requested_call_type type;
+ void *tag;
+ union {
+ struct {
+ grpc_completion_queue *cq_bind;
+ grpc_call **call;
+ grpc_call_details *details;
+ grpc_metadata_array *initial_metadata;
+ } batch;
+ struct {
+ grpc_completion_queue *cq_bind;
+ grpc_call **call;
+ registered_method *registered_method;
+ gpr_timespec *deadline;
+ grpc_metadata_array *initial_metadata;
+ grpc_byte_buffer **optional_payload;
+ } registered;
+ } data;
+} requested_call;
+
+typedef struct {
+ requested_call *calls;
+ size_t count;
+ size_t capacity;
+} requested_call_array;
+
+struct registered_method {
+ char *method;
+ char *host;
+ call_data *pending;
+ requested_call_array requested;
+ grpc_completion_queue *cq;
+ registered_method *next;
+};
+
+typedef struct channel_registered_method {
+ registered_method *server_registered_method;
+ grpc_mdstr *method;
+ grpc_mdstr *host;
+} channel_registered_method;
struct channel_data {
grpc_server *server;
@@ -69,33 +119,25 @@ struct channel_data {
/* linked list of all channels on a server */
channel_data *next;
channel_data *prev;
+ channel_registered_method *registered_methods;
+ gpr_uint32 registered_method_slots;
+ gpr_uint32 registered_method_max_probes;
};
-typedef void (*new_call_cb)(grpc_server *server, grpc_completion_queue *cq,
- grpc_call **call, grpc_call_details *details,
- grpc_metadata_array *initial_metadata,
- call_data *calld, void *user_data);
-
-typedef struct {
- void *user_data;
- grpc_completion_queue *cq;
- grpc_call **call;
- grpc_call_details *details;
- grpc_metadata_array *initial_metadata;
- new_call_cb cb;
-} requested_call;
-
struct grpc_server {
size_t channel_filter_count;
const grpc_channel_filter **channel_filters;
grpc_channel_args *channel_args;
- grpc_completion_queue *cq;
+ grpc_completion_queue *unregistered_cq;
+
+ grpc_completion_queue **cqs;
+ grpc_pollset **pollsets;
+ size_t cq_count;
gpr_mu mu;
- requested_call *requested_calls;
- size_t requested_call_count;
- size_t requested_call_capacity;
+ registered_method *registered_methods;
+ requested_call_array requested_calls;
gpr_uint8 shutdown;
gpr_uint8 have_shutdown_tag;
@@ -108,11 +150,6 @@ struct grpc_server {
gpr_refcount internal_refcount;
};
-typedef struct {
- call_data *next;
- call_data *prev;
-} call_link;
-
typedef enum {
/* waiting for metadata */
NOT_STARTED,
@@ -125,7 +162,7 @@ typedef enum {
} call_state;
typedef struct legacy_data {
- grpc_metadata_array *initial_metadata;
+ grpc_metadata_array initial_metadata;
} legacy_data;
struct call_data {
@@ -137,9 +174,9 @@ struct call_data {
grpc_mdstr *host;
legacy_data *legacy;
- grpc_call_details *details;
+ grpc_completion_queue *cq_new;
- gpr_uint8 included[CALL_LIST_COUNT];
+ call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
};
@@ -148,30 +185,33 @@ struct call_data {
static void do_nothing(void *unused, grpc_op_error ignored) {}
-static int call_list_join(grpc_server *server, call_data *call,
- call_list list) {
- if (call->included[list]) return 0;
- call->included[list] = 1;
- if (!server->lists[list]) {
- server->lists[list] = call;
+static void begin_call(grpc_server *server, call_data *calld,
+ requested_call *rc);
+static void fail_call(grpc_server *server, requested_call *rc);
+
+static int call_list_join(call_data **root, call_data *call, call_list list) {
+ GPR_ASSERT(!call->root[list]);
+ call->root[list] = root;
+ if (!*root) {
+ *root = call;
call->links[list].next = call->links[list].prev = call;
} else {
- call->links[list].next = server->lists[list];
- call->links[list].prev = server->lists[list]->links[list].prev;
+ call->links[list].next = *root;
+ call->links[list].prev = (*root)->links[list].prev;
call->links[list].next->links[list].prev =
call->links[list].prev->links[list].next = call;
}
return 1;
}
-static call_data *call_list_remove_head(grpc_server *server, call_list list) {
- call_data *out = server->lists[list];
+static call_data *call_list_remove_head(call_data **root, call_list list) {
+ call_data *out = *root;
if (out) {
- out->included[list] = 0;
+ out->root[list] = NULL;
if (out->links[list].next == out) {
- server->lists[list] = NULL;
+ *root = NULL;
} else {
- server->lists[list] = out->links[list].next;
+ *root = out->links[list].next;
out->links[list].next->links[list].prev = out->links[list].prev;
out->links[list].prev->links[list].next = out->links[list].next;
}
@@ -179,23 +219,39 @@ static call_data *call_list_remove_head(grpc_server *server, call_list list) {
return out;
}
-static int call_list_remove(grpc_server *server, call_data *call,
- call_list list) {
- if (!call->included[list]) return 0;
- call->included[list] = 0;
- if (server->lists[list] == call) {
- server->lists[list] = call->links[list].next;
- if (server->lists[list] == call) {
- server->lists[list] = NULL;
+static int call_list_remove(call_data *call, call_list list) {
+ call_data **root = call->root[list];
+ if (root == NULL) return 0;
+ call->root[list] = NULL;
+ if (*root == call) {
+ *root = call->links[list].next;
+ if (*root == call) {
+ *root = NULL;
return 1;
}
}
- GPR_ASSERT(server->lists[list] != call);
+ GPR_ASSERT(*root != call);
call->links[list].next->links[list].prev = call->links[list].prev;
call->links[list].prev->links[list].next = call->links[list].next;
return 1;
}
+static void requested_call_array_destroy(requested_call_array *array) {
+ gpr_free(array->calls);
+}
+
+static requested_call *requested_call_array_add(requested_call_array *array) {
+ requested_call *rc;
+ if (array->count == array->capacity) {
+ array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
+ array->calls =
+ gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
+ }
+ rc = &array->calls[array->count++];
+ memset(rc, 0, sizeof(*rc));
+ return rc;
+}
+
static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
@@ -205,7 +261,7 @@ static void server_unref(grpc_server *server) {
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu);
gpr_free(server->channel_filters);
- gpr_free(server->requested_calls);
+ requested_call_array_destroy(&server->requested_calls);
gpr_free(server);
}
}
@@ -223,7 +279,6 @@ static void orphan_channel(channel_data *chand) {
static void finish_destroy_channel(void *cd, int success) {
channel_data *chand = cd;
grpc_server *server = chand->server;
- /*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/
grpc_channel_destroy(chand->channel);
server_unref(server);
}
@@ -236,23 +291,63 @@ static void destroy_channel(channel_data *chand) {
grpc_iomgr_add_callback(finish_destroy_channel, chand);
}
+static void finish_start_new_rpc_and_unlock(grpc_server *server,
+ grpc_call_element *elem,
+ call_data **pending_root,
+ requested_call_array *array) {
+ requested_call rc;
+ call_data *calld = elem->call_data;
+ if (array->count == 0) {
+ calld->state = PENDING;
+ call_list_join(pending_root, calld, PENDING_START);
+ gpr_mu_unlock(&server->mu);
+ } else {
+ rc = array->calls[--array->count];
+ calld->state = ACTIVATED;
+ gpr_mu_unlock(&server->mu);
+ begin_call(server, calld, &rc);
+ }
+}
+
static void start_new_rpc(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_server *server = chand->server;
+ gpr_uint32 i;
+ gpr_uint32 hash;
+ channel_registered_method *rm;
gpr_mu_lock(&server->mu);
- if (server->requested_call_count > 0) {
- requested_call rc = server->requested_calls[--server->requested_call_count];
- calld->state = ACTIVATED;
- gpr_mu_unlock(&server->mu);
- rc.cb(server, rc.cq, rc.call, rc.details, rc.initial_metadata, calld,
- rc.user_data);
- } else {
- calld->state = PENDING;
- call_list_join(server, calld, PENDING_START);
- gpr_mu_unlock(&server->mu);
+ if (chand->registered_methods && calld->path && calld->host) {
+ /* check for an exact match with host */
+ hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
+ for (i = 0; i < chand->registered_method_max_probes; i++) {
+ rm = &chand->registered_methods[(hash + i) %
+ chand->registered_method_slots];
+ if (!rm) break;
+ if (rm->host != calld->host) continue;
+ if (rm->method != calld->path) continue;
+ finish_start_new_rpc_and_unlock(server, elem,
+ &rm->server_registered_method->pending,
+ &rm->server_registered_method->requested);
+ return;
+ }
+ /* check for a wildcard method definition (no host set) */
+ hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
+ for (i = 0; i <= chand->registered_method_max_probes; i++) {
+ rm = &chand->registered_methods[(hash + i) %
+ chand->registered_method_slots];
+ if (!rm) break;
+ if (rm->host != NULL) continue;
+ if (rm->method != calld->path) continue;
+ finish_start_new_rpc_and_unlock(server, elem,
+ &rm->server_registered_method->pending,
+ &rm->server_registered_method->requested);
+ return;
+ }
}
+ finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
+ &server->requested_calls);
}
static void kill_zombie(void *elem, int success) {
@@ -267,7 +362,7 @@ static void stream_closed(grpc_call_element *elem) {
case ACTIVATED:
break;
case PENDING:
- call_list_remove(chand->server, calld, PENDING_START);
+ call_list_remove(calld, PENDING_START);
/* fallthrough intended */
case NOT_STARTED:
calld->state = ZOMBIED;
@@ -398,7 +493,7 @@ static void init_call_elem(grpc_call_element *elem,
calld->call = grpc_call_from_top_element(elem);
gpr_mu_lock(&chand->server->mu);
- call_list_join(chand->server, calld, ALL_CALLS);
+ call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
gpr_mu_unlock(&chand->server->mu);
server_ref(chand->server);
@@ -407,15 +502,17 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- int i;
+ size_t i;
gpr_mu_lock(&chand->server->mu);
for (i = 0; i < CALL_LIST_COUNT; i++) {
- call_list_remove(chand->server, elem->call_data, i);
+ call_list_remove(elem->call_data, i);
}
if (chand->server->shutdown && chand->server->have_shutdown_tag &&
chand->server->lists[ALL_CALLS] == NULL) {
- grpc_cq_end_server_shutdown(chand->server->cq, chand->server->shutdown_tag);
+ for (i = 0; i < chand->server->cq_count; i++) {
+ grpc_cq_end_server_shutdown(chand->server->cqs[i], chand->server->shutdown_tag);
+ }
}
gpr_mu_unlock(&chand->server->mu);
@@ -427,8 +524,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
if (calld->legacy) {
- gpr_free(calld->legacy->initial_metadata->metadata);
- gpr_free(calld->legacy->initial_metadata);
+ gpr_free(calld->legacy->initial_metadata.metadata);
gpr_free(calld->legacy);
}
@@ -447,6 +543,7 @@ static void init_channel_elem(grpc_channel_element *elem,
chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
chand->next = chand->prev = chand;
+ chand->registered_methods = NULL;
}
static void destroy_channel_elem(grpc_channel_element *elem) {
@@ -464,11 +561,20 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "server",
+ call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
};
+static void addcq(grpc_server *server, grpc_completion_queue *cq) {
+ size_t i, n;
+ for (i = 0; i < server->cq_count; i++) {
+ if (server->cqs[i] == cq) return;
+ }
+ n = server->cq_count++;
+ server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue*));
+ server->cqs[n] = cq;
+}
+
grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
grpc_channel_filter **filters,
size_t filter_count,
@@ -478,10 +584,11 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
grpc_server *server = gpr_malloc(sizeof(grpc_server));
memset(server, 0, sizeof(grpc_server));
+ if (cq) addcq(server, cq);
gpr_mu_init(&server->mu);
- server->cq = cq;
+ server->unregistered_cq = cq;
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
server->root_channel_data.next = server->root_channel_data.prev =
@@ -509,11 +616,49 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
return server;
}
+static int streq(const char *a, const char *b) {
+ if (a == NULL && b == NULL) return 1;
+ if (a == NULL) return 0;
+ if (b == NULL) return 0;
+ return 0 == strcmp(a, b);
+}
+
+void *grpc_server_register_method(grpc_server *server, const char *method,
+ const char *host, grpc_completion_queue *cq_new_rpc) {
+ registered_method *m;
+ if (!method) {
+ gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
+ return NULL;
+ }
+ for (m = server->registered_methods; m; m = m->next) {
+ if (streq(m->method, method) && streq(m->host, host)) {
+ gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
+ host ? host : "*");
+ return NULL;
+ }
+ }
+ addcq(server, cq_new_rpc);
+ m = gpr_malloc(sizeof(registered_method));
+ memset(m, 0, sizeof(*m));
+ m->method = gpr_strdup(method);
+ m->host = gpr_strdup(host);
+ m->next = server->registered_methods;
+ m->cq = cq_new_rpc;
+ server->registered_methods = m;
+ return m;
+}
+
void grpc_server_start(grpc_server *server) {
listener *l;
+ size_t i;
+
+ server->pollsets = gpr_malloc(sizeof(grpc_pollset*) * server->cq_count);
+ for (i = 0; i < server->cq_count; i++) {
+ server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
+ }
for (l = server->listeners; l; l = l->next) {
- l->start(server, l->arg, grpc_cq_pollset(server->cq));
+ l->start(server, l->arg, server->pollsets, server->cq_count);
}
}
@@ -525,8 +670,18 @@ grpc_transport_setup_result grpc_server_setup_transport(
grpc_channel_filter const **filters =
gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
size_t i;
+ size_t num_registered_methods;
+ size_t alloc;
+ registered_method *rm;
+ channel_registered_method *crm;
grpc_channel *channel;
channel_data *chand;
+ grpc_mdstr *host;
+ grpc_mdstr *method;
+ gpr_uint32 hash;
+ gpr_uint32 slots;
+ gpr_uint32 probes;
+ gpr_uint32 max_probes = 0;
for (i = 0; i < s->channel_filter_count; i++) {
filters[i] = s->channel_filters[i];
@@ -536,7 +691,9 @@ grpc_transport_setup_result grpc_server_setup_transport(
}
filters[i] = &grpc_connected_channel_filter;
- grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cq));
+ for (i = 0; i < s->cq_count; i++) {
+ grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
+ }
channel = grpc_channel_create_from_filters(filters, num_filters,
s->channel_args, mdctx, 0);
@@ -546,6 +703,35 @@ grpc_transport_setup_result grpc_server_setup_transport(
server_ref(s);
chand->channel = channel;
+ num_registered_methods = 0;
+ for (rm = s->registered_methods; rm; rm = rm->next) {
+ num_registered_methods++;
+ }
+ /* build a lookup table phrased in terms of mdstr's in this channels context
+ to quickly find registered methods */
+ if (num_registered_methods > 0) {
+ slots = 2 * num_registered_methods;
+ alloc = sizeof(channel_registered_method) * slots;
+ chand->registered_methods = gpr_malloc(alloc);
+ memset(chand->registered_methods, 0, alloc);
+ for (rm = s->registered_methods; rm; rm = rm->next) {
+ host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
+ method = grpc_mdstr_from_string(mdctx, rm->method);
+ hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
+ for (probes = 0; chand->registered_methods[(hash + probes) % slots]
+ .server_registered_method != NULL;
+ probes++)
+ ;
+ if (probes > max_probes) max_probes = probes;
+ crm = &chand->registered_methods[(hash + probes) % slots];
+ crm->server_registered_method = rm;
+ crm->host = host;
+ crm->method = method;
+ }
+ chand->registered_method_slots = slots;
+ chand->registered_method_max_probes = max_probes;
+ }
+
gpr_mu_lock(&s->mu);
chand->next = &s->root_channel_data;
chand->prev = chand->next->prev;
@@ -558,17 +744,17 @@ grpc_transport_setup_result grpc_server_setup_transport(
grpc_channel_get_channel_stack(channel), transport);
}
-void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
+static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
void *shutdown_tag) {
listener *l;
- requested_call *requested_calls;
- size_t requested_call_count;
+ requested_call_array requested_calls;
channel_data **channels;
channel_data *c;
size_t nchannels;
size_t i;
grpc_channel_op op;
grpc_channel_element *elem;
+ registered_method *rm;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu);
@@ -591,18 +777,28 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
i++;
}
+ /* collect all unregistered then registered calls */
requested_calls = server->requested_calls;
- requested_call_count = server->requested_call_count;
- server->requested_calls = NULL;
- server->requested_call_count = 0;
+ memset(&server->requested_calls, 0, sizeof(server->requested_calls));
+ for (rm = server->registered_methods; rm; rm = rm->next) {
+ if (requested_calls.count + rm->requested.count > requested_calls.capacity) {
+ requested_calls.capacity = GPR_MAX(requested_calls.count + rm->requested.count, 2 * requested_calls.capacity);
+ requested_calls.calls = gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) * requested_calls.capacity);
+ }
+ memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls, sizeof(*requested_calls.calls) * rm->requested.count);
+ requested_calls.count += rm->requested.count;
+ memset(&rm->requested, 0, sizeof(rm->requested));
+ }
server->shutdown = 1;
server->have_shutdown_tag = have_shutdown_tag;
server->shutdown_tag = shutdown_tag;
if (have_shutdown_tag) {
- grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_SHUTDOWN);
- if (server->lists[ALL_CALLS] == NULL) {
- grpc_cq_end_server_shutdown(server->cq, shutdown_tag);
+ for (i = 0; i < server->cq_count; i++) {
+ grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
+ if (server->lists[ALL_CALLS] == NULL) {
+ grpc_cq_end_server_shutdown(server->cqs[i], shutdown_tag);
+ }
}
}
gpr_mu_unlock(&server->mu);
@@ -623,13 +819,10 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
gpr_free(channels);
/* terminate all the requested calls */
- for (i = 0; i < requested_call_count; i++) {
- requested_calls[i].cb(server, requested_calls[i].cq,
- requested_calls[i].call, requested_calls[i].details,
- requested_calls[i].initial_metadata, NULL,
- requested_calls[i].user_data);
+ for (i = 0; i < requested_calls.count; i++) {
+ fail_call(server, &requested_calls.calls[i]);
}
- gpr_free(requested_calls);
+ gpr_free(requested_calls.calls);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
@@ -664,7 +857,7 @@ void grpc_server_destroy(grpc_server *server) {
void grpc_server_add_listener(grpc_server *server, void *arg,
void (*start)(grpc_server *server, void *arg,
- grpc_pollset *pollset),
+ grpc_pollset **pollsets, size_t pollset_count),
void (*destroy)(grpc_server *server, void *arg)) {
listener *l = gpr_malloc(sizeof(listener));
l->arg = arg;
@@ -675,47 +868,93 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
}
static grpc_call_error queue_call_request(grpc_server *server,
- grpc_completion_queue *cq,
- grpc_call **call,
- grpc_call_details *details,
- grpc_metadata_array *initial_metadata,
- new_call_cb cb, void *user_data) {
- call_data *calld;
- requested_call *rc;
+ requested_call *rc) {
+ call_data *calld = NULL;
+ requested_call_array *requested_calls = NULL;
gpr_mu_lock(&server->mu);
if (server->shutdown) {
gpr_mu_unlock(&server->mu);
- cb(server, cq, call, details, initial_metadata, NULL, user_data);
+ fail_call(server, rc);
return GRPC_CALL_OK;
}
- calld = call_list_remove_head(server, PENDING_START);
+ switch (rc->type) {
+ case LEGACY_CALL:
+ case BATCH_CALL:
+ calld =
+ call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
+ requested_calls = &server->requested_calls;
+ break;
+ case REGISTERED_CALL:
+ calld = call_list_remove_head(
+ &rc->data.registered.registered_method->pending, PENDING_START);
+ requested_calls = &rc->data.registered.registered_method->requested;
+ break;
+ }
if (calld) {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&server->mu);
- cb(server, cq, call, details, initial_metadata, calld, user_data);
+ begin_call(server, calld, rc);
return GRPC_CALL_OK;
} else {
- if (server->requested_call_count == server->requested_call_capacity) {
- server->requested_call_capacity =
- GPR_MAX(server->requested_call_capacity + 8,
- server->requested_call_capacity * 2);
- server->requested_calls =
- gpr_realloc(server->requested_calls,
- sizeof(requested_call) * server->requested_call_capacity);
- }
- rc = &server->requested_calls[server->requested_call_count++];
- rc->cb = cb;
- rc->cq = cq;
- rc->call = call;
- rc->details = details;
- rc->user_data = user_data;
- rc->initial_metadata = initial_metadata;
+ *requested_call_array_add(requested_calls) = *rc;
gpr_mu_unlock(&server->mu);
return GRPC_CALL_OK;
}
}
+grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
+ grpc_call_details *details,
+ grpc_metadata_array *initial_metadata,
+ grpc_completion_queue *cq_bind,
+ void *tag) {
+ requested_call rc;
+ grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
+ rc.type = BATCH_CALL;
+ rc.tag = tag;
+ rc.data.batch.cq_bind = cq_bind;
+ rc.data.batch.call = call;
+ rc.data.batch.details = details;
+ rc.data.batch.initial_metadata = initial_metadata;
+ return queue_call_request(server, &rc);
+}
+
+grpc_call_error grpc_server_request_registered_call(
+ grpc_server *server, void *rm, grpc_call **call,
+ gpr_timespec *deadline, grpc_metadata_array *initial_metadata,
+ grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind,
+ void *tag) {
+ requested_call rc;
+ registered_method *registered_method = rm;
+ grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
+ rc.type = REGISTERED_CALL;
+ rc.tag = tag;
+ rc.data.registered.cq_bind = cq_bind;
+ rc.data.registered.call = call;
+ rc.data.registered.registered_method = registered_method;
+ rc.data.registered.deadline = deadline;
+ rc.data.registered.initial_metadata = initial_metadata;
+ rc.data.registered.optional_payload = optional_payload;
+ return queue_call_request(server, &rc);
+}
+
+grpc_call_error grpc_server_request_call_old(grpc_server *server,
+ void *tag_new) {
+ requested_call rc;
+ grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW);
+ rc.type = LEGACY_CALL;
+ rc.tag = tag_new;
+ return queue_call_request(server, &rc);
+}
+
+static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
+static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
+ void *tag);
+static void publish_was_not_set(grpc_call *call, grpc_op_error status,
+ void *tag) {
+ abort();
+}
+
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
gpr_slice slice = value->slice;
size_t len = GPR_SLICE_LENGTH(slice);
@@ -727,57 +966,84 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
}
-static void publish_request(grpc_call *call, grpc_op_error status, void *tag) {
- grpc_call_element *elem =
- grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- grpc_server *server = chand->server;
-
- if (status == GRPC_OP_OK) {
- cpstr(&calld->details->host, &calld->details->host_capacity, calld->host);
- cpstr(&calld->details->method, &calld->details->method_capacity,
- calld->path);
- calld->details->deadline = calld->deadline;
- grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL,
- GRPC_OP_OK);
- } else {
- abort();
+static void begin_call(grpc_server *server, call_data *calld,
+ requested_call *rc) {
+ grpc_ioreq_completion_func publish = publish_was_not_set;
+ grpc_ioreq req[2];
+ grpc_ioreq *r = req;
+
+ /* called once initial metadata has been read by the call, but BEFORE
+ the ioreq to fetch it out of the call has been executed.
+ This means metadata related fields can be relied on in calld, but to
+ fill in the metadata array passed by the client, we need to perform
+ an ioreq op, that should complete immediately. */
+
+ switch (rc->type) {
+ case LEGACY_CALL:
+ calld->legacy = gpr_malloc(sizeof(legacy_data));
+ memset(calld->legacy, 0, sizeof(legacy_data));
+ r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
+ r->data.recv_metadata = &calld->legacy->initial_metadata;
+ r++;
+ publish = publish_legacy;
+ break;
+ case BATCH_CALL:
+ cpstr(&rc->data.batch.details->host,
+ &rc->data.batch.details->host_capacity, calld->host);
+ cpstr(&rc->data.batch.details->method,
+ &rc->data.batch.details->method_capacity, calld->path);
+ grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind);
+ *rc->data.batch.call = calld->call;
+ r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
+ r->data.recv_metadata = rc->data.batch.initial_metadata;
+ r++;
+ calld->cq_new = server->unregistered_cq;
+ publish = publish_registered_or_batch;
+ break;
+ case REGISTERED_CALL:
+ *rc->data.registered.deadline = calld->deadline;
+ grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind);
+ *rc->data.registered.call = calld->call;
+ r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
+ r->data.recv_metadata = rc->data.registered.initial_metadata;
+ r++;
+ if (rc->data.registered.optional_payload) {
+ r->op = GRPC_IOREQ_RECV_MESSAGE;
+ r->data.recv_message = rc->data.registered.optional_payload;
+ r++;
+ }
+ calld->cq_new = rc->data.registered.registered_method->cq;
+ publish = publish_registered_or_batch;
+ break;
}
-}
-static void begin_request(grpc_server *server, grpc_completion_queue *cq,
- grpc_call **call, grpc_call_details *details,
- grpc_metadata_array *initial_metadata,
- call_data *calld, void *tag) {
- grpc_ioreq req;
- if (!calld) {
- *call = NULL;
- initial_metadata->count = 0;
- grpc_cq_end_op_complete(cq, tag, NULL, do_nothing, NULL, GRPC_OP_ERROR);
- return;
- }
- calld->details = details;
- grpc_call_set_completion_queue(calld->call, cq);
- *call = calld->call;
- req.op = GRPC_IOREQ_RECV_INITIAL_METADATA;
- req.data.recv_metadata = initial_metadata;
grpc_call_internal_ref(calld->call);
- grpc_call_start_ioreq_and_call_back(calld->call, &req, 1, publish_request,
- tag);
+ grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
+ rc->tag);
}
-grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
- grpc_call_details *details,
- grpc_metadata_array *initial_metadata,
- grpc_completion_queue *cq, void *tag) {
- grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE);
- return queue_call_request(server, cq, call, details, initial_metadata,
- begin_request, tag);
+static void fail_call(grpc_server *server, requested_call *rc) {
+ switch (rc->type) {
+ case LEGACY_CALL:
+ grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, NULL,
+ NULL, gpr_inf_past, 0, NULL);
+ break;
+ case BATCH_CALL:
+ *rc->data.batch.call = NULL;
+ rc->data.batch.initial_metadata->count = 0;
+ grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL,
+ GRPC_OP_ERROR);
+ break;
+ case REGISTERED_CALL:
+ *rc->data.registered.call = NULL;
+ rc->data.registered.initial_metadata->count = 0;
+ grpc_cq_end_op_complete(rc->data.registered.registered_method->cq, rc->tag, NULL, do_nothing, NULL,
+ GRPC_OP_ERROR);
+ break;
+ }
}
-static void publish_legacy_request(grpc_call *call, grpc_op_error status,
- void *tag) {
+static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
call_data *calld = elem->call_data;
@@ -785,47 +1051,23 @@ static void publish_legacy_request(grpc_call *call, grpc_op_error status,
grpc_server *server = chand->server;
if (status == GRPC_OP_OK) {
- grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL,
+ grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL,
grpc_mdstr_as_c_string(calld->path),
grpc_mdstr_as_c_string(calld->host), calld->deadline,
- calld->legacy->initial_metadata->count,
- calld->legacy->initial_metadata->metadata);
+ calld->legacy->initial_metadata.count,
+ calld->legacy->initial_metadata.metadata);
} else {
+ gpr_log(GPR_ERROR, "should never reach here");
abort();
}
}
-static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq,
- grpc_call **call, grpc_call_details *details,
- grpc_metadata_array *initial_metadata,
- call_data *calld, void *tag) {
- grpc_ioreq req;
- GPR_ASSERT(call == NULL);
- GPR_ASSERT(details == NULL);
- if (!calld) {
- gpr_free(initial_metadata);
- grpc_cq_end_new_rpc(cq, tag, NULL, do_nothing, NULL, NULL, NULL,
- gpr_inf_past, 0, NULL);
- return;
- }
- req.op = GRPC_IOREQ_RECV_INITIAL_METADATA;
- req.data.recv_metadata = initial_metadata;
- calld->legacy = gpr_malloc(sizeof(legacy_data));
- memset(calld->legacy, 0, sizeof(legacy_data));
- calld->legacy->initial_metadata = initial_metadata;
- grpc_call_internal_ref(calld->call);
- grpc_call_start_ioreq_and_call_back(calld->call, &req, 1,
- publish_legacy_request, tag);
-}
-
-grpc_call_error grpc_server_request_call_old(grpc_server *server,
- void *tag_new) {
- grpc_metadata_array *client_metadata =
- gpr_malloc(sizeof(grpc_metadata_array));
- memset(client_metadata, 0, sizeof(*client_metadata));
- grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
- return queue_call_request(server, server->cq, NULL, NULL, client_metadata,
- begin_legacy_request, tag_new);
+static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
+ void *tag) {
+ grpc_call_element *elem =
+ grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ call_data *calld = elem->call_data;
+ grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index 50574d66a4..c8861f420d 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -48,7 +48,7 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
and when it shuts down, it will call destroy */
void grpc_server_add_listener(grpc_server *server, void *listener,
void (*start)(grpc_server *server, void *arg,
- grpc_pollset *pollset),
+ grpc_pollset **pollsets, size_t npollsets),
void (*destroy)(grpc_server *server, void *arg));
/* Setup a transport - creates a channel stack, binds the transport to the
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 5ba7d47efd..3b6abb7d03 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -59,9 +59,9 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
}
/* Server callback: start listening on our ports */
-static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) {
+static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets, size_t pollset_count) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(tcp, pollset, new_transport, server);
+ grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server);
}
/* Server callback: destroy the tcp listener (so we don't generate further