aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Ken Payson <kpayson@google.com>2017-11-07 11:27:02 -0800
committerGravatar Ken Payson <kpayson@google.com>2017-11-07 11:27:02 -0800
commite1533572d517df2e660cf57556d296b59ab89cfc (patch)
tree30b6e30869717333c23d966e038bd9b3a738b625 /src
parentc55a4b1ab577c3ef32794c48ca4ae9fe6f810646 (diff)
Add back mpscq request matcher
Diffstat (limited to 'src')
-rw-r--r--src/core/lib/support/mpscq.cc37
-rw-r--r--src/core/lib/support/mpscq.h30
-rw-r--r--src/core/lib/surface/completion_queue.cc2
-rw-r--r--src/core/lib/surface/server.cc181
4 files changed, 126 insertions, 124 deletions
diff --git a/src/core/lib/support/mpscq.cc b/src/core/lib/support/mpscq.cc
index db25f24264..b270777d5c 100644
--- a/src/core/lib/support/mpscq.cc
+++ b/src/core/lib/support/mpscq.cc
@@ -31,11 +31,12 @@ void gpr_mpscq_destroy(gpr_mpscq* q) {
GPR_ASSERT(q->tail == &q->stub);
}
-void gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n) {
+bool gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n) {
gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
gpr_mpscq_node* prev =
(gpr_mpscq_node*)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
gpr_atm_rel_store(&prev->next, (gpr_atm)n);
+ return prev == &q->stub;
}
gpr_mpscq_node* gpr_mpscq_pop(gpr_mpscq* q) {
@@ -77,3 +78,37 @@ gpr_mpscq_node* gpr_mpscq_pop_and_check_end(gpr_mpscq* q, bool* empty) {
*empty = false;
return NULL;
}
+
+void gpr_locked_mpscq_init(gpr_locked_mpscq* q) {
+ gpr_mpscq_init(&q->queue);
+ gpr_mu_init(&q->mu);
+}
+
+void gpr_locked_mpscq_destroy(gpr_locked_mpscq* q) {
+ gpr_mpscq_destroy(&q->queue);
+ gpr_mu_destroy(&q->mu);
+}
+
+bool gpr_locked_mpscq_push(gpr_locked_mpscq* q, gpr_mpscq_node* n) {
+ return gpr_mpscq_push(&q->queue, n);
+}
+
+gpr_mpscq_node* gpr_locked_mpscq_try_pop(gpr_locked_mpscq* q) {
+ if (gpr_mu_trylock(&q->mu)) {
+ gpr_mpscq_node* n = gpr_mpscq_pop(&q->queue);
+ gpr_mu_unlock(&q->mu);
+ return n;
+ }
+ return NULL;
+}
+
+gpr_mpscq_node* gpr_locked_mpscq_pop(gpr_locked_mpscq* q) {
+ gpr_mu_lock(&q->mu);
+ bool empty = false;
+ gpr_mpscq_node* n;
+ do {
+ n = gpr_mpscq_pop_and_check_end(&q->queue, &empty);
+ } while (n == NULL && !empty);
+ gpr_mu_unlock(&q->mu);
+ return n;
+}
diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h
index 1cc9d89feb..fb22742050 100644
--- a/src/core/lib/support/mpscq.h
+++ b/src/core/lib/support/mpscq.h
@@ -20,6 +20,7 @@
#define GRPC_CORE_LIB_SUPPORT_MPSCQ_H
#include <grpc/support/atm.h>
+#include <grpc/support/sync.h>
#include <stdbool.h>
#include <stddef.h>
@@ -49,13 +50,40 @@ typedef struct gpr_mpscq {
void gpr_mpscq_init(gpr_mpscq* q);
void gpr_mpscq_destroy(gpr_mpscq* q);
// Push a node
-void gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n);
+// Thread safe - can be called from multiple threads concurrently
+// Returns true if this was possibly the first node (may return true
+// sporadically, will not return false sporadically)
+bool gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n);
// Pop a node (returns NULL if no node is ready - which doesn't indicate that
// the queue is empty!!)
+// Thread compatible - can only be called from one thread at a time
gpr_mpscq_node* gpr_mpscq_pop(gpr_mpscq* q);
// Pop a node; sets *empty to true if the queue is empty, or false if it is not
gpr_mpscq_node* gpr_mpscq_pop_and_check_end(gpr_mpscq* q, bool* empty);
+// An mpscq with a lock: it's safe to pop from multiple threads, but doing
+// only one thread will succeed concurrently
+typedef struct gpr_locked_mpscq {
+ gpr_mpscq queue;
+ gpr_mu mu;
+} gpr_locked_mpscq;
+
+void gpr_locked_mpscq_init(gpr_locked_mpscq* q);
+void gpr_locked_mpscq_destroy(gpr_locked_mpscq* q);
+// Push a node
+// Thread safe - can be called from multiple threads concurrently
+// Returns true if this was possibly the first node (may return true
+// sporadically, will not return false sporadically)
+bool gpr_locked_mpscq_push(gpr_locked_mpscq* q, gpr_mpscq_node* n);
+
+// Pop a node (returns NULL if no node is ready - which doesn't indicate that
+// the queue is empty!!)
+// Thread safe - can be called from multiple threads concurrently
+gpr_mpscq_node* gpr_locked_mpscq_try_pop(gpr_locked_mpscq* q);
+
+// Pop a node. Returns NULL only if the queue was empty at some point after
+// calling this function
+gpr_mpscq_node* gpr_locked_mpscq_pop(gpr_locked_mpscq* q);
#ifdef __cplusplus
}
#endif
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 14054e82e7..922df923ae 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -376,8 +376,8 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
(grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
*tag = storage->tag;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- storage->done(&exec_ctx, storage->done_arg, storage);
*ok = (storage->next & (uintptr_t)(1)) == 1;
+ storage->done(&exec_ctx, storage->done_arg, storage);
ret = 1;
cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index eb7a4e2d30..da7ae17bef 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -33,7 +33,8 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/support/stack_lockfree.h"
+#include "src/core/lib/support/mpscq.h"
+#include "src/core/lib/support/spinlock.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
@@ -63,6 +64,7 @@ grpc_tracer_flag grpc_server_channel_trace =
GRPC_TRACER_INITIALIZER(false, "server_channel");
typedef struct requested_call {
+ gpr_mpscq_node request_link; /* must be first */
requested_call_type type;
size_t cq_idx;
void* tag;
@@ -128,10 +130,7 @@ typedef struct request_matcher request_matcher;
struct call_data {
grpc_call* call;
- /** protects state */
- gpr_mu mu_state;
- /** the current state of a call - see call_state */
- call_state state;
+ gpr_atm state;
bool path_set;
bool host_set;
@@ -162,7 +161,7 @@ struct request_matcher {
grpc_server* server;
call_data* pending_head;
call_data* pending_tail;
- gpr_stack_lockfree** requests_per_cq;
+ gpr_locked_mpscq* requests_per_cq;
};
struct registered_method {
@@ -207,11 +206,6 @@ struct grpc_server {
registered_method* registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
- /** free list of available requested_calls_per_cq indices */
- gpr_stack_lockfree** request_freelist_per_cq;
- /** requested call backing data */
- requested_call** requested_calls_per_cq;
- int max_requested_calls_per_cq;
gpr_atm shutdown_flag;
uint8_t shutdown_published;
@@ -313,21 +307,20 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx* exec_ctx,
* request_matcher
*/
-static void request_matcher_init(request_matcher* rm, size_t entries,
- grpc_server* server) {
+static void request_matcher_init(request_matcher* rm, grpc_server* server) {
memset(rm, 0, sizeof(*rm));
rm->server = server;
- rm->requests_per_cq = (gpr_stack_lockfree**)gpr_malloc(
+ rm->requests_per_cq = (gpr_locked_mpscq*)gpr_malloc(
sizeof(*rm->requests_per_cq) * server->cq_count);
for (size_t i = 0; i < server->cq_count; i++) {
- rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries);
+ gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
}
}
static void request_matcher_destroy(request_matcher* rm) {
for (size_t i = 0; i < rm->server->cq_count; i++) {
- GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1);
- gpr_stack_lockfree_destroy(rm->requests_per_cq[i]);
+ GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL);
+ gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
}
gpr_free(rm->requests_per_cq);
}
@@ -342,9 +335,7 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx* exec_ctx,
while (rm->pending_head) {
call_data* calld = rm->pending_head;
rm->pending_head = calld->pending_next;
- gpr_mu_lock(&calld->mu_state);
- calld->state = ZOMBIED;
- gpr_mu_unlock(&calld->mu_state);
+ gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
@@ -357,13 +348,17 @@ static void request_matcher_kill_requests(grpc_exec_ctx* exec_ctx,
grpc_server* server,
request_matcher* rm,
grpc_error* error) {
- int request_id;
+ requested_call* rc;
for (size_t i = 0; i < server->cq_count; i++) {
- while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
- -1) {
- fail_call(exec_ctx, server, i,
- &server->requested_calls_per_cq[i][request_id],
- GRPC_ERROR_REF(error));
+ /* Here we know:
+ 1. no requests are being added (since the server is shut down)
+ 2. no other threads are pulling (since the shut down process is single
+ threaded)
+ So, we can ignore the queue lock and just pop, with the guarantee that a
+ NULL returned here truly means that the queue is empty */
+ while ((rc = (requested_call*)gpr_mpscq_pop(
+ &rm->requests_per_cq[i].queue)) != NULL) {
+ fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error));
}
}
GRPC_ERROR_UNREF(error);
@@ -398,13 +393,7 @@ static void server_delete(grpc_exec_ctx* exec_ctx, grpc_server* server) {
}
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server");
- if (server->started) {
- gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
- gpr_free(server->requested_calls_per_cq[i]);
- }
}
- gpr_free(server->request_freelist_per_cq);
- gpr_free(server->requested_calls_per_cq);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
@@ -462,21 +451,7 @@ static void destroy_channel(grpc_exec_ctx* exec_ctx, channel_data* chand,
static void done_request_event(grpc_exec_ctx* exec_ctx, void* req,
grpc_cq_completion* c) {
- requested_call* rc = (requested_call*)req;
- grpc_server* server = rc->server;
-
- if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
- rc < server->requested_calls_per_cq[rc->cq_idx] +
- server->max_requested_calls_per_cq) {
- GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
- gpr_stack_lockfree_push(
- server->request_freelist_per_cq[rc->cq_idx],
- (int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
- } else {
- gpr_free(req);
- }
-
- server_unref(exec_ctx, server);
+ gpr_free(req);
}
static void publish_call(grpc_exec_ctx* exec_ctx, grpc_server* server,
@@ -508,10 +483,6 @@ static void publish_call(grpc_exec_ctx* exec_ctx, grpc_server* server,
GPR_UNREACHABLE_CODE(return );
}
- grpc_call_element* elem =
- grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
- channel_data* chand = (channel_data*)elem->channel_data;
- server_ref(chand->server);
grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE,
done_request_event, rc, &rc->completion);
}
@@ -525,9 +496,7 @@ static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg,
grpc_server* server = rm->server;
if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
- gpr_mu_lock(&calld->mu_state);
- calld->state = ZOMBIED;
- gpr_mu_unlock(&calld->mu_state);
+ gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
@@ -539,16 +508,14 @@ static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg,
for (size_t i = 0; i < server->cq_count; i++) {
size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
- int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
- if (request_id == -1) {
+ requested_call* rc =
+ (requested_call*)gpr_locked_mpscq_try_pop(&rm->requests_per_cq[cq_idx]);
+ if (rc == NULL) {
continue;
} else {
GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i);
- gpr_mu_lock(&calld->mu_state);
- calld->state = ACTIVATED;
- gpr_mu_unlock(&calld->mu_state);
- publish_call(exec_ctx, server, calld, cq_idx,
- &server->requested_calls_per_cq[cq_idx][request_id]);
+ gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
+ publish_call(exec_ctx, server, calld, cq_idx, rc);
return; /* early out */
}
}
@@ -556,9 +523,27 @@ static void publish_new_rpc(grpc_exec_ctx* exec_ctx, void* arg,
/* no cq to take the request found: queue it on the slow list */
GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx);
gpr_mu_lock(&server->mu_call);
- gpr_mu_lock(&calld->mu_state);
- calld->state = PENDING;
- gpr_mu_unlock(&calld->mu_state);
+
+ // We need to ensure that all the queues are empty. We do this under
+ // the server mu_call lock to ensure that if something is added to
+ // an empty request queue, it will block until the call is actually
+ // added to the pending list.
+ for (size_t i = 0; i < server->cq_count; i++) {
+ size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
+ requested_call* rc =
+ (requested_call*)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
+ if (rc == NULL) {
+ continue;
+ } else {
+ gpr_mu_unlock(&server->mu_call);
+ GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i + server->cq_count);
+ gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
+ publish_call(exec_ctx, server, calld, cq_idx, rc);
+ return; /* early out */
+ }
+ }
+
+ gpr_atm_no_barrier_store(&calld->state, PENDING);
if (rm->pending_head == NULL) {
rm->pending_tail = rm->pending_head = calld;
} else {
@@ -576,9 +561,7 @@ static void finish_start_new_rpc(
call_data* calld = (call_data*)elem->call_data;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
- gpr_mu_lock(&calld->mu_state);
- calld->state = ZOMBIED;
- gpr_mu_unlock(&calld->mu_state);
+ gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
@@ -807,21 +790,14 @@ static void got_initial_metadata(grpc_exec_ctx* exec_ctx, void* ptr,
if (error == GRPC_ERROR_NONE) {
start_new_rpc(exec_ctx, elem);
} else {
- gpr_mu_lock(&calld->mu_state);
- if (calld->state == NOT_STARTED) {
- calld->state = ZOMBIED;
- gpr_mu_unlock(&calld->mu_state);
+ if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {
GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure,
GRPC_ERROR_NONE);
- } else if (calld->state == PENDING) {
- calld->state = ZOMBIED;
- gpr_mu_unlock(&calld->mu_state);
+ } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {
/* zombied call will be destroyed when it's removed from the pending
queue... later */
- } else {
- gpr_mu_unlock(&calld->mu_state);
}
}
}
@@ -885,7 +861,6 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
memset(calld, 0, sizeof(call_data));
calld->deadline = GRPC_MILLIS_INF_FUTURE;
calld->call = grpc_call_from_top_element(elem);
- gpr_mu_init(&calld->mu_state);
GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
server_on_recv_initial_metadata, elem,
@@ -912,8 +887,6 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_metadata_array_destroy(&calld->initial_metadata);
grpc_byte_buffer_destroy(calld->payload);
- gpr_mu_destroy(&calld->mu_state);
-
server_unref(exec_ctx, chand->server);
}
@@ -1020,8 +993,6 @@ grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;
- /* TODO(ctiller): expose a channel_arg for this */
- server->max_requested_calls_per_cq = 32768;
server->channel_args = grpc_channel_args_copy(args);
return server;
@@ -1095,29 +1066,15 @@ void grpc_server_start(grpc_server* server) {
server->pollset_count = 0;
server->pollsets =
(grpc_pollset**)gpr_malloc(sizeof(grpc_pollset*) * server->cq_count);
- server->request_freelist_per_cq = (gpr_stack_lockfree**)gpr_malloc(
- sizeof(*server->request_freelist_per_cq) * server->cq_count);
- server->requested_calls_per_cq = (requested_call**)gpr_malloc(
- sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (grpc_cq_can_listen(server->cqs[i])) {
server->pollsets[server->pollset_count++] =
grpc_cq_pollset(server->cqs[i]);
}
- server->request_freelist_per_cq[i] =
- gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
- for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
- gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
- }
- server->requested_calls_per_cq[i] =
- (requested_call*)gpr_malloc((size_t)server->max_requested_calls_per_cq *
- sizeof(*server->requested_calls_per_cq[i]));
}
- request_matcher_init(&server->unregistered_request_matcher,
- (size_t)server->max_requested_calls_per_cq, server);
+ request_matcher_init(&server->unregistered_request_matcher, server);
for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_init(&rm->matcher,
- (size_t)server->max_requested_calls_per_cq, server);
+ request_matcher_init(&rm->matcher, server);
}
server_ref(server);
@@ -1373,21 +1330,11 @@ static grpc_call_error queue_call_request(grpc_exec_ctx* exec_ctx,
requested_call* rc) {
call_data* calld = NULL;
request_matcher* rm = NULL;
- int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
fail_call(exec_ctx, server, cq_idx, rc,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
return GRPC_CALL_OK;
}
- request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
- if (request_id == -1) {
- /* out of request ids: just fail this one */
- fail_call(exec_ctx, server, cq_idx, rc,
- grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"),
- GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq));
- return GRPC_CALL_OK;
- }
switch (rc->type) {
case BATCH_CALL:
rm = &server->unregistered_request_matcher;
@@ -1396,20 +1343,17 @@ static grpc_call_error queue_call_request(grpc_exec_ctx* exec_ctx,
rm = &rc->data.registered.method->matcher;
break;
}
- server->requested_calls_per_cq[cq_idx][request_id] = *rc;
- gpr_free(rc);
- if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
+ if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock(&server->mu_call);
while ((calld = rm->pending_head) != NULL) {
- request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
- if (request_id == -1) break;
+ rc = (requested_call*)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
+ if (rc == NULL) break;
rm->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call);
- gpr_mu_lock(&calld->mu_state);
- if (calld->state == ZOMBIED) {
- gpr_mu_unlock(&calld->mu_state);
+ if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
+ // Zombied Call
GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
@@ -1417,11 +1361,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx* exec_ctx,
GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure,
GRPC_ERROR_NONE);
} else {
- GPR_ASSERT(calld->state == PENDING);
- calld->state = ACTIVATED;
- gpr_mu_unlock(&calld->mu_state);
- publish_call(exec_ctx, server, calld, cq_idx,
- &server->requested_calls_per_cq[cq_idx][request_id]);
+ publish_call(exec_ctx, server, calld, cq_idx, rc);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1540,7 +1480,6 @@ static void fail_call(grpc_exec_ctx* exec_ctx, grpc_server* server,
rc->initial_metadata->count = 0;
GPR_ASSERT(error != GRPC_ERROR_NONE);
- server_ref(server);
grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error,
done_request_event, rc, &rc->completion);
}