aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2017-07-31 07:37:31 -0700
committerGravatar GitHub <noreply@github.com>2017-07-31 07:37:31 -0700
commitf787594e688cb59fe207bb3199b9cb4cfa0c87ca (patch)
tree34bef80126ff38ebf98307f1c866e3b9e72d95b2 /src/core
parent025ca9e4b24befd239f7b043846eb0bb177b6ead (diff)
parent097cbfc94fc96f079d7be8d0dcbcb9b282484bbd (diff)
Merge pull request #11703 from yang-g/cq_shutdown
Allow adding events to cq after shutdown is called.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity.c2
-rw-r--r--src/core/lib/surface/alarm.c3
-rw-r--r--src/core/lib/surface/call.c6
-rw-r--r--src/core/lib/surface/channel_ping.c2
-rw-r--r--src/core/lib/surface/completion_queue.c68
-rw-r--r--src/core/lib/surface/completion_queue.h5
-rw-r--r--src/core/lib/surface/server.c14
7 files changed, 60 insertions, 40 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index c3dca14305..b83c95275f 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -208,7 +208,7 @@ void grpc_channel_watch_connectivity_state(
7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec,
(int)deadline.clock_type, cq, tag));
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
gpr_mu_init(&w->mu);
GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w,
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index ef8405cca8..55934964f3 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -18,6 +18,7 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/completion_queue.h"
@@ -49,7 +50,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
alarm->cq = cq;
alarm->tag = tag;
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 2365d27307..04613f17e3 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1422,7 +1422,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (nops == 0) {
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
free_no_op_completion, NULL,
gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1723,7 +1723,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_INTERNAL_REF(call, "completion");
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
@@ -1844,6 +1844,8 @@ const char *grpc_call_error_to_string(grpc_call_error error) {
return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
+ case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
+ return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
case GRPC_CALL_OK:
return "GRPC_CALL_OK";
}
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 80eb80af78..e85b308850 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
op->send_ping = &pr->closure;
op->bind_pollset = grpc_cq_pollset(cq);
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 3851993c92..3d82a32e82 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -196,7 +196,7 @@ typedef struct cq_vtable {
void (*init)(void *data);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
void (*destroy)(void *data);
- void (*begin_op)(grpc_completion_queue *cq, void *tag);
+ bool (*begin_op)(grpc_completion_queue *cq, void *tag);
void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
@@ -288,8 +288,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq);
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
+static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq, void *tag,
@@ -522,33 +522,6 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
}
}
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
- gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
-}
-
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
- gpr_ref(&cqd->pending_events);
-}
-
-void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
-#ifndef NDEBUG
- gpr_mu_lock(cq->mu);
- if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
- cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
- cq->outstanding_tags =
- gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
- cq->outstanding_tag_capacity);
- }
- cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
- gpr_mu_unlock(cq->mu);
-#endif
- cq->vtable->begin_op(cq, tag);
-}
-
#ifndef NDEBUG
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
int found = 0;
@@ -576,6 +549,41 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
#endif
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+ while (true) {
+ gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events);
+ if (count == 0) {
+ return false;
+ } else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) {
+ break;
+ }
+ }
+ return true;
+}
+
+static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ GPR_ASSERT(!cqd->shutdown_called);
+ gpr_ref(&cqd->pending_events);
+ return true;
+}
+
+bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+#ifndef NDEBUG
+ gpr_mu_lock(cq->mu);
+ if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
+ cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
+ cq->outstanding_tags =
+ gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
+ cq->outstanding_tag_capacity);
+ }
+ cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
+ gpr_mu_unlock(cq->mu);
+#endif
+ return cq->vtable->begin_op(cq, tag);
+}
+
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_NEXT) */
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index af44482513..69d144bd95 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -72,8 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made.
- \a tag is currently used only in debug builds. */
-void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
+ \a tag is currently used only in debug builds. Return true on success, and
+ false if completion_queue has been shutdown. */
+bool grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
grpc_cq_begin_op */
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index fce7f8dca1..66dcc299aa 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1259,7 +1259,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
}
/* stay locked, and gather up some stuff to do */
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,
NULL, gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1446,7 +1446,11 @@ grpc_call_error grpc_server_request_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
details->reserved = NULL;
rc->cq_idx = cq_idx;
rc->type = BATCH_CALL;
@@ -1496,7 +1500,11 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
rc->server = server;