aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-05-25 11:02:52 -0400
committerGravatar Craig Tiller <ctiller@google.com>2016-05-25 11:02:52 -0400
commit1a434052218573b98d2923e810274d35638fc7e3 (patch)
tree9f8486edabdb9b423964740e6c67d8840b69202e
parentd7bbd38b27630f908fc4f1cb906607e44c8f30bb (diff)
parent15d7f3cd0deb1251ca984b78691fa65ca6cfd969 (diff)
Merge pull request #6706 from ctiller/rraf
Round robin notifier pollset
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c8
-rw-r--r--src/node/ext/server.cc38
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi12
4 files changed, 36 insertions, 24 deletions
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index c695621de8..909e34abc7 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -128,6 +128,9 @@ struct grpc_tcp_server {
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
+
+ /* next pollset to assign a channel to */
+ size_t next_pollset_to_assign;
};
grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
@@ -145,6 +148,7 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
s->head = NULL;
s->tail = NULL;
s->nports = 0;
+ s->next_pollset_to_assign = 0;
return s;
}
@@ -317,7 +321,9 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
goto error;
}
- read_notifier_pollset = grpc_fd_get_read_notifier_pollset(exec_ctx, sp->emfd);
+ read_notifier_pollset =
+ sp->server->pollsets[(sp->server->next_pollset_to_assign++) %
+ sp->server->pollset_count];
/* loop until accept4 returns EAGAIN, and then re-arm notification */
for (;;) {
diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc
index b9e1fe9160..dd1b777ac8 100644
--- a/src/node/ext/server.cc
+++ b/src/node/ext/server.cc
@@ -35,15 +35,15 @@
#include "server.h"
-#include <node.h>
#include <nan.h>
+#include <node.h>
#include <vector>
+#include "call.h"
+#include "completion_queue_async_worker.h"
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
#include "grpc/support/log.h"
-#include "call.h"
-#include "completion_queue_async_worker.h"
#include "server_credentials.h"
#include "timeval.h"
@@ -100,8 +100,8 @@ class NewCallOp : public Op {
Nan::Set(obj, Nan::New("host").ToLocalChecked(),
Nan::New(details.host).ToLocalChecked());
Nan::Set(obj, Nan::New("deadline").ToLocalChecked(),
- Nan::New<Date>(
- TimespecToMilliseconds(details.deadline)).ToLocalChecked());
+ Nan::New<Date>(TimespecToMilliseconds(details.deadline))
+ .ToLocalChecked());
Nan::Set(obj, Nan::New("metadata").ToLocalChecked(),
ParseMetadata(&request_metadata));
return scope.Escape(obj);
@@ -117,14 +117,13 @@ class NewCallOp : public Op {
grpc_metadata_array request_metadata;
protected:
- std::string GetTypeString() const {
- return "new_call";
- }
+ std::string GetTypeString() const { return "new_call"; }
};
Server::Server(grpc_server *server) : wrapped_server(server) {
shutdown_queue = grpc_completion_queue_create(NULL);
- grpc_server_register_completion_queue(server, shutdown_queue, NULL);
+ grpc_server_register_non_listening_completion_queue(server, shutdown_queue,
+ NULL);
}
Server::~Server() {
@@ -156,8 +155,7 @@ bool Server::HasInstance(Local<Value> val) {
}
void Server::ShutdownServer() {
- grpc_server_shutdown_and_notify(this->wrapped_server,
- this->shutdown_queue,
+ grpc_server_shutdown_and_notify(this->wrapped_server, this->shutdown_queue,
NULL);
grpc_server_cancel_all_calls(this->wrapped_server);
grpc_completion_queue_pluck(this->shutdown_queue, NULL,
@@ -170,8 +168,8 @@ NAN_METHOD(Server::New) {
if (!info.IsConstructCall()) {
const int argc = 1;
Local<Value> argv[argc] = {info[0]};
- MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance(
- argc, argv);
+ MaybeLocal<Object> maybe_instance =
+ constructor->GetFunction()->NewInstance(argc, argv);
if (maybe_instance.IsEmpty()) {
// There's probably a pending exception
return;
@@ -185,8 +183,9 @@ NAN_METHOD(Server::New) {
grpc_channel_args *channel_args;
if (!ParseChannelArgs(info[0], &channel_args)) {
DeallocateChannelArgs(channel_args);
- return Nan::ThrowTypeError("Server options must be an object with "
- "string keys and integer or string values");
+ return Nan::ThrowTypeError(
+ "Server options must be an object with "
+ "string keys and integer or string values");
}
wrapped_server = grpc_server_create(channel_args, NULL);
DeallocateChannelArgs(channel_args);
@@ -218,8 +217,7 @@ NAN_METHOD(Server::RequestCall) {
NAN_METHOD(Server::AddHttp2Port) {
if (!HasInstance(info.This())) {
- return Nan::ThrowTypeError(
- "addHttp2Port can only be called on a Server");
+ return Nan::ThrowTypeError("addHttp2Port can only be called on a Server");
}
if (!info[0]->IsString()) {
return Nan::ThrowTypeError(
@@ -239,8 +237,7 @@ NAN_METHOD(Server::AddHttp2Port) {
*Utf8String(info[0]));
} else {
port = grpc_server_add_secure_http2_port(server->wrapped_server,
- *Utf8String(info[0]),
- creds);
+ *Utf8String(info[0]), creds);
}
info.GetReturnValue().Set(Nan::New<Number>(port));
}
@@ -262,8 +259,7 @@ NAN_METHOD(Server::TryShutdown) {
Server *server = ObjectWrap::Unwrap<Server>(info.This());
unique_ptr<OpVec> ops(new OpVec());
grpc_server_shutdown_and_notify(
- server->wrapped_server,
- CompletionQueueAsyncWorker::GetQueue(),
+ server->wrapped_server, CompletionQueueAsyncWorker::GetQueue(),
new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(),
shared_ptr<Resources>(nullptr)));
CompletionQueueAsyncWorker::Next();
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 66e6e6b549..d42c58050f 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -336,6 +336,8 @@ cdef extern from "grpc/_cython/loader.h":
void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
void *reserved) nogil
+ void grpc_server_register_non_listening_completion_queue(
+ grpc_server *server, grpc_completion_queue *cq, void *reserved) nogil
int grpc_server_add_insecure_http2_port(
grpc_server *server, const char *addr) nogil
void grpc_server_start(grpc_server *server) nogil
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index 8419a59068..55948755b2 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -81,11 +81,20 @@ cdef class Server:
self.c_server, queue.c_completion_queue, NULL)
self.registered_completion_queues.append(queue)
+ def register_non_listening_completion_queue(
+ self, CompletionQueue queue not None):
+ if self.is_started:
+ raise ValueError("cannot register completion queues after start")
+ with nogil:
+ grpc_server_register_non_listening_completion_queue(
+ self.c_server, queue.c_completion_queue, NULL)
+ self.registered_completion_queues.append(queue)
+
def start(self):
if self.is_started:
raise ValueError("the server has already started")
self.backup_shutdown_queue = CompletionQueue()
- self.register_completion_queue(self.backup_shutdown_queue)
+ self.register_non_listening_completion_queue(self.backup_shutdown_queue)
self.is_started = True
with nogil:
grpc_server_start(self.c_server)
@@ -169,4 +178,3 @@ cdef class Server:
time.sleep(0)
with nogil:
grpc_server_destroy(self.c_server)
-