From f9e6adf998ed36479ccbb8eb3cdc58b02cc161dd Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 6 May 2015 11:45:59 -0700 Subject: Completion queue binding for new requests API change Move completion queue binding for new requests to the new request request time, not server instantiation time. --- src/core/surface/server.c | 67 +++++++++++++++++----------------------- src/core/surface/server.h | 3 +- src/core/surface/server_create.c | 5 ++- 3 files changed, 31 insertions(+), 44 deletions(-) (limited to 'src/core') diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 01644b4471..96c1b7c3eb 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -74,16 +74,15 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct { requested_call_type type; void *tag; + grpc_completion_queue *cq_bound_to_call; + grpc_completion_queue *cq_for_notification; + grpc_call **call; union { struct { - grpc_completion_queue *cq_bind; - grpc_call **call; grpc_call_details *details; grpc_metadata_array *initial_metadata; } batch; struct { - grpc_completion_queue *cq_bind; - grpc_call **call; registered_method *registered_method; gpr_timespec *deadline; grpc_metadata_array *initial_metadata; @@ -103,7 +102,6 @@ struct registered_method { char *host; call_data *pending; requested_call_array requested; - grpc_completion_queue *cq; registered_method *next; }; @@ -130,7 +128,6 @@ struct grpc_server { size_t channel_filter_count; const grpc_channel_filter **channel_filters; grpc_channel_args *channel_args; - grpc_completion_queue *unregistered_cq; grpc_completion_queue **cqs; grpc_pollset **pollsets; @@ -602,7 +599,8 @@ static const grpc_channel_filter server_surface_filter = { destroy_channel_elem, "server", }; -static void addcq(grpc_server *server, grpc_completion_queue *cq) { +void grpc_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq) { size_t i, n; for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; @@ -614,8 +612,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) { server->cqs[n] = cq; } -grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, - grpc_channel_filter **filters, +grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args) { size_t i; @@ -626,12 +623,10 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); memset(server, 0, sizeof(grpc_server)); - if (cq) addcq(server, cq); gpr_mu_init(&server->mu); gpr_cv_init(&server->cv); - server->unregistered_cq = cq; /* decremented by grpc_server_destroy */ gpr_ref_init(&server->internal_refcount, 1); server->root_channel_data.next = server->root_channel_data.prev = @@ -667,8 +662,7 @@ static int streq(const char *a, const char *b) { } void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host, - grpc_completion_queue *cq_new_rpc) { + const char *host) { registered_method *m; if (!method) { gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__); @@ -681,13 +675,11 @@ void *grpc_server_register_method(grpc_server *server, const char *method, return NULL; } } - addcq(server, cq_new_rpc); m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; - m->cq = cq_new_rpc; server->registered_methods = m; return m; } @@ -1012,17 +1004,18 @@ static grpc_call_error queue_call_request(grpc_server *server, } } -grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, - grpc_call_details *details, - grpc_metadata_array *initial_metadata, - grpc_completion_queue *cq_bind, - void *tag) { +grpc_call_error grpc_server_request_call( + grpc_server *server, grpc_call **call, grpc_call_details *details, + grpc_metadata_array *initial_metadata, + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag) { requested_call rc; - grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE); rc.type = BATCH_CALL; rc.tag = tag; - rc.data.batch.cq_bind = cq_bind; - rc.data.batch.call = call; + rc.cq_bound_to_call = cq_bound_to_call; + rc.cq_for_notification = cq_for_notification; + rc.call = call; rc.data.batch.details = details; rc.data.batch.initial_metadata = initial_metadata; return queue_call_request(server, &rc); @@ -1031,14 +1024,16 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, - grpc_completion_queue *cq_bind, void *tag) { + grpc_completion_queue *cq_bound_to_call, + grpc_completion_queue *cq_for_notification, void *tag) { requested_call rc; registered_method *registered_method = rm; - grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(cq_for_notification, NULL, GRPC_OP_COMPLETE); rc.type = REGISTERED_CALL; rc.tag = tag; - rc.data.registered.cq_bind = cq_bind; - rc.data.registered.call = call; + rc.cq_bound_to_call = cq_bound_to_call; + rc.cq_for_notification = cq_for_notification; + rc.call = call; rc.data.registered.registered_method = registered_method; rc.data.registered.deadline = deadline; rc.data.registered.initial_metadata = initial_metadata; @@ -1076,6 +1071,9 @@ static void begin_call(grpc_server *server, call_data *calld, fill in the metadata array passed by the client, we need to perform an ioreq op, that should complete immediately. */ + grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call); + *rc->call = calld->call; + calld->cq_new = rc->cq_for_notification; switch (rc->type) { case BATCH_CALL: cpstr(&rc->data.batch.details->host, @@ -1083,18 +1081,13 @@ static void begin_call(grpc_server *server, call_data *calld, cpstr(&rc->data.batch.details->method, &rc->data.batch.details->method_capacity, calld->path); rc->data.batch.details->deadline = calld->deadline; - grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind); - *rc->data.batch.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.batch.initial_metadata; r++; - calld->cq_new = server->unregistered_cq; publish = publish_registered_or_batch; break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; - grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind); - *rc->data.registered.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.registered.initial_metadata; r++; @@ -1103,7 +1096,6 @@ static void begin_call(grpc_server *server, call_data *calld, r->data.recv_message = rc->data.registered.optional_payload; r++; } - calld->cq_new = rc->data.registered.registered_method->cq; publish = publish_registered_or_batch; break; } @@ -1114,20 +1106,17 @@ static void begin_call(grpc_server *server, call_data *calld, } static void fail_call(grpc_server *server, requested_call *rc) { + *rc->call = NULL; switch (rc->type) { case BATCH_CALL: - *rc->data.batch.call = NULL; rc->data.batch.initial_metadata->count = 0; - grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, - GRPC_OP_ERROR); break; case REGISTERED_CALL: - *rc->data.registered.call = NULL; rc->data.registered.initial_metadata->count = 0; - grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL, - do_nothing, NULL, GRPC_OP_ERROR); break; } + grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, do_nothing, NULL, + GRPC_OP_ERROR); } static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 2cfa38fa43..c6331033e0 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -39,8 +39,7 @@ #include "src/core/transport/transport.h" /* Create a server */ -grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, - grpc_channel_filter **filters, +grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args); diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c index f629c7c72d..b7390675ad 100644 --- a/src/core/surface/server_create.c +++ b/src/core/surface/server_create.c @@ -35,7 +35,6 @@ #include "src/core/surface/completion_queue.h" #include "src/core/surface/server.h" -grpc_server *grpc_server_create(grpc_completion_queue *cq, - const grpc_channel_args *args) { - return grpc_server_create_from_filters(cq, NULL, 0, args); +grpc_server *grpc_server_create(const grpc_channel_args *args) { + return grpc_server_create_from_filters(NULL, 0, args); } -- cgit v1.2.3