aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-05-06 11:45:59 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-05-06 11:45:59 -0700
commitf9e6adf998ed36479ccbb8eb3cdc58b02cc161dd (patch)
treeb1c9c0efd3bfc4984effb9747b0f09e208a1d768 /src/core
parent97c5559040204dcff338df79b16390014fbc82c9 (diff)
Completion queue binding for new requests API change
Move completion queue binding for new requests to the new request request time, not server instantiation time.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/surface/server.c67
-rw-r--r--src/core/surface/server.h3
-rw-r--r--src/core/surface/server_create.c5
3 files changed, 31 insertions, 44 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 01644b4471..96c1b7c3eb 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -74,16 +74,15 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct {
requested_call_type type;
void *tag;
+ grpc_completion_queue *cq_bound_to_call;
+ grpc_completion_queue *cq_for_notification;
+ grpc_call **call;
union {
struct {
- grpc_completion_queue *cq_bind;
- grpc_call **call;
grpc_call_details *details;
grpc_metadata_array *initial_metadata;
} batch;
struct {
- grpc_completion_queue *cq_bind;
- grpc_call **call;
registered_method *registered_method;
gpr_timespec *deadline;
grpc_metadata_array *initial_metadata;
@@ -103,7 +102,6 @@ struct registered_method {
char *host;
call_data *pending;
requested_call_array requested;
- grpc_completion_queue *cq;
registered_method *next;
};
@@ -130,7 +128,6 @@ struct grpc_server {
size_t channel_filter_count;
const grpc_channel_filter **channel_filters;
grpc_channel_args *channel_args;
- grpc_completion_queue *unregistered_cq;
grpc_completion_queue **cqs;
grpc_pollset **pollsets;
@@ -602,7 +599,8 @@ static const grpc_channel_filter server_surface_filter = {
destroy_channel_elem, "server",
};
-static void addcq(grpc_server *server, grpc_completion_queue *cq) {
+void grpc_server_register_completion_queue(grpc_server *server,
+ grpc_completion_queue *cq) {
size_t i, n;
for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return;
@@ -614,8 +612,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) {
server->cqs[n] = cq;
}
-grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
- grpc_channel_filter **filters,
+grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *args) {
size_t i;
@@ -626,12 +623,10 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
memset(server, 0, sizeof(grpc_server));
- if (cq) addcq(server, cq);
gpr_mu_init(&server->mu);
gpr_cv_init(&server->cv);
- server->unregistered_cq = cq;
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
server->root_channel_data.next = server->root_channel_data.prev =
@@ -667,8 +662,7 @@ static int streq(const char *a, const char *b) {
}
void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host,
- grpc_completion_queue *cq_new_rpc) {
+ const char *host) {
registered_method *m;
if (!method) {
gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
@@ -681,13 +675,11 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
return NULL;
}
}
- addcq(server, cq_new_rpc);
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
- m->cq = cq_new_rpc;
server->registered_methods = m;
return m;
}
@@ -1012,17 +1004,18 @@ static grpc_call_error queue_call_request(grpc_server *server,
}
}
-grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
- grpc_call_details *details,
- grpc_metadata_array *initial_metadata,
- grpc_completion_queue *cq_bind,
- void *tag) {
+grpc_call_error grpc_server_request_call(
+ grpc_server *server, grpc_call **call, grpc_call_details *details,
+ grpc_metadata_array *initial_metadata,
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc;
- grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
+ grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE);
rc.type = BATCH_CALL;
rc.tag = tag;
- rc.data.batch.cq_bind = cq_bind;
- rc.data.batch.call = call;
+ rc.cq_bound_to_call = cq_bound_to_call;
+ rc.cq_for_notification = cq_for_notification;
+ rc.call = call;
rc.data.batch.details = details;
rc.data.batch.initial_metadata = initial_metadata;
return queue_call_request(server, &rc);
@@ -1031,14 +1024,16 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
- grpc_completion_queue *cq_bind, void *tag) {
+ grpc_completion_queue *cq_bound_to_call,
+ grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc;
registered_method *registered_method = rm;
- grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
+ grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE);
rc.type = REGISTERED_CALL;
rc.tag = tag;
- rc.data.registered.cq_bind = cq_bind;
- rc.data.registered.call = call;
+ rc.cq_bound_to_call = cq_bound_to_call;
+ rc.cq_for_notification = cq_for_notification;
+ rc.call = call;
rc.data.registered.registered_method = registered_method;
rc.data.registered.deadline = deadline;
rc.data.registered.initial_metadata = initial_metadata;
@@ -1076,6 +1071,9 @@ static void begin_call(grpc_server *server, call_data *calld,
fill in the metadata array passed by the client, we need to perform
an ioreq op, that should complete immediately. */
+ grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
+ *rc->call = calld->call;
+ calld->cq_new = rc->cq_for_notification;
switch (rc->type) {
case BATCH_CALL:
cpstr(&rc->data.batch.details->host,
@@ -1083,18 +1081,13 @@ static void begin_call(grpc_server *server, call_data *calld,
cpstr(&rc->data.batch.details->method,
&rc->data.batch.details->method_capacity, calld->path);
rc->data.batch.details->deadline = calld->deadline;
- grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind);
- *rc->data.batch.call = calld->call;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.batch.initial_metadata;
r++;
- calld->cq_new = server->unregistered_cq;
publish = publish_registered_or_batch;
break;
case REGISTERED_CALL:
*rc->data.registered.deadline = calld->deadline;
- grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind);
- *rc->data.registered.call = calld->call;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.registered.initial_metadata;
r++;
@@ -1103,7 +1096,6 @@ static void begin_call(grpc_server *server, call_data *calld,
r->data.recv_message = rc->data.registered.optional_payload;
r++;
}
- calld->cq_new = rc->data.registered.registered_method->cq;
publish = publish_registered_or_batch;
break;
}
@@ -1114,20 +1106,17 @@ static void begin_call(grpc_server *server, call_data *calld,
}
static void fail_call(grpc_server *server, requested_call *rc) {
+ *rc->call = NULL;
switch (rc->type) {
case BATCH_CALL:
- *rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0;
- grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL,
- GRPC_OP_ERROR);
break;
case REGISTERED_CALL:
- *rc->data.registered.call = NULL;
rc->data.registered.initial_metadata->count = 0;
- grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL,
- do_nothing, NULL, GRPC_OP_ERROR);
break;
}
+ grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, do_nothing, NULL,
+ GRPC_OP_ERROR);
}
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index 2cfa38fa43..c6331033e0 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -39,8 +39,7 @@
#include "src/core/transport/transport.h"
/* Create a server */
-grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
- grpc_channel_filter **filters,
+grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *args);
diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c
index f629c7c72d..b7390675ad 100644
--- a/src/core/surface/server_create.c
+++ b/src/core/surface/server_create.c
@@ -35,7 +35,6 @@
#include "src/core/surface/completion_queue.h"
#include "src/core/surface/server.h"
-grpc_server *grpc_server_create(grpc_completion_queue *cq,
- const grpc_channel_args *args) {
- return grpc_server_create_from_filters(cq, NULL, 0, args);
+grpc_server *grpc_server_create(const grpc_channel_args *args) {
+ return grpc_server_create_from_filters(NULL, 0, args);
}