aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
authorGravatar yang-g <yangg@google.com>2017-07-05 16:50:51 -0700
committerGravatar yang-g <yangg@google.com>2017-07-28 12:49:40 -0700
commit0eaf7debd2915f947ae50367b1768cfad44205dc (patch)
tree04274fa22c8e197a83814a5aa734dcd2bed5434d /src/core/lib/surface
parent8dc1b7db51c49869ab24404bc810500db51fc86d (diff)
Allow adding events to cq after shutdown is called.
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/alarm.c3
-rw-r--r--src/core/lib/surface/call.c6
-rw-r--r--src/core/lib/surface/channel_ping.c2
-rw-r--r--src/core/lib/surface/completion_queue.c11
-rw-r--r--src/core/lib/surface/completion_queue.h5
-rw-r--r--src/core/lib/surface/server.c14
6 files changed, 31 insertions, 10 deletions
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index ef8405cca8..632cd6e239 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -18,6 +18,7 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/completion_queue.h"
@@ -49,7 +50,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
alarm->cq = cq;
alarm->tag = tag;
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag) == 0);
GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 2365d27307..a173fadc43 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1422,7 +1422,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (nops == 0) {
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag) == 0);
grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
free_no_op_completion, NULL,
gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1723,7 +1723,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_INTERNAL_REF(call, "completion");
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag) == 0);
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
@@ -1844,6 +1844,8 @@ const char *grpc_call_error_to_string(grpc_call_error error) {
return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
+ case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
+ return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
case GRPC_CALL_OK:
return "GRPC_CALL_OK";
}
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 80eb80af78..8b1150b8b4 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
op->send_ping = &pr->closure;
op->bind_pollset = grpc_cq_pollset(cq);
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag) == 0);
top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 3851993c92..ef1dfb1eb0 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -525,7 +525,16 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
cq_next_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(!cqd->shutdown_called);
- gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
+ while (true) {
+ gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events.count);
+ if (count == 0) {
+ return 1;
+ } else if (gpr_atm_no_barrier_cas(&cqd->pending_events.count, count,
+ count + 1)) {
+ break;
+ }
+ }
+ return 0;
}
static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index af44482513..7f3d39824e 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -72,8 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made.
- \a tag is currently used only in debug builds. */
-void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
+ \a tag is currently used only in debug builds. Return 0 on success and 1 if
+ completion_queue has been shutdown. */
+int grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
grpc_cq_begin_op */
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index fce7f8dca1..957593a1e2 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1259,7 +1259,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
}
/* stay locked, and gather up some stuff to do */
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag) == 0);
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,
NULL, gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1446,7 +1446,11 @@ grpc_call_error grpc_server_request_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag)) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
details->reserved = NULL;
rc->cq_idx = cq_idx;
rc->type = BATCH_CALL;
@@ -1496,7 +1500,11 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag)) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
rc->server = server;