aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/completion_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/completion_queue.c')
-rw-r--r--src/core/surface/completion_queue.c66
1 files changed, 35 insertions, 31 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 4837f5b978..b59c36e03a 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -36,7 +36,7 @@
#include <stdio.h>
#include <string.h>
-#include "src/core/iomgr/iomgr_completion_queue_interface.h"
+#include "src/core/iomgr/pollset.h"
#include "src/core/surface/call.h"
#include "src/core/surface/event_string.h"
#include "src/core/surface/surface_trace.h"
@@ -61,6 +61,7 @@ typedef struct event {
/* Completion queue structure */
struct grpc_completion_queue {
+ /* TODO(ctiller): see if this can be removed */
int allow_polling;
/* When refs drops to zero, we are in shutdown mode, and will be destroyable
@@ -100,7 +101,7 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
/* Create and append an event to the queue. Returns the event so that its data
members can be filled in.
- Requires grpc_iomgr_mu locked. */
+ Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data) {
@@ -126,7 +127,8 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
}
- gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
+ grpc_pollset_kick(&cc->pollset);
return ev;
}
@@ -149,7 +151,7 @@ static void end_op_locked(grpc_completion_queue *cc,
if (gpr_unref(&cc->refs)) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
}
}
@@ -157,11 +159,11 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_byte_buffer *read) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data);
ev->base.data.read = read;
end_op_locked(cc, GRPC_READ);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
@@ -169,11 +171,11 @@ void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.invoke_accepted = error;
end_op_locked(cc, GRPC_INVOKE_ACCEPTED);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
@@ -181,11 +183,11 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.write_accepted = error;
end_op_locked(cc, GRPC_WRITE_ACCEPTED);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
@@ -193,11 +195,11 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.finish_accepted = error;
end_op_locked(cc, GRPC_FINISH_ACCEPTED);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
@@ -206,13 +208,13 @@ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
void *user_data, size_t count,
grpc_metadata *elements) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish,
user_data);
ev->base.data.client_metadata_read.count = count;
ev->base.data.client_metadata_read.elements = elements;
end_op_locked(cc, GRPC_CLIENT_METADATA_READ);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
@@ -221,14 +223,14 @@ void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_metadata *metadata_elements,
size_t metadata_count) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data);
ev->base.data.finished.status = status;
ev->base.data.finished.details = details;
ev->base.data.finished.metadata_count = metadata_count;
ev->base.data.finished.metadata_elements = metadata_elements;
end_op_locked(cc, GRPC_FINISHED);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
@@ -237,7 +239,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
gpr_timespec deadline, size_t metadata_count,
grpc_metadata *metadata_elements) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data);
ev->base.data.server_rpc_new.method = method;
ev->base.data.server_rpc_new.host = host;
@@ -245,7 +247,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
ev->base.data.server_rpc_new.metadata_count = metadata_count;
ev->base.data.server_rpc_new.metadata_elements = metadata_elements;
end_op_locked(cc, GRPC_SERVER_RPC_NEW);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
@@ -262,7 +264,7 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
event *ev = NULL;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
if (cc->queue != NULL) {
gpr_uintptr bucket;
@@ -288,15 +290,16 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
ev = create_shutdown_event();
break;
}
- if (cc->allow_polling && grpc_iomgr_work(deadline)) {
+ if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
continue;
}
- if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
- gpr_mu_unlock(&grpc_iomgr_mu);
+ if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
+ GRPC_POLLSET_MU(&cc->pollset), deadline)) {
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
return NULL;
}
}
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
}
@@ -334,7 +337,7 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
event *ev = NULL;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
if ((ev = pluck_event(cc, tag))) {
break;
@@ -343,15 +346,16 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ev = create_shutdown_event();
break;
}
- if (cc->allow_polling && grpc_iomgr_work(deadline)) {
+ if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
continue;
}
- if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
- gpr_mu_unlock(&grpc_iomgr_mu);
+ if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
+ GRPC_POLLSET_MU(&cc->pollset), deadline)) {
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
return NULL;
}
}
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
}
@@ -360,11 +364,11 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if (gpr_unref(&cc->refs)) {
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- gpr_cv_broadcast(&grpc_iomgr_cv);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
}