aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/completion_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/completion_queue.c')
-rw-r--r--src/core/surface/completion_queue.c75
1 files changed, 63 insertions, 12 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 3f60b0b0ba..36d69cfe5f 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -45,6 +45,11 @@
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
+typedef struct {
+ grpc_pollset_worker *worker;
+ void *tag;
+} plucker;
+
/* Completion queue structure */
struct grpc_completion_queue {
/** completed events */
@@ -60,6 +65,8 @@ struct grpc_completion_queue {
int shutdown;
int shutdown_called;
int is_server_cq;
+ int num_pluckers;
+ plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
};
grpc_completion_queue *grpc_completion_queue_create(void) {
@@ -107,6 +114,11 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
}
void grpc_cq_begin_op(grpc_completion_queue *cc) {
+#ifndef NDEBUG
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ GPR_ASSERT(!cc->shutdown_called);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+#endif
gpr_ref(&cc->pending_events);
}
@@ -117,6 +129,8 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
void (*done)(void *done_arg, grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
int shutdown;
+ int i;
+ grpc_pollset_worker *pluck_worker;
storage->tag = tag;
storage->done = done;
@@ -130,7 +144,14 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
cc->completed_tail->next =
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;
- grpc_pollset_kick(&cc->pollset);
+ pluck_worker = NULL;
+ for (i = 0; i < cc->num_pluckers; i++) {
+ if (cc->pluckers[i].tag == tag) {
+ pluck_worker = cc->pluckers[i].worker;
+ break;
+ }
+ }
+ grpc_pollset_kick(&cc->pollset, pluck_worker);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
} else {
cc->completed_tail->next =
@@ -147,6 +168,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
grpc_event ret;
+ grpc_pollset_worker worker;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -172,7 +194,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
- if (!grpc_pollset_work(&cc->pollset, deadline)) {
+ if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
@@ -184,11 +206,37 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
return ret;
}
+static int add_plucker(grpc_completion_queue *cc, void *tag,
+ grpc_pollset_worker *worker) {
+ if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
+ return 0;
+ }
+ cc->pluckers[cc->num_pluckers].tag = tag;
+ cc->pluckers[cc->num_pluckers].worker = worker;
+ cc->num_pluckers++;
+ return 1;
+}
+
+static void del_plucker(grpc_completion_queue *cc, void *tag,
+ grpc_pollset_worker *worker) {
+ int i;
+ for (i = 0; i < cc->num_pluckers; i++) {
+ if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) {
+ cc->num_pluckers--;
+ GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]);
+ return;
+ }
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+}
+
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
grpc_event ret;
grpc_cq_completion *c;
grpc_cq_completion *prev;
+ grpc_pollset_worker worker;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -219,12 +267,24 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
- if (!grpc_pollset_work(&cc->pollset, deadline)) {
+ if (!add_plucker(cc, tag, &worker)) {
+ gpr_log(GPR_DEBUG,
+ "Too many outstanding grpc_completion_queue_pluck calls: maximum is %d",
+ GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
+ /* TODO(ctiller): should we use a different result here */
ret.type = GRPC_QUEUE_TIMEOUT;
break;
}
+ if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+ del_plucker(cc, tag, &worker);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_TIMEOUT;
+ break;
+ }
+ del_plucker(cc, tag, &worker);
}
done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
@@ -261,15 +321,6 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
return &cc->pollset;
}
-void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- grpc_pollset_kick(&cc->pollset);
- grpc_pollset_work(&cc->pollset,
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(100, GPR_TIMESPAN)));
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }