aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/server.c67
-rw-r--r--src/core/surface/server.h3
-rw-r--r--src/core/surface/server_create.c5
3 files changed, 31 insertions, 44 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 01644b4471..96c1b7c3eb 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -74,16 +74,15 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct {
requested_call_type type;
void *tag;
+ grpc_completion_queue *cq_bound_to_call;
+ grpc_completion_queue *cq_for_notification;
+ grpc_call **call;
union {
struct {
- grpc_completion_queue *cq_bind;
- grpc_call **call;
grpc_call_details *details;
grpc_metadata_array *initial_metadata;
} batch;
struct {
- grpc_completion_queue *cq_bind;
- grpc_call **call;
registered_method *registered_method;
gpr_timespec *deadline;
grpc_metadata_array *initial_metadata;
@@ -103,7 +102,6 @@ struct registered_method {
char *host;
call_data *pending;
requested_call_array requested;
- grpc_completion_queue *cq;
registered_method *next;
};
@@ -130,7 +128,6 @@ struct grpc_server {
size_t channel_filter_count;
const grpc_channel_filter **channel_filters;
grpc_channel_args *channel_args;
- grpc_completion_queue *unregistered_cq;
grpc_completion_queue **cqs;
grpc_pollset **pollsets;
@@ -602,7 +599,8 @@ static const grpc_channel_filter server_surface_filter = {
destroy_channel_elem, "server",
};
-static void addcq(grpc_server *server, grpc_completion_queue *cq) {
+void grpc_server_register_completion_queue(grpc_server *server,
+ grpc_completion_queue *cq) {
size_t i, n;
for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return;
@@ -614,8 +612,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) {
server->cqs[n] = cq;
}
-grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
- grpc_channel_filter **filters,
+grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *args) {
size_t i;
@@ -626,12 +623,10 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
memset(server, 0, sizeof(grpc_server));
- if (cq) addcq(server, cq);
gpr_mu_init(&server->mu);
gpr_cv_init(&server->cv);
- server->unregistered_cq = cq;
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
server->root_channel_data.next = server->root_channel_data.prev =
@@ -667,8 +662,7 @@ static int streq(const char *a, const char *b) {
}
void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host,
- grpc_completion_queue *cq_new_rpc) {
+ const char *host) {
registered_method *m;
if (!method) {
gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
@@ -681,13 +675,11 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
return NULL;
}
}
- addcq(server, cq_new_rpc);
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
- m->cq = cq_new_rpc;
server->registered_methods = m;
return m;
}
@@ -1012,17 +1004,18 @@ static grpc_call_error queue_call_request(grpc_server *server,
}
}
-grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
- grpc_call_details *details,
- grpc_metadata_array *initial_metadata,
- grpc_completion_queue *cq_bind,
- void *tag) {
+grpc_call_error grpc_server_request_call(
+ grpc_server *server, grpc_call **call, grpc_call_details *details,
+ grpc_metadata_array *initial_metadata,
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc;
- grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
+ grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE);
rc.type = BATCH_CALL;
rc.tag = tag;
- rc.data.batch.cq_bind = cq_bind;
- rc.data.batch.call = call;
+ rc.cq_bound_to_call = cq_bound_to_call;
+ rc.cq_for_notification = cq_for_notification;
+ rc.call = call;
rc.data.batch.details = details;
rc.data.batch.initial_metadata = initial_metadata;
return queue_call_request(server, &rc);
@@ -1031,14 +1024,16 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
- grpc_completion_queue *cq_bind, void *tag) {
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc;
registered_method *registered_method = rm;
- grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
+ grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE);
rc.type = REGISTERED_CALL;
rc.tag = tag;
- rc.data.registered.cq_bind = cq_bind;
- rc.data.registered.call = call;
+ rc.cq_bound_to_call = cq_bound_to_call;
+ rc.cq_for_notification = cq_for_notification;
+ rc.call = call;
rc.data.registered.registered_method = registered_method;
rc.data.registered.deadline = deadline;
rc.data.registered.initial_metadata = initial_metadata;
@@ -1076,6 +1071,9 @@ static void begin_call(grpc_server *server, call_data *calld,
fill in the metadata array passed by the client, we need to perform
an ioreq op, that should complete immediately. */
+ grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
+ *rc->call = calld->call;
+ calld->cq_new = rc->cq_for_notification;
switch (rc->type) {
case BATCH_CALL:
cpstr(&rc->data.batch.details->host,
@@ -1083,18 +1081,13 @@ static void begin_call(grpc_server *server, call_data *calld,
cpstr(&rc->data.batch.details->method,
&rc->data.batch.details->method_capacity, calld->path);
rc->data.batch.details->deadline = calld->deadline;
- grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind);
- *rc->data.batch.call = calld->call;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.batch.initial_metadata;
r++;
- calld->cq_new = server->unregistered_cq;
publish = publish_registered_or_batch;
break;
case REGISTERED_CALL:
*rc->data.registered.deadline = calld->deadline;
- grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind);
- *rc->data.registered.call = calld->call;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.registered.initial_metadata;
r++;
@@ -1103,7 +1096,6 @@ static void begin_call(grpc_server *server, call_data *calld,
r->data.recv_message = rc->data.registered.optional_payload;
r++;
}
- calld->cq_new = rc->data.registered.registered_method->cq;
publish = publish_registered_or_batch;
break;
}
@@ -1114,20 +1106,17 @@ static void begin_call(grpc_server *server, call_data *calld,
}
static void fail_call(grpc_server *server, requested_call *rc) {
+ *rc->call = NULL;
switch (rc->type) {
case BATCH_CALL:
- *rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0;
- grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL,
- GRPC_OP_ERROR);
break;
case REGISTERED_CALL:
- *rc->data.registered.call = NULL;
rc->data.registered.initial_metadata->count = 0;
- grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL,
- do_nothing, NULL, GRPC_OP_ERROR);
break;
}
+ grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, do_nothing, NULL,
+ GRPC_OP_ERROR);
}
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index 2cfa38fa43..c6331033e0 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -39,8 +39,7 @@
#include "src/core/transport/transport.h"
/* Create a server */
-grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
- grpc_channel_filter **filters,
+grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *args);
diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c
index f629c7c72d..b7390675ad 100644
--- a/src/core/surface/server_create.c
+++ b/src/core/surface/server_create.c
@@ -35,7 +35,6 @@
#include "src/core/surface/completion_queue.h"
#include "src/core/surface/server.h"
-grpc_server *grpc_server_create(grpc_completion_queue *cq,
- const grpc_channel_args *args) {
- return grpc_server_create_from_filters(cq, NULL, 0, args);
+grpc_server *grpc_server_create(const grpc_channel_args *args) {
+ return grpc_server_create_from_filters(NULL, 0, args);
}