diff options
Diffstat (limited to 'src/core/surface/completion_queue.c')
-rw-r--r-- | src/core/surface/completion_queue.c | 66 |
1 files changed, 35 insertions, 31 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 4837f5b978..b59c36e03a 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -36,7 +36,7 @@ #include <stdio.h> #include <string.h> -#include "src/core/iomgr/iomgr_completion_queue_interface.h" +#include "src/core/iomgr/pollset.h" #include "src/core/surface/call.h" #include "src/core/surface/event_string.h" #include "src/core/surface/surface_trace.h" @@ -61,6 +61,7 @@ typedef struct event { /* Completion queue structure */ struct grpc_completion_queue { + /* TODO(ctiller): see if this can be removed */ int allow_polling; /* When refs drops to zero, we are in shutdown mode, and will be destroyable @@ -100,7 +101,7 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) { /* Create and append an event to the queue. Returns the event so that its data members can be filled in. - Requires grpc_iomgr_mu locked. */ + Requires GRPC_POLLSET_MU(&cc->pollset) locked. */ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data) { @@ -126,7 +127,8 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, ev->bucket_prev = cc->buckets[bucket]->bucket_prev; ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev; } - gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); + grpc_pollset_kick(&cc->pollset); return ev; } @@ -149,7 +151,7 @@ static void end_op_locked(grpc_completion_queue *cc, if (gpr_unref(&cc->refs)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); } } @@ -157,11 +159,11 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data, grpc_byte_buffer *read) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data); ev->base.data.read = read; end_op_locked(cc, GRPC_READ); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, @@ -169,11 +171,11 @@ void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.invoke_accepted = error; end_op_locked(cc, GRPC_INVOKE_ACCEPTED); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, @@ -181,11 +183,11 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.write_accepted = error; end_op_locked(cc, GRPC_WRITE_ACCEPTED); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, @@ -193,11 +195,11 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.finish_accepted = error; end_op_locked(cc, GRPC_FINISH_ACCEPTED); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, @@ -206,13 +208,13 @@ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, void *user_data, size_t count, grpc_metadata *elements) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish, user_data); ev->base.data.client_metadata_read.count = count; ev->base.data.client_metadata_read.elements = elements; end_op_locked(cc, GRPC_CLIENT_METADATA_READ); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, @@ -221,14 +223,14 @@ void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_metadata *metadata_elements, size_t metadata_count) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data); ev->base.data.finished.status = status; ev->base.data.finished.details = details; ev->base.data.finished.metadata_count = metadata_count; ev->base.data.finished.metadata_elements = metadata_elements; end_op_locked(cc, GRPC_FINISHED); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, @@ -237,7 +239,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, gpr_timespec deadline, size_t metadata_count, grpc_metadata *metadata_elements) { event *ev; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data); ev->base.data.server_rpc_new.method = method; ev->base.data.server_rpc_new.host = host; @@ -245,7 +247,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, ev->base.data.server_rpc_new.metadata_count = metadata_count; ev->base.data.server_rpc_new.metadata_elements = metadata_elements; end_op_locked(cc, GRPC_SERVER_RPC_NEW); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } /* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ @@ -262,7 +264,7 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline) { event *ev = NULL; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { if (cc->queue != NULL) { gpr_uintptr bucket; @@ -288,15 +290,16 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_iomgr_work(deadline)) { + if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) { continue; } - if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) { - gpr_mu_unlock(&grpc_iomgr_mu); + if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), + GRPC_POLLSET_MU(&cc->pollset), deadline)) { + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); return NULL; } } - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); return &ev->base; } @@ -334,7 +337,7 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec deadline) { event *ev = NULL; - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { if ((ev = pluck_event(cc, tag))) { break; @@ -343,15 +346,16 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_iomgr_work(deadline)) { + if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) { continue; } - if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) { - gpr_mu_unlock(&grpc_iomgr_mu); + if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), + GRPC_POLLSET_MU(&cc->pollset), deadline)) { + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); return NULL; } } - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); return &ev->base; } @@ -360,11 +364,11 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->refs)) { - gpr_mu_lock(&grpc_iomgr_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_cv_broadcast(&grpc_iomgr_cv); - gpr_mu_unlock(&grpc_iomgr_mu); + gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } } |