diff options
author | kpayson64 <kpayson@google.com> | 2017-11-07 13:32:41 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-07 13:32:41 -0800 |
commit | c03867ff224a98dab5a93b3ba70b95c46f05a440 (patch) | |
tree | e5ec75cf6c7b1b36159eb2fc73ecf216207e91bb | |
parent | 4c8f8fe1f1522f2f86e3648358e05fdac72888bb (diff) | |
parent | e1533572d517df2e660cf57556d296b59ab89cfc (diff) |
Merge pull request #12648 from kpayson64/attempt_2
Add back mpscq request matcher implementation
-rw-r--r-- | include/grpc++/server_builder.h | 5 | ||||
-rw-r--r-- | src/core/lib/support/mpscq.cc | 37 | ||||
-rw-r--r-- | src/core/lib/support/mpscq.h | 30 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.cc | 2 | ||||
-rw-r--r-- | src/core/lib/surface/server.cc | 181 |
5 files changed, 130 insertions, 125 deletions
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 0888bef0d9..bf842baf6f 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -202,7 +202,10 @@ class ServerBuilder { struct SyncServerSettings { SyncServerSettings() - : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {} + : num_cqs(GPR_MAX(1, gpr_cpu_num_cores())), + min_pollers(1), + max_pollers(2), + cq_timeout_msec(10000) {} /// Number of server completion queues to create to listen to incoming RPCs. int num_cqs; diff --git a/src/core/lib/support/mpscq.cc b/src/core/lib/support/mpscq.cc index db25f24264..b270777d5c 100644 --- a/src/core/lib/support/mpscq.cc +++ b/src/core/lib/support/mpscq.cc @@ -31,11 +31,12 @@ void gpr_mpscq_destroy(gpr_mpscq* q) { GPR_ASSERT(q->tail == &q->stub); } -void gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n) { +bool gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n) { gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL); gpr_mpscq_node* prev = (gpr_mpscq_node*)gpr_atm_full_xchg(&q->head, (gpr_atm)n); gpr_atm_rel_store(&prev->next, (gpr_atm)n); + return prev == &q->stub; } gpr_mpscq_node* gpr_mpscq_pop(gpr_mpscq* q) { @@ -77,3 +78,37 @@ gpr_mpscq_node* gpr_mpscq_pop_and_check_end(gpr_mpscq* q, bool* empty) { *empty = false; return NULL; } + +void gpr_locked_mpscq_init(gpr_locked_mpscq* q) { + gpr_mpscq_init(&q->queue); + gpr_mu_init(&q->mu); +} + +void gpr_locked_mpscq_destroy(gpr_locked_mpscq* q) { + gpr_mpscq_destroy(&q->queue); + gpr_mu_destroy(&q->mu); +} + +bool gpr_locked_mpscq_push(gpr_locked_mpscq* q, gpr_mpscq_node* n) { + return gpr_mpscq_push(&q->queue, n); +} + +gpr_mpscq_node* gpr_locked_mpscq_try_pop(gpr_locked_mpscq* q) { + if (gpr_mu_trylock(&q->mu)) { + gpr_mpscq_node* n = gpr_mpscq_pop(&q->queue); + gpr_mu_unlock(&q->mu); + return n; + } + return NULL; +} + +gpr_mpscq_node* gpr_locked_mpscq_pop(gpr_locked_mpscq* q) { + gpr_mu_lock(&q->mu); + bool empty = false; + gpr_mpscq_node* n; + do { + n = gpr_mpscq_pop_and_check_end(&q->queue, &empty); + } while (n == NULL && !empty); + gpr_mu_unlock(&q->mu); + return n; +} diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h index 1cc9d89feb..fb22742050 100644 --- a/src/core/lib/support/mpscq.h +++ b/src/core/lib/support/mpscq.h @@ -20,6 +20,7 @@ #define GRPC_CORE_LIB_SUPPORT_MPSCQ_H #include <grpc/support/atm.h> +#include <grpc/support/sync.h> #include <stdbool.h> #include <stddef.h> @@ -49,13 +50,40 @@ typedef struct gpr_mpscq { void gpr_mpscq_init(gpr_mpscq* q); void gpr_mpscq_destroy(gpr_mpscq* q); // Push a node -void gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n); +// Thread safe - can be called from multiple threads concurrently +// Returns true if this was possibly the first node (may return true +// sporadically, will not return false sporadically) +bool gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n); // Pop a node (returns NULL if no node is ready - which doesn't indicate that // the queue is empty!!) +// Thread compatible - can only be called from one thread at a time gpr_mpscq_node* gpr_mpscq_pop(gpr_mpscq* q); // Pop a node; sets *empty to true if the queue is empty, or false if it is not gpr_mpscq_node* gpr_mpscq_pop_and_check_end(gpr_mpscq* q, bool* empty); +// An mpscq with a lock: it's safe to pop from multiple threads, but doing +// only one thread will succeed concurrently +typedef struct gpr_locked_mpscq { + gpr_mpscq queue; + gpr_mu mu; +} gpr_locked_mpscq; + +void gpr_locked_mpscq_init(gpr_locked_mpscq* q); +void gpr_locked_mpscq_destroy(gpr_locked_mpscq* q); +// Push a node +// Thread safe - can be called from multiple threads concurrently +// Returns true if this was possibly the first node (may return true +// sporadically, will not return false sporadically) +bool gpr_locked_mpscq_push(gpr_locked_mpscq* q, gpr_mpscq_node* n); + +// Pop a node (returns NULL if no node is ready - which doesn't indicate that +// the queue is empty!!) +// Thread safe - can be called from multiple threads concurrently +gpr_mpscq_node* gpr_locked_mpscq_try_pop(gpr_locked_mpscq* q); + +// Pop a node. Returns NULL only if the queue was empty at some point after +// calling this function +gpr_mpscq_node* gpr_locked_mpscq_pop(gpr_locked_mpscq* q); #ifdef __cplusplus } #endif diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 14054e82e7..922df923ae 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -376,8 +376,8 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq, (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) { *tag = storage->tag; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - storage->done(&exec_ctx, storage->done_arg, storage); *ok = (storage->next & (uintptr_t)(1)) == 1; + storage->done(&exec_ctx, storage->done_arg, storage); ret = 1; cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq); if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index eb7a4e2d30..da7ae17bef 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -33,7 +33,8 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/stack_lockfree.h" +#include "src/core/lib/support/mpscq.h" +#include "src/core/lib/support/spinlock.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -63,6 +64,7 @@ grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false, "server_channel"); typedef struct requested_call { + gpr_mpscq_node request_link; /* must be first */ requested_call_type type; size_t cq_idx; void* tag; @@ -128,10 +130,7 @@ typedef struct request_matcher request_matcher; struct call_data { grpc_call* call; - /** protects state */ - gpr_mu mu_state; - /** the current state of a call - see call_state */ - call_state state; + gpr_atm state; bool path_set; bool host_set; @@ -162,7 +161,7 @@ struct request_matcher { grpc_server* server; call_data* pending_head; call_data* pending_tail; - gpr_stack_lockfree** requests_per_cq; + gpr_locked_mpscq* requests_per_cq; }; struct registered_method { @@ -207,11 +206,6 @@ struct grpc_server { registered_method* registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; - /** free list of available requested_calls_per_cq indices */ - gpr_stack_lockfree** request_freelist_per_cq; - /** requested call backing data */ - requested_call** requested_calls_per_cq; - int max_requested_calls_per_cq; gpr_atm shutdown_flag; uint8_t shutdown_published; @@ -313,21 +307,20 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx* exec_ctx, * request_matcher */ -static void request_matcher_init(request_matcher* rm, size_t entries, - grpc_server* server) { +static void request_matcher_init(request_matcher* rm, grpc_server* server) { memset(rm, 0, sizeof(*rm)); rm->server = server; - rm->requests_per_cq = (gpr_stack_lockfree**)gpr_malloc( + rm->requests_per_cq = (gpr_locked_mpscq*)gpr_malloc( sizeof(*rm->requests_per_cq) * server->cq_count); for (size_t i = 0; i < server->cq_count; i++) { - rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); + gpr_locked_mpscq_init(&rm->requests_per_cq[i]); } } static void request_matcher_destroy(request_matcher* rm) { for (size_t i = 0; i < rm->server->cq_count; i++) { - GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); - gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); + GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL); + gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]); } gpr_free(rm->requests_per_cq); } @@ -342,9 +335,7 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx* exec_ctx, while (rm->pending_head) { call_data* calld = rm->pending_head; rm->pending_head = calld->pending_next; - gpr_mu_lock(&calld->mu_state); - calld->state = ZOMBIED; - gpr_mu_unlock(&calld->mu_state); + gpr_atm_no_barrier_store(&calld->state, ZOMBIED); GRPC_CLOSURE_INIT( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), @@ -357,13 +348,17 @@ static void request_matcher_kill_requests(grpc_exec_ctx* exec_ctx, grpc_server* server, request_matcher* rm, grpc_error* error) { - int request_id; + requested_call* rc; for (size_t i = 0; i < server->cq_count; i++) { - while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != - -1) { - fail_call(exec_ctx, server, i, - &server->requested_calls_per_cq[i][request_id], - GRPC_ERROR_REF(error)); + /* Here we know: + 1. no requests are being added (since the server is shut down) + 2. no other threads are pulling (since the shut down process is single + threaded) + So, we can ignore the queue lock and just pop, with the guarantee that a + NULL returned here truly means that the queue is empty */ + while ((rc = (requested_call*)gpr_mpscq_pop( + &rm->requests_per_cq[i].queue)) != NULL) { + fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error)); } } GRPC_ERROR_UNREF(error); @@ -398,13 +393,7 @@ static void server_delete(grpc_exec_ctx* exec_ctx, grpc_server* server) { } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); - if (server->started) { - gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); - gpr_free(server->requested_calls_per_cq[i]); - } } - gpr_free(server->request_freelist_per_cq); - gpr_free(server->requested_calls_per_cq); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -462,21 +451,7 @@ static void destroy_channel(grpc_exec_ctx* exec_ctx, channel_data* chand, static void done_request_event(grpc_exec_ctx* exec_ctx, void* req, grpc_cq_completion* c) { - requested_call* rc = (requested_call*)req; - grpc_server* server = rc->server; - - if (rc >= server->requested_calls_per_cq[rc->cq_idx] && - rc < server->requested_calls_per_cq[rc->cq_idx] + - server->max_requested_calls_per_cq) { - GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX); - gpr_stack_lockfree_push( - server->request_freelist_per_cq[rc->cq_idx], - (int)(rc - server->requested_calls_per_cq[rc->cq_idx])); - } else { - gpr_free(req); - } - - server_unref(exec_ctx, server); + gpr_free(req); } static void publish_call(grpc_exec_ctx* exec_ctx, grpc_server* server, @@ -508,10 +483,6 @@ static void publish_call(grpc_exec_ctx* exec_ctx, grpc_server* server, GPR_UNREACHABLE_CODE(return ); } - grpc_call_element* elem = - grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - channel_data* chand = (channel_data*)elem->channel_data; - server_ref(chand->server); grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, rc, &rc->completion); } @@ -525,9 +496,7 @@ static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg, grpc_server* server = rm->server; if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) { - gpr_mu_lock(&calld->mu_state); - calld->state = ZOMBIED; - gpr_mu_unlock(&calld->mu_state); + gpr_atm_no_barrier_store(&calld->state, ZOMBIED); GRPC_CLOSURE_INIT( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), @@ -539,16 +508,14 @@ static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg, for (size_t i = 0; i < server->cq_count; i++) { size_t cq_idx = (chand->cq_idx + i) % server->cq_count; - int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); - if (request_id == -1) { + requested_call* rc = + (requested_call*)gpr_locked_mpscq_try_pop(&rm->requests_per_cq[cq_idx]); + if (rc == NULL) { continue; } else { GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i); - gpr_mu_lock(&calld->mu_state); - calld->state = ACTIVATED; - gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls_per_cq[cq_idx][request_id]); + gpr_atm_no_barrier_store(&calld->state, ACTIVATED); + publish_call(exec_ctx, server, calld, cq_idx, rc); return; /* early out */ } } @@ -556,9 +523,27 @@ static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg, /* no cq to take the request found: queue it on the slow list */ GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx); gpr_mu_lock(&server->mu_call); - gpr_mu_lock(&calld->mu_state); - calld->state = PENDING; - gpr_mu_unlock(&calld->mu_state); + + // We need to ensure that all the queues are empty. We do this under + // the server mu_call lock to ensure that if something is added to + // an empty request queue, it will block until the call is actually + // added to the pending list. + for (size_t i = 0; i < server->cq_count; i++) { + size_t cq_idx = (chand->cq_idx + i) % server->cq_count; + requested_call* rc = + (requested_call*)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); + if (rc == NULL) { + continue; + } else { + gpr_mu_unlock(&server->mu_call); + GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i + server->cq_count); + gpr_atm_no_barrier_store(&calld->state, ACTIVATED); + publish_call(exec_ctx, server, calld, cq_idx, rc); + return; /* early out */ + } + } + + gpr_atm_no_barrier_store(&calld->state, PENDING); if (rm->pending_head == NULL) { rm->pending_tail = rm->pending_head = calld; } else { @@ -576,9 +561,7 @@ static void finish_start_new_rpc( call_data* calld = (call_data*)elem->call_data; if (gpr_atm_acq_load(&server->shutdown_flag)) { - gpr_mu_lock(&calld->mu_state); - calld->state = ZOMBIED; - gpr_mu_unlock(&calld->mu_state); + gpr_atm_no_barrier_store(&calld->state, ZOMBIED); GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); @@ -807,21 +790,14 @@ static void got_initial_metadata(grpc_exec_ctx* exec_ctx, void* ptr, if (error == GRPC_ERROR_NONE) { start_new_rpc(exec_ctx, elem); } else { - gpr_mu_lock(&calld->mu_state); - if (calld->state == NOT_STARTED) { - calld->state = ZOMBIED; - gpr_mu_unlock(&calld->mu_state); + if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) { GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); - } else if (calld->state == PENDING) { - calld->state = ZOMBIED; - gpr_mu_unlock(&calld->mu_state); + } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) { /* zombied call will be destroyed when it's removed from the pending queue... later */ - } else { - gpr_mu_unlock(&calld->mu_state); } } } @@ -885,7 +861,6 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, memset(calld, 0, sizeof(call_data)); calld->deadline = GRPC_MILLIS_INF_FUTURE; calld->call = grpc_call_from_top_element(elem); - gpr_mu_init(&calld->mu_state); GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata, server_on_recv_initial_metadata, elem, @@ -912,8 +887,6 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_metadata_array_destroy(&calld->initial_metadata); grpc_byte_buffer_destroy(calld->payload); - gpr_mu_destroy(&calld->mu_state); - server_unref(exec_ctx, chand->server); } @@ -1020,8 +993,6 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) { server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; - /* TODO(ctiller): expose a channel_arg for this */ - server->max_requested_calls_per_cq = 32768; server->channel_args = grpc_channel_args_copy(args); return server; @@ -1095,29 +1066,15 @@ void grpc_server_start(grpc_server* server) { server->pollset_count = 0; server->pollsets = (grpc_pollset**)gpr_malloc(sizeof(grpc_pollset*) * server->cq_count); - server->request_freelist_per_cq = (gpr_stack_lockfree**)gpr_malloc( - sizeof(*server->request_freelist_per_cq) * server->cq_count); - server->requested_calls_per_cq = (requested_call**)gpr_malloc( - sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = grpc_cq_pollset(server->cqs[i]); } - server->request_freelist_per_cq[i] = - gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); - for (int j = 0; j < server->max_requested_calls_per_cq; j++) { - gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j); - } - server->requested_calls_per_cq[i] = - (requested_call*)gpr_malloc((size_t)server->max_requested_calls_per_cq * - sizeof(*server->requested_calls_per_cq[i])); } - request_matcher_init(&server->unregistered_request_matcher, - (size_t)server->max_requested_calls_per_cq, server); + request_matcher_init(&server->unregistered_request_matcher, server); for (registered_method* rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->matcher, - (size_t)server->max_requested_calls_per_cq, server); + request_matcher_init(&rm->matcher, server); } server_ref(server); @@ -1373,21 +1330,11 @@ static grpc_call_error queue_call_request(grpc_exec_ctx* exec_ctx, requested_call* rc) { call_data* calld = NULL; request_matcher* rm = NULL; - int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { fail_call(exec_ctx, server, cq_idx, rc, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); return GRPC_CALL_OK; } - request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); - if (request_id == -1) { - /* out of request ids: just fail this one */ - fail_call(exec_ctx, server, cq_idx, rc, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"), - GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq)); - return GRPC_CALL_OK; - } switch (rc->type) { case BATCH_CALL: rm = &server->unregistered_request_matcher; @@ -1396,20 +1343,17 @@ static grpc_call_error queue_call_request(grpc_exec_ctx* exec_ctx, rm = &rc->data.registered.method->matcher; break; } - server->requested_calls_per_cq[cq_idx][request_id] = *rc; - gpr_free(rc); - if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { + if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) { /* this was the first queued request: we need to lock and start matching calls */ gpr_mu_lock(&server->mu_call); while ((calld = rm->pending_head) != NULL) { - request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); - if (request_id == -1) break; + rc = (requested_call*)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); + if (rc == NULL) break; rm->pending_head = calld->pending_next; gpr_mu_unlock(&server->mu_call); - gpr_mu_lock(&calld->mu_state); - if (calld->state == ZOMBIED) { - gpr_mu_unlock(&calld->mu_state); + if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) { + // Zombied Call GRPC_CLOSURE_INIT( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), @@ -1417,11 +1361,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx* exec_ctx, GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); } else { - GPR_ASSERT(calld->state == PENDING); - calld->state = ACTIVATED; - gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls_per_cq[cq_idx][request_id]); + publish_call(exec_ctx, server, calld, cq_idx, rc); } gpr_mu_lock(&server->mu_call); } @@ -1540,7 +1480,6 @@ static void fail_call(grpc_exec_ctx* exec_ctx, grpc_server* server, rc->initial_metadata->count = 0; GPR_ASSERT(error != GRPC_ERROR_NONE); - server_ref(server); grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, done_request_event, rc, &rc->completion); } |