path: root/src/core/surface/completion_queue.c
author     ctiller <ctiller@google.com>        2015-01-07 14:03:30 -0800
committer  Nicolas Noble <nnoble@google.com>   2015-01-09 17:27:32 -0800
commit     58393c20bc29cf4f9e6fe15516125661e1957db7 (patch)
tree       d1648196ca8201d778e73c7c9e59a8ac90613763 /src/core/surface/completion_queue.c
parent     e4b409364e4c493a66d4b2a6fe897075aa5c174e (diff)
Remove libevent.
Fixed any exposed bugs across the stack. Add a poll() based implementation. Heavily leverages pollset infrastructure to allow small polls to be the norm. Exposes a mechanism to plug in epoll/kqueue for platforms where we have them.

Simplify iomgr callbacks to return one bit of success or failure (instead of the multi valued result that was mostly unused previously). This will ease the burden on new implementations, and the previous system provided no real value anyway.

Removed timeouts on endpoint read/write routines. This simplifies porting burden by providing a more orthogonal interface, and the functionality can always be replicated when desired by using an alarm combined with endpoint_shutdown. I'm fairly certain we ended up with this interface because it was convenient to do from libevent.

Things that need attention still:
- adding an fd to a pollset is O(n^2) - but this is probably ok given that we'll not use this for multipolling once platform specific implementations are added. ([] []
- we rely on the backup poller too often - especially for SSL handshakes and for client connection establishment we should have a better mechanism ([] []
- Linux needs to use epoll for multiple fds, FreeBSD variants (including Darwin) need to use kqueue. ([]
- Linux needs to use eventfd for poll kicking. ([]

Change on 2015/01/07 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=83461069
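The point above about replicating the removed endpoint timeouts can be illustrated concretely: arm an alarm for the desired deadline and, if it fires before the I/O completes, shut the endpoint down so the pending read fails promptly. The sketch below only shows that shape; the struct, helper names, and the exact alarm/endpoint signatures are assumptions made for illustration and may not match the iomgr API at this revision (the `int success` callback parameter mirrors the one-bit result described above).

/* Illustrative sketch only: types and signatures are assumed, not
 * copied from the tree at this revision. */
typedef struct {
  grpc_endpoint *ep; /* endpoint whose read we want to bound */
  grpc_alarm alarm;  /* timer armed for the read deadline */
} timed_read;

/* Alarm callback: if the deadline fires before the read finishes,
 * shutting the endpoint down makes the pending read fail, which is
 * what the old built-in read timeout used to do. */
static void on_read_deadline(void *arg, int success) {
  timed_read *tr = arg;
  if (success) {
    grpc_endpoint_shutdown(tr->ep);
  }
}

static void start_timed_read(timed_read *tr, gpr_timespec deadline) {
  /* Assumed alarm API: schedule on_read_deadline at 'deadline'. */
  grpc_alarm_init(&tr->alarm, deadline, on_read_deadline, tr, gpr_now());
  /* ...then issue the read as usual; when it completes, cancel the
   * alarm so it cannot fire afterwards. */
}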
Diffstat (limited to 'src/core/surface/completion_queue.c')
-rw-r--r--  src/core/surface/completion_queue.c  66
1 file changed, 35 insertions, 31 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 4837f5b978..b59c36e03a 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -36,7 +36,7 @@
#include <stdio.h>
#include <string.h>
-#include "src/core/iomgr/iomgr_completion_queue_interface.h"
+#include "src/core/iomgr/pollset.h"
#include "src/core/surface/call.h"
#include "src/core/surface/event_string.h"
#include "src/core/surface/surface_trace.h"
@@ -61,6 +61,7 @@ typedef struct event {
/* Completion queue structure */
struct grpc_completion_queue {
+ /* TODO(ctiller): see if this can be removed */
int allow_polling;
/* When refs drops to zero, we are in shutdown mode, and will be destroyable
@@ -100,7 +101,7 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
/* Create and append an event to the queue. Returns the event so that its data
members can be filled in.
- Requires grpc_iomgr_mu locked. */
+ Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data) {
@@ -126,7 +127,8 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
}
- gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
+ grpc_pollset_kick(&cc->pollset);
return ev;
}
@@ -149,7 +151,7 @@ static void end_op_locked(grpc_completion_queue *cc,
if (gpr_unref(&cc->refs)) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
}
}
@@ -157,11 +159,11 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_byte_buffer *read) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data);
ev->base.data.read = read;
end_op_locked(cc, GRPC_READ);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
@@ -169,11 +171,11 @@ void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.invoke_accepted = error;
end_op_locked(cc, GRPC_INVOKE_ACCEPTED);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
@@ -181,11 +183,11 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.write_accepted = error;
end_op_locked(cc, GRPC_WRITE_ACCEPTED);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
@@ -193,11 +195,11 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.finish_accepted = error;
end_op_locked(cc, GRPC_FINISH_ACCEPTED);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
@@ -206,13 +208,13 @@ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
void *user_data, size_t count,
grpc_metadata *elements) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish,
user_data);
ev->base.data.client_metadata_read.count = count;
ev->base.data.client_metadata_read.elements = elements;
end_op_locked(cc, GRPC_CLIENT_METADATA_READ);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
@@ -221,14 +223,14 @@ void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_metadata *metadata_elements,
size_t metadata_count) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data);
ev->base.data.finished.status = status;
ev->base.data.finished.details = details;
ev->base.data.finished.metadata_count = metadata_count;
ev->base.data.finished.metadata_elements = metadata_elements;
end_op_locked(cc, GRPC_FINISHED);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
@@ -237,7 +239,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
gpr_timespec deadline, size_t metadata_count,
grpc_metadata *metadata_elements) {
event *ev;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data);
ev->base.data.server_rpc_new.method = method;
ev->base.data.server_rpc_new.host = host;
@@ -245,7 +247,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
ev->base.data.server_rpc_new.metadata_count = metadata_count;
ev->base.data.server_rpc_new.metadata_elements = metadata_elements;
end_op_locked(cc, GRPC_SERVER_RPC_NEW);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
@@ -262,7 +264,7 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
event *ev = NULL;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
if (cc->queue != NULL) {
gpr_uintptr bucket;
@@ -288,15 +290,16 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
ev = create_shutdown_event();
break;
}
- if (cc->allow_polling && grpc_iomgr_work(deadline)) {
+ if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
continue;
}
- if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
- gpr_mu_unlock(&grpc_iomgr_mu);
+ if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
+ GRPC_POLLSET_MU(&cc->pollset), deadline)) {
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
return NULL;
}
}
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
}
@@ -334,7 +337,7 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
event *ev = NULL;
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
if ((ev = pluck_event(cc, tag))) {
break;
@@ -343,15 +346,16 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ev = create_shutdown_event();
break;
}
- if (cc->allow_polling && grpc_iomgr_work(deadline)) {
+ if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
continue;
}
- if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
- gpr_mu_unlock(&grpc_iomgr_mu);
+ if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
+ GRPC_POLLSET_MU(&cc->pollset), deadline)) {
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
return NULL;
}
}
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
}
@@ -360,11 +364,11 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if (gpr_unref(&cc->refs)) {
- gpr_mu_lock(&grpc_iomgr_mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- gpr_cv_broadcast(&grpc_iomgr_cv);
- gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
}
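
Taken together, the hunks above swap the single global grpc_iomgr_mu/grpc_iomgr_cv pair for the completion queue's own pollset lock and condition variable, and kick the pollset when an event is queued so a thread blocked in poll() notices it. A rough sketch of the resulting producer/consumer shape follows, using hypothetical wrappers produce() and consume() in place of the real entry points (shutdown handling omitted); it condenses the pattern shown in the diff rather than adding anything to the file.

/* Producer: queue an event under the queue's own pollset lock.
 * add_locked() broadcasts GRPC_POLLSET_CV and kicks the pollset. */
static void produce(grpc_completion_queue *cc, grpc_completion_type type,
                    void *tag, grpc_call *call,
                    grpc_event_finish_func on_finish, void *user_data) {
  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
  add_locked(cc, type, tag, call, on_finish, user_data);
  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}

/* Consumer: poll the queue's pollset when allowed, otherwise wait on
 * its condition variable until an event arrives or the deadline
 * passes. */
static event *consume(grpc_completion_queue *cc, void *tag,
                      gpr_timespec deadline) {
  event *ev = NULL;
  gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
  for (;;) {
    if ((ev = pluck_event(cc, tag))) break;        /* an event is ready */
    if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
      continue;                                    /* did some I/O work */
    }
    if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
                    GRPC_POLLSET_MU(&cc->pollset), deadline)) {
      break;                                       /* deadline expired */
    }
  }
  gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
  return ev;                                       /* NULL on timeout */
}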