diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 4 | ||||
-rw-r--r-- | src/core/surface/call.h | 1 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 6 | ||||
-rw-r--r-- | src/core/surface/server.c | 628 | ||||
-rw-r--r-- | src/core/surface/server.h | 2 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 4 |
6 files changed, 449 insertions, 196 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 58a2436937..743ef0c65b 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -258,6 +258,10 @@ void grpc_call_set_completion_queue(grpc_call *call, call->cq = cq; } +grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { + return call->cq; +} + void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } static void destroy_call(void *call, int ignored_success) { diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 05014c631c..55e434433d 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -89,6 +89,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data); void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq); +grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); void grpc_call_internal_ref(grpc_call *call); void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 411dbabfd3..a8fdeed87f 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -47,6 +47,7 @@ typedef struct { } call_data; typedef struct { + grpc_mdelem *status; grpc_mdelem *message; } channel_data; @@ -57,6 +58,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, switch (op->type) { case GRPC_SEND_START: + grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->status)); grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message)); grpc_call_stream_closed(elem); break; @@ -93,18 +95,22 @@ static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *channeld = elem->channel_data; + char status[12]; GPR_ASSERT(is_first); GPR_ASSERT(is_last); channeld->message = grpc_mdelem_from_strings(mdctx, "grpc-message", "Rpc sent on a lame channel."); + gpr_ltoa(GRPC_STATUS_UNKNOWN, status); + channeld->status = grpc_mdelem_from_strings(mdctx, "grpc-status", status); } static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *channeld = elem->channel_data; grpc_mdelem_unref(channeld->message); + grpc_mdelem_unref(channeld->status); } static const grpc_channel_filter lame_filter = { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index ee0f96a580..22588194ea 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -53,13 +53,63 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list; typedef struct listener { void *arg; - void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset); + void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, size_t pollset_count); void (*destroy)(grpc_server *server, void *arg); struct listener *next; } listener; typedef struct call_data call_data; typedef struct channel_data channel_data; +typedef struct registered_method registered_method; + +typedef struct { + call_data *next; + call_data *prev; +} call_link; + +typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type; + +typedef struct { + requested_call_type type; + void *tag; + union { + struct { + grpc_completion_queue *cq_bind; + grpc_call **call; + grpc_call_details *details; + grpc_metadata_array *initial_metadata; + } batch; + struct { + grpc_completion_queue *cq_bind; + grpc_call **call; + registered_method *registered_method; + gpr_timespec *deadline; + grpc_metadata_array *initial_metadata; + grpc_byte_buffer **optional_payload; + } registered; + } data; +} requested_call; + +typedef struct { + requested_call *calls; + size_t count; + size_t capacity; +} requested_call_array; + +struct registered_method { + char *method; + char *host; + call_data *pending; + requested_call_array requested; + grpc_completion_queue *cq; + registered_method *next; +}; + +typedef struct channel_registered_method { + registered_method *server_registered_method; + grpc_mdstr *method; + grpc_mdstr *host; +} channel_registered_method; struct channel_data { grpc_server *server; @@ -69,33 +119,25 @@ struct channel_data { /* linked list of all channels on a server */ channel_data *next; channel_data *prev; + channel_registered_method *registered_methods; + gpr_uint32 registered_method_slots; + gpr_uint32 registered_method_max_probes; }; -typedef void (*new_call_cb)(grpc_server *server, grpc_completion_queue *cq, - grpc_call **call, grpc_call_details *details, - grpc_metadata_array *initial_metadata, - call_data *calld, void *user_data); - -typedef struct { - void *user_data; - grpc_completion_queue *cq; - grpc_call **call; - grpc_call_details *details; - grpc_metadata_array *initial_metadata; - new_call_cb cb; -} requested_call; - struct grpc_server { size_t channel_filter_count; const grpc_channel_filter **channel_filters; grpc_channel_args *channel_args; - grpc_completion_queue *cq; + grpc_completion_queue *unregistered_cq; + + grpc_completion_queue **cqs; + grpc_pollset **pollsets; + size_t cq_count; gpr_mu mu; - requested_call *requested_calls; - size_t requested_call_count; - size_t requested_call_capacity; + registered_method *registered_methods; + requested_call_array requested_calls; gpr_uint8 shutdown; gpr_uint8 have_shutdown_tag; @@ -108,11 +150,6 @@ struct grpc_server { gpr_refcount internal_refcount; }; -typedef struct { - call_data *next; - call_data *prev; -} call_link; - typedef enum { /* waiting for metadata */ NOT_STARTED, @@ -125,7 +162,7 @@ typedef enum { } call_state; typedef struct legacy_data { - grpc_metadata_array *initial_metadata; + grpc_metadata_array initial_metadata; } legacy_data; struct call_data { @@ -137,9 +174,9 @@ struct call_data { grpc_mdstr *host; legacy_data *legacy; - grpc_call_details *details; + grpc_completion_queue *cq_new; - gpr_uint8 included[CALL_LIST_COUNT]; + call_data **root[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; }; @@ -148,30 +185,33 @@ struct call_data { static void do_nothing(void *unused, grpc_op_error ignored) {} -static int call_list_join(grpc_server *server, call_data *call, - call_list list) { - if (call->included[list]) return 0; - call->included[list] = 1; - if (!server->lists[list]) { - server->lists[list] = call; +static void begin_call(grpc_server *server, call_data *calld, + requested_call *rc); +static void fail_call(grpc_server *server, requested_call *rc); + +static int call_list_join(call_data **root, call_data *call, call_list list) { + GPR_ASSERT(!call->root[list]); + call->root[list] = root; + if (!*root) { + *root = call; call->links[list].next = call->links[list].prev = call; } else { - call->links[list].next = server->lists[list]; - call->links[list].prev = server->lists[list]->links[list].prev; + call->links[list].next = *root; + call->links[list].prev = (*root)->links[list].prev; call->links[list].next->links[list].prev = call->links[list].prev->links[list].next = call; } return 1; } -static call_data *call_list_remove_head(grpc_server *server, call_list list) { - call_data *out = server->lists[list]; +static call_data *call_list_remove_head(call_data **root, call_list list) { + call_data *out = *root; if (out) { - out->included[list] = 0; + out->root[list] = NULL; if (out->links[list].next == out) { - server->lists[list] = NULL; + *root = NULL; } else { - server->lists[list] = out->links[list].next; + *root = out->links[list].next; out->links[list].next->links[list].prev = out->links[list].prev; out->links[list].prev->links[list].next = out->links[list].next; } @@ -179,23 +219,39 @@ static call_data *call_list_remove_head(grpc_server *server, call_list list) { return out; } -static int call_list_remove(grpc_server *server, call_data *call, - call_list list) { - if (!call->included[list]) return 0; - call->included[list] = 0; - if (server->lists[list] == call) { - server->lists[list] = call->links[list].next; - if (server->lists[list] == call) { - server->lists[list] = NULL; +static int call_list_remove(call_data *call, call_list list) { + call_data **root = call->root[list]; + if (root == NULL) return 0; + call->root[list] = NULL; + if (*root == call) { + *root = call->links[list].next; + if (*root == call) { + *root = NULL; return 1; } } - GPR_ASSERT(server->lists[list] != call); + GPR_ASSERT(*root != call); call->links[list].next->links[list].prev = call->links[list].prev; call->links[list].prev->links[list].next = call->links[list].next; return 1; } +static void requested_call_array_destroy(requested_call_array *array) { + gpr_free(array->calls); +} + +static requested_call *requested_call_array_add(requested_call_array *array) { + requested_call *rc; + if (array->count == array->capacity) { + array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2); + array->calls = + gpr_realloc(array->calls, sizeof(requested_call) * array->capacity); + } + rc = &array->calls[array->count++]; + memset(rc, 0, sizeof(*rc)); + return rc; +} + static void server_ref(grpc_server *server) { gpr_ref(&server->internal_refcount); } @@ -205,7 +261,7 @@ static void server_unref(grpc_server *server) { grpc_channel_args_destroy(server->channel_args); gpr_mu_destroy(&server->mu); gpr_free(server->channel_filters); - gpr_free(server->requested_calls); + requested_call_array_destroy(&server->requested_calls); gpr_free(server); } } @@ -223,7 +279,6 @@ static void orphan_channel(channel_data *chand) { static void finish_destroy_channel(void *cd, int success) { channel_data *chand = cd; grpc_server *server = chand->server; - /*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/ grpc_channel_destroy(chand->channel); server_unref(server); } @@ -236,23 +291,63 @@ static void destroy_channel(channel_data *chand) { grpc_iomgr_add_callback(finish_destroy_channel, chand); } +static void finish_start_new_rpc_and_unlock(grpc_server *server, + grpc_call_element *elem, + call_data **pending_root, + requested_call_array *array) { + requested_call rc; + call_data *calld = elem->call_data; + if (array->count == 0) { + calld->state = PENDING; + call_list_join(pending_root, calld, PENDING_START); + gpr_mu_unlock(&server->mu); + } else { + rc = array->calls[--array->count]; + calld->state = ACTIVATED; + gpr_mu_unlock(&server->mu); + begin_call(server, calld, &rc); + } +} + static void start_new_rpc(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; grpc_server *server = chand->server; + gpr_uint32 i; + gpr_uint32 hash; + channel_registered_method *rm; gpr_mu_lock(&server->mu); - if (server->requested_call_count > 0) { - requested_call rc = server->requested_calls[--server->requested_call_count]; - calld->state = ACTIVATED; - gpr_mu_unlock(&server->mu); - rc.cb(server, rc.cq, rc.call, rc.details, rc.initial_metadata, calld, - rc.user_data); - } else { - calld->state = PENDING; - call_list_join(server, calld, PENDING_START); - gpr_mu_unlock(&server->mu); + if (chand->registered_methods && calld->path && calld->host) { + /* check for an exact match with host */ + hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash); + for (i = 0; i < chand->registered_method_max_probes; i++) { + rm = &chand->registered_methods[(hash + i) % + chand->registered_method_slots]; + if (!rm) break; + if (rm->host != calld->host) continue; + if (rm->method != calld->path) continue; + finish_start_new_rpc_and_unlock(server, elem, + &rm->server_registered_method->pending, + &rm->server_registered_method->requested); + return; + } + /* check for a wildcard method definition (no host set) */ + hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash); + for (i = 0; i <= chand->registered_method_max_probes; i++) { + rm = &chand->registered_methods[(hash + i) % + chand->registered_method_slots]; + if (!rm) break; + if (rm->host != NULL) continue; + if (rm->method != calld->path) continue; + finish_start_new_rpc_and_unlock(server, elem, + &rm->server_registered_method->pending, + &rm->server_registered_method->requested); + return; + } } + finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], + &server->requested_calls); } static void kill_zombie(void *elem, int success) { @@ -267,7 +362,7 @@ static void stream_closed(grpc_call_element *elem) { case ACTIVATED: break; case PENDING: - call_list_remove(chand->server, calld, PENDING_START); + call_list_remove(calld, PENDING_START); /* fallthrough intended */ case NOT_STARTED: calld->state = ZOMBIED; @@ -398,7 +493,7 @@ static void init_call_elem(grpc_call_element *elem, calld->call = grpc_call_from_top_element(elem); gpr_mu_lock(&chand->server->mu); - call_list_join(chand->server, calld, ALL_CALLS); + call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS); gpr_mu_unlock(&chand->server->mu); server_ref(chand->server); @@ -407,15 +502,17 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - int i; + size_t i; gpr_mu_lock(&chand->server->mu); for (i = 0; i < CALL_LIST_COUNT; i++) { - call_list_remove(chand->server, elem->call_data, i); + call_list_remove(elem->call_data, i); } if (chand->server->shutdown && chand->server->have_shutdown_tag && chand->server->lists[ALL_CALLS] == NULL) { - grpc_cq_end_server_shutdown(chand->server->cq, chand->server->shutdown_tag); + for (i = 0; i < chand->server->cq_count; i++) { + grpc_cq_end_server_shutdown(chand->server->cqs[i], chand->server->shutdown_tag); + } } gpr_mu_unlock(&chand->server->mu); @@ -427,8 +524,7 @@ static void destroy_call_elem(grpc_call_element *elem) { } if (calld->legacy) { - gpr_free(calld->legacy->initial_metadata->metadata); - gpr_free(calld->legacy->initial_metadata); + gpr_free(calld->legacy->initial_metadata.metadata); gpr_free(calld->legacy); } @@ -447,6 +543,7 @@ static void init_channel_elem(grpc_channel_element *elem, chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); chand->next = chand->prev = chand; + chand->registered_methods = NULL; } static void destroy_channel_elem(grpc_channel_element *elem) { @@ -464,11 +561,20 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } static const grpc_channel_filter server_surface_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "server", + call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server", }; +static void addcq(grpc_server *server, grpc_completion_queue *cq) { + size_t i, n; + for (i = 0; i < server->cq_count; i++) { + if (server->cqs[i] == cq) return; + } + n = server->cq_count++; + server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue*)); + server->cqs[n] = cq; +} + grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, grpc_channel_filter **filters, size_t filter_count, @@ -478,10 +584,11 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, grpc_server *server = gpr_malloc(sizeof(grpc_server)); memset(server, 0, sizeof(grpc_server)); + if (cq) addcq(server, cq); gpr_mu_init(&server->mu); - server->cq = cq; + server->unregistered_cq = cq; /* decremented by grpc_server_destroy */ gpr_ref_init(&server->internal_refcount, 1); server->root_channel_data.next = server->root_channel_data.prev = @@ -509,11 +616,49 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, return server; } +static int streq(const char *a, const char *b) { + if (a == NULL && b == NULL) return 1; + if (a == NULL) return 0; + if (b == NULL) return 0; + return 0 == strcmp(a, b); +} + +void *grpc_server_register_method(grpc_server *server, const char *method, + const char *host, grpc_completion_queue *cq_new_rpc) { + registered_method *m; + if (!method) { + gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__); + return NULL; + } + for (m = server->registered_methods; m; m = m->next) { + if (streq(m->method, method) && streq(m->host, host)) { + gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method, + host ? host : "*"); + return NULL; + } + } + addcq(server, cq_new_rpc); + m = gpr_malloc(sizeof(registered_method)); + memset(m, 0, sizeof(*m)); + m->method = gpr_strdup(method); + m->host = gpr_strdup(host); + m->next = server->registered_methods; + m->cq = cq_new_rpc; + server->registered_methods = m; + return m; +} + void grpc_server_start(grpc_server *server) { listener *l; + size_t i; + + server->pollsets = gpr_malloc(sizeof(grpc_pollset*) * server->cq_count); + for (i = 0; i < server->cq_count; i++) { + server->pollsets[i] = grpc_cq_pollset(server->cqs[i]); + } for (l = server->listeners; l; l = l->next) { - l->start(server, l->arg, grpc_cq_pollset(server->cq)); + l->start(server, l->arg, server->pollsets, server->cq_count); } } @@ -525,8 +670,18 @@ grpc_transport_setup_result grpc_server_setup_transport( grpc_channel_filter const **filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_filters); size_t i; + size_t num_registered_methods; + size_t alloc; + registered_method *rm; + channel_registered_method *crm; grpc_channel *channel; channel_data *chand; + grpc_mdstr *host; + grpc_mdstr *method; + gpr_uint32 hash; + gpr_uint32 slots; + gpr_uint32 probes; + gpr_uint32 max_probes = 0; for (i = 0; i < s->channel_filter_count; i++) { filters[i] = s->channel_filters[i]; @@ -536,7 +691,9 @@ grpc_transport_setup_result grpc_server_setup_transport( } filters[i] = &grpc_connected_channel_filter; - grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cq)); + for (i = 0; i < s->cq_count; i++) { + grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i])); + } channel = grpc_channel_create_from_filters(filters, num_filters, s->channel_args, mdctx, 0); @@ -546,6 +703,35 @@ grpc_transport_setup_result grpc_server_setup_transport( server_ref(s); chand->channel = channel; + num_registered_methods = 0; + for (rm = s->registered_methods; rm; rm = rm->next) { + num_registered_methods++; + } + /* build a lookup table phrased in terms of mdstr's in this channels context + to quickly find registered methods */ + if (num_registered_methods > 0) { + slots = 2 * num_registered_methods; + alloc = sizeof(channel_registered_method) * slots; + chand->registered_methods = gpr_malloc(alloc); + memset(chand->registered_methods, 0, alloc); + for (rm = s->registered_methods; rm; rm = rm->next) { + host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL; + method = grpc_mdstr_from_string(mdctx, rm->method); + hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash); + for (probes = 0; chand->registered_methods[(hash + probes) % slots] + .server_registered_method != NULL; + probes++) + ; + if (probes > max_probes) max_probes = probes; + crm = &chand->registered_methods[(hash + probes) % slots]; + crm->server_registered_method = rm; + crm->host = host; + crm->method = method; + } + chand->registered_method_slots = slots; + chand->registered_method_max_probes = max_probes; + } + gpr_mu_lock(&s->mu); chand->next = &s->root_channel_data; chand->prev = chand->next->prev; @@ -558,17 +744,17 @@ grpc_transport_setup_result grpc_server_setup_transport( grpc_channel_get_channel_stack(channel), transport); } -void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, +static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, void *shutdown_tag) { listener *l; - requested_call *requested_calls; - size_t requested_call_count; + requested_call_array requested_calls; channel_data **channels; channel_data *c; size_t nchannels; size_t i; grpc_channel_op op; grpc_channel_element *elem; + registered_method *rm; /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu); @@ -591,18 +777,28 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, i++; } + /* collect all unregistered then registered calls */ requested_calls = server->requested_calls; - requested_call_count = server->requested_call_count; - server->requested_calls = NULL; - server->requested_call_count = 0; + memset(&server->requested_calls, 0, sizeof(server->requested_calls)); + for (rm = server->registered_methods; rm; rm = rm->next) { + if (requested_calls.count + rm->requested.count > requested_calls.capacity) { + requested_calls.capacity = GPR_MAX(requested_calls.count + rm->requested.count, 2 * requested_calls.capacity); + requested_calls.calls = gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) * requested_calls.capacity); + } + memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls, sizeof(*requested_calls.calls) * rm->requested.count); + requested_calls.count += rm->requested.count; + memset(&rm->requested, 0, sizeof(rm->requested)); + } server->shutdown = 1; server->have_shutdown_tag = have_shutdown_tag; server->shutdown_tag = shutdown_tag; if (have_shutdown_tag) { - grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_SHUTDOWN); - if (server->lists[ALL_CALLS] == NULL) { - grpc_cq_end_server_shutdown(server->cq, shutdown_tag); + for (i = 0; i < server->cq_count; i++) { + grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN); + if (server->lists[ALL_CALLS] == NULL) { + grpc_cq_end_server_shutdown(server->cqs[i], shutdown_tag); + } } } gpr_mu_unlock(&server->mu); @@ -623,13 +819,10 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, gpr_free(channels); /* terminate all the requested calls */ - for (i = 0; i < requested_call_count; i++) { - requested_calls[i].cb(server, requested_calls[i].cq, - requested_calls[i].call, requested_calls[i].details, - requested_calls[i].initial_metadata, NULL, - requested_calls[i].user_data); + for (i = 0; i < requested_calls.count; i++) { + fail_call(server, &requested_calls.calls[i]); } - gpr_free(requested_calls); + gpr_free(requested_calls.calls); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { @@ -664,7 +857,7 @@ void grpc_server_destroy(grpc_server *server) { void grpc_server_add_listener(grpc_server *server, void *arg, void (*start)(grpc_server *server, void *arg, - grpc_pollset *pollset), + grpc_pollset **pollsets, size_t pollset_count), void (*destroy)(grpc_server *server, void *arg)) { listener *l = gpr_malloc(sizeof(listener)); l->arg = arg; @@ -675,47 +868,93 @@ void grpc_server_add_listener(grpc_server *server, void *arg, } static grpc_call_error queue_call_request(grpc_server *server, - grpc_completion_queue *cq, - grpc_call **call, - grpc_call_details *details, - grpc_metadata_array *initial_metadata, - new_call_cb cb, void *user_data) { - call_data *calld; - requested_call *rc; + requested_call *rc) { + call_data *calld = NULL; + requested_call_array *requested_calls = NULL; gpr_mu_lock(&server->mu); if (server->shutdown) { gpr_mu_unlock(&server->mu); - cb(server, cq, call, details, initial_metadata, NULL, user_data); + fail_call(server, rc); return GRPC_CALL_OK; } - calld = call_list_remove_head(server, PENDING_START); + switch (rc->type) { + case LEGACY_CALL: + case BATCH_CALL: + calld = + call_list_remove_head(&server->lists[PENDING_START], PENDING_START); + requested_calls = &server->requested_calls; + break; + case REGISTERED_CALL: + calld = call_list_remove_head( + &rc->data.registered.registered_method->pending, PENDING_START); + requested_calls = &rc->data.registered.registered_method->requested; + break; + } if (calld) { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&server->mu); - cb(server, cq, call, details, initial_metadata, calld, user_data); + begin_call(server, calld, rc); return GRPC_CALL_OK; } else { - if (server->requested_call_count == server->requested_call_capacity) { - server->requested_call_capacity = - GPR_MAX(server->requested_call_capacity + 8, - server->requested_call_capacity * 2); - server->requested_calls = - gpr_realloc(server->requested_calls, - sizeof(requested_call) * server->requested_call_capacity); - } - rc = &server->requested_calls[server->requested_call_count++]; - rc->cb = cb; - rc->cq = cq; - rc->call = call; - rc->details = details; - rc->user_data = user_data; - rc->initial_metadata = initial_metadata; + *requested_call_array_add(requested_calls) = *rc; gpr_mu_unlock(&server->mu); return GRPC_CALL_OK; } } +grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, + grpc_call_details *details, + grpc_metadata_array *initial_metadata, + grpc_completion_queue *cq_bind, + void *tag) { + requested_call rc; + grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE); + rc.type = BATCH_CALL; + rc.tag = tag; + rc.data.batch.cq_bind = cq_bind; + rc.data.batch.call = call; + rc.data.batch.details = details; + rc.data.batch.initial_metadata = initial_metadata; + return queue_call_request(server, &rc); +} + +grpc_call_error grpc_server_request_registered_call( + grpc_server *server, void *rm, grpc_call **call, + gpr_timespec *deadline, grpc_metadata_array *initial_metadata, + grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind, + void *tag) { + requested_call rc; + registered_method *registered_method = rm; + grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE); + rc.type = REGISTERED_CALL; + rc.tag = tag; + rc.data.registered.cq_bind = cq_bind; + rc.data.registered.call = call; + rc.data.registered.registered_method = registered_method; + rc.data.registered.deadline = deadline; + rc.data.registered.initial_metadata = initial_metadata; + rc.data.registered.optional_payload = optional_payload; + return queue_call_request(server, &rc); +} + +grpc_call_error grpc_server_request_call_old(grpc_server *server, + void *tag_new) { + requested_call rc; + grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW); + rc.type = LEGACY_CALL; + rc.tag = tag_new; + return queue_call_request(server, &rc); +} + +static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag); +static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, + void *tag); +static void publish_was_not_set(grpc_call *call, grpc_op_error status, + void *tag) { + abort(); +} + static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { gpr_slice slice = value->slice; size_t len = GPR_SLICE_LENGTH(slice); @@ -727,57 +966,84 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1); } -static void publish_request(grpc_call *call, grpc_op_error status, void *tag) { - grpc_call_element *elem = - grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - grpc_server *server = chand->server; - - if (status == GRPC_OP_OK) { - cpstr(&calld->details->host, &calld->details->host_capacity, calld->host); - cpstr(&calld->details->method, &calld->details->method_capacity, - calld->path); - calld->details->deadline = calld->deadline; - grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, - GRPC_OP_OK); - } else { - abort(); +static void begin_call(grpc_server *server, call_data *calld, + requested_call *rc) { + grpc_ioreq_completion_func publish = publish_was_not_set; + grpc_ioreq req[2]; + grpc_ioreq *r = req; + + /* called once initial metadata has been read by the call, but BEFORE + the ioreq to fetch it out of the call has been executed. + This means metadata related fields can be relied on in calld, but to + fill in the metadata array passed by the client, we need to perform + an ioreq op, that should complete immediately. */ + + switch (rc->type) { + case LEGACY_CALL: + calld->legacy = gpr_malloc(sizeof(legacy_data)); + memset(calld->legacy, 0, sizeof(legacy_data)); + r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; + r->data.recv_metadata = &calld->legacy->initial_metadata; + r++; + publish = publish_legacy; + break; + case BATCH_CALL: + cpstr(&rc->data.batch.details->host, + &rc->data.batch.details->host_capacity, calld->host); + cpstr(&rc->data.batch.details->method, + &rc->data.batch.details->method_capacity, calld->path); + grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind); + *rc->data.batch.call = calld->call; + r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; + r->data.recv_metadata = rc->data.batch.initial_metadata; + r++; + calld->cq_new = server->unregistered_cq; + publish = publish_registered_or_batch; + break; + case REGISTERED_CALL: + *rc->data.registered.deadline = calld->deadline; + grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind); + *rc->data.registered.call = calld->call; + r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; + r->data.recv_metadata = rc->data.registered.initial_metadata; + r++; + if (rc->data.registered.optional_payload) { + r->op = GRPC_IOREQ_RECV_MESSAGE; + r->data.recv_message = rc->data.registered.optional_payload; + r++; + } + calld->cq_new = rc->data.registered.registered_method->cq; + publish = publish_registered_or_batch; + break; } -} -static void begin_request(grpc_server *server, grpc_completion_queue *cq, - grpc_call **call, grpc_call_details *details, - grpc_metadata_array *initial_metadata, - call_data *calld, void *tag) { - grpc_ioreq req; - if (!calld) { - *call = NULL; - initial_metadata->count = 0; - grpc_cq_end_op_complete(cq, tag, NULL, do_nothing, NULL, GRPC_OP_ERROR); - return; - } - calld->details = details; - grpc_call_set_completion_queue(calld->call, cq); - *call = calld->call; - req.op = GRPC_IOREQ_RECV_INITIAL_METADATA; - req.data.recv_metadata = initial_metadata; grpc_call_internal_ref(calld->call); - grpc_call_start_ioreq_and_call_back(calld->call, &req, 1, publish_request, - tag); + grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, + rc->tag); } -grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, - grpc_call_details *details, - grpc_metadata_array *initial_metadata, - grpc_completion_queue *cq, void *tag) { - grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE); - return queue_call_request(server, cq, call, details, initial_metadata, - begin_request, tag); +static void fail_call(grpc_server *server, requested_call *rc) { + switch (rc->type) { + case LEGACY_CALL: + grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, NULL, + NULL, gpr_inf_past, 0, NULL); + break; + case BATCH_CALL: + *rc->data.batch.call = NULL; + rc->data.batch.initial_metadata->count = 0; + grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, + GRPC_OP_ERROR); + break; + case REGISTERED_CALL: + *rc->data.registered.call = NULL; + rc->data.registered.initial_metadata->count = 0; + grpc_cq_end_op_complete(rc->data.registered.registered_method->cq, rc->tag, NULL, do_nothing, NULL, + GRPC_OP_ERROR); + break; + } } -static void publish_legacy_request(grpc_call *call, grpc_op_error status, - void *tag) { +static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) { grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); call_data *calld = elem->call_data; @@ -785,47 +1051,23 @@ static void publish_legacy_request(grpc_call *call, grpc_op_error status, grpc_server *server = chand->server; if (status == GRPC_OP_OK) { - grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL, + grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL, grpc_mdstr_as_c_string(calld->path), grpc_mdstr_as_c_string(calld->host), calld->deadline, - calld->legacy->initial_metadata->count, - calld->legacy->initial_metadata->metadata); + calld->legacy->initial_metadata.count, + calld->legacy->initial_metadata.metadata); } else { + gpr_log(GPR_ERROR, "should never reach here"); abort(); } } -static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq, - grpc_call **call, grpc_call_details *details, - grpc_metadata_array *initial_metadata, - call_data *calld, void *tag) { - grpc_ioreq req; - GPR_ASSERT(call == NULL); - GPR_ASSERT(details == NULL); - if (!calld) { - gpr_free(initial_metadata); - grpc_cq_end_new_rpc(cq, tag, NULL, do_nothing, NULL, NULL, NULL, - gpr_inf_past, 0, NULL); - return; - } - req.op = GRPC_IOREQ_RECV_INITIAL_METADATA; - req.data.recv_metadata = initial_metadata; - calld->legacy = gpr_malloc(sizeof(legacy_data)); - memset(calld->legacy, 0, sizeof(legacy_data)); - calld->legacy->initial_metadata = initial_metadata; - grpc_call_internal_ref(calld->call); - grpc_call_start_ioreq_and_call_back(calld->call, &req, 1, - publish_legacy_request, tag); -} - -grpc_call_error grpc_server_request_call_old(grpc_server *server, - void *tag_new) { - grpc_metadata_array *client_metadata = - gpr_malloc(sizeof(grpc_metadata_array)); - memset(client_metadata, 0, sizeof(*client_metadata)); - grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW); - return queue_call_request(server, server->cq, NULL, NULL, client_metadata, - begin_legacy_request, tag_new); +static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, + void *tag) { + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + call_data *calld = elem->call_data; + grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 50574d66a4..c8861f420d 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -48,7 +48,7 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, and when it shuts down, it will call destroy */ void grpc_server_add_listener(grpc_server *server, void *listener, void (*start)(grpc_server *server, void *arg, - grpc_pollset *pollset), + grpc_pollset **pollsets, size_t npollsets), void (*destroy)(grpc_server *server, void *arg)); /* Setup a transport - creates a channel stack, binds the transport to the diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 5ba7d47efd..3b6abb7d03 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -59,9 +59,9 @@ static void new_transport(void *server, grpc_endpoint *tcp) { } /* Server callback: start listening on our ports */ -static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) { +static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets, size_t pollset_count) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(tcp, pollset, new_transport, server); + grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server); } /* Server callback: destroy the tcp listener (so we don't generate further |