diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/node/ext/call.cc | 1 | ||||
-rw-r--r-- | src/node/ext/channel.cc | 1 | ||||
-rw-r--r-- | src/node/ext/completion_queue.cc (renamed from src/node/ext/completion_queue_uv.cc) | 0 | ||||
-rw-r--r-- | src/node/ext/completion_queue_async_worker.h | 86 | ||||
-rw-r--r-- | src/node/ext/completion_queue_threadpool.cc | 180 | ||||
-rw-r--r-- | src/node/ext/node_grpc.cc | 5 | ||||
-rw-r--r-- | src/node/ext/server.cc | 70 | ||||
-rw-r--r-- | src/node/ext/server_generic.cc | 75 | ||||
-rw-r--r-- | src/node/ext/server_uv.cc | 120 |
9 files changed, 69 insertions, 469 deletions
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index fe0c80e642..49179ab359 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -42,7 +42,6 @@ #include "call_credentials.h" #include "channel.h" #include "completion_queue.h" -#include "completion_queue_async_worker.h" #include "grpc/grpc.h" #include "grpc/grpc_security.h" #include "grpc/support/alloc.h" diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc index be04cf421d..eb6bc0f53f 100644 --- a/src/node/ext/channel.cc +++ b/src/node/ext/channel.cc @@ -41,7 +41,6 @@ #include "channel.h" #include "channel_credentials.h" #include "completion_queue.h" -#include "completion_queue_async_worker.h" #include "grpc/grpc.h" #include "grpc/grpc_security.h" #include "timeval.h" diff --git a/src/node/ext/completion_queue_uv.cc b/src/node/ext/completion_queue.cc index 9b60911d1e..9b60911d1e 100644 --- a/src/node/ext/completion_queue_uv.cc +++ b/src/node/ext/completion_queue.cc diff --git a/src/node/ext/completion_queue_async_worker.h b/src/node/ext/completion_queue_async_worker.h deleted file mode 100644 index 6e54116765..0000000000 --- a/src/node/ext/completion_queue_async_worker.h +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef NET_GRPC_NODE_COMPLETION_QUEUE_ASYNC_WORKER_H_ -#define NET_GRPC_NODE_COMPLETION_QUEUE_ASYNC_WORKER_H_ -#include <nan.h> - -#include "grpc/grpc.h" - -namespace grpc { -namespace node { - -/* A worker that asynchronously calls completion_queue_next, and queues onto the - node event loop a call to the function stored in the event's tag. */ -class CompletionQueueAsyncWorker : public Nan::AsyncWorker { - public: - CompletionQueueAsyncWorker(); - - ~CompletionQueueAsyncWorker(); - /* Calls completion_queue_next with the provided deadline, and stores the - event if there was one or sets an error message if there was not */ - void Execute(); - - /* Returns the completion queue attached to this class */ - static grpc_completion_queue *GetQueue(); - - /* Convenience function to create a worker with the given arguments and queue - it to run asynchronously */ - static void Next(); - - /* Initialize the CompletionQueueAsyncWorker class */ - static void Init(v8::Local<v8::Object> exports); - - protected: - /* Called when Execute has succeeded (completed without setting an error - message). Calls the saved callback with the event that came from - completion_queue_next */ - void HandleOKCallback(); - - void HandleErrorCallback(); - - private: - grpc_event result; - - static grpc_completion_queue *queue; - - // Number of grpc_completion_queue_next calls in the thread pool - static int current_threads; - // Number of grpc_completion_queue_next calls waiting to enter the thread pool - static int waiting_next_calls; -}; - -} // namespace node -} // namespace grpc - -#endif // NET_GRPC_NODE_COMPLETION_QUEUE_ASYNC_WORKER_H_ diff --git a/src/node/ext/completion_queue_threadpool.cc b/src/node/ext/completion_queue_threadpool.cc deleted file mode 100644 index 72df5d1d65..0000000000 --- a/src/node/ext/completion_queue_threadpool.cc +++ /dev/null @@ -1,180 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -/* I don't like using #ifndef, but I don't see a better way to do this */ -#ifndef GRPC_UV - -#include <nan.h> -#include <node.h> - -#include "call.h" -#include "completion_queue.h" -#include "grpc/grpc.h" -#include "grpc/support/log.h" -#include "grpc/support/time.h" - -namespace grpc { -namespace node { - -namespace { - -/* A worker that asynchronously calls completion_queue_next, and queues onto the - node event loop a call to the function stored in the event's tag. */ -class CompletionQueueAsyncWorker : public Nan::AsyncWorker { - public: - CompletionQueueAsyncWorker(); - - ~CompletionQueueAsyncWorker(); - /* Calls completion_queue_next with the provided deadline, and stores the - event if there was one or sets an error message if there was not */ - void Execute(); - - /* Returns the completion queue attached to this class */ - static grpc_completion_queue *GetQueue(); - - /* Convenience function to create a worker with the given arguments and queue - it to run asynchronously */ - static void Next(); - - /* Initialize the CompletionQueueAsyncWorker class */ - static void Init(v8::Local<v8::Object> exports); - - protected: - /* Called when Execute has succeeded (completed without setting an error - message). Calls the saved callback with the event that came from - completion_queue_next */ - void HandleOKCallback(); - - void HandleErrorCallback(); - - private: - static void TryAddWorker(); - - grpc_event result; - - static grpc_completion_queue *queue; - - // Number of grpc_completion_queue_next calls in the thread pool - static int current_threads; - // Number of grpc_completion_queue_next calls waiting to enter the thread pool - static int waiting_next_calls; -}; - -const int max_queue_threads = 2; - -using v8::Function; -using v8::Local; -using v8::Object; -using v8::Value; - -grpc_completion_queue *CompletionQueueAsyncWorker::queue; - -// Invariants: current_threads <= max_queue_threads -// (current_threads == max_queue_threads) || (waiting_next_calls == 0) - -int CompletionQueueAsyncWorker::current_threads; -int CompletionQueueAsyncWorker::waiting_next_calls; - -CompletionQueueAsyncWorker::CompletionQueueAsyncWorker() - : Nan::AsyncWorker(NULL) {} - -CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {} - -void CompletionQueueAsyncWorker::Execute() { - result = grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME), - NULL); - if (!result.success) { - SetErrorMessage("The async function encountered an error"); - } -} - -grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } - -void CompletionQueueAsyncWorker::TryAddWorker() { - if (current_threads < max_queue_threads && waiting_next_calls > 0) { - current_threads += 1; - waiting_next_calls -= 1; - CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); - Nan::AsyncQueueWorker(worker); - } - GPR_ASSERT(current_threads <= max_queue_threads); - GPR_ASSERT((current_threads == max_queue_threads) || - (waiting_next_calls == 0)); -} - -void CompletionQueueAsyncWorker::Next() { - waiting_next_calls += 1; - TryAddWorker(); -} - -void CompletionQueueAsyncWorker::Init(Local<Object> exports) { - Nan::HandleScope scope; - current_threads = 0; - waiting_next_calls = 0; - queue = grpc_completion_queue_create_for_next(NULL); -} - -void CompletionQueueAsyncWorker::HandleOKCallback() { - Nan::HandleScope scope; - current_threads -= 1; - TryAddWorker(); - CompleteTag(result.tag, NULL); - - DestroyTag(result.tag); -} - -void CompletionQueueAsyncWorker::HandleErrorCallback() { - Nan::HandleScope scope; - current_threads -= 1; - TryAddWorker(); - CompleteTag(result.tag, ErrorMessage()); - - DestroyTag(result.tag); -} - -} // namespace - -grpc_completion_queue *GetCompletionQueue() { - return CompletionQueueAsyncWorker::GetQueue(); -} - -void CompletionQueueNext() { CompletionQueueAsyncWorker::Next(); } - -void CompletionQueueInit(Local<Object> exports) { - CompletionQueueAsyncWorker::Init(exports); -} - -} // namespace node -} // namespace grpc - -#endif /* GRPC_UV */ diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc index 076f1ed424..e193e82179 100644 --- a/src/node/ext/node_grpc.cc +++ b/src/node/ext/node_grpc.cc @@ -43,18 +43,15 @@ #include "grpc/support/time.h" // TODO(murgatroid99): Remove this when the endpoint API becomes public -#ifdef GRPC_UV extern "C" { #include "src/core/lib/iomgr/pollset_uv.h" } -#endif #include "call.h" #include "call_credentials.h" #include "channel.h" #include "channel_credentials.h" #include "completion_queue.h" -#include "completion_queue_async_worker.h" #include "server.h" #include "server_credentials.h" #include "slice.h" @@ -432,9 +429,7 @@ void init(Local<Object> exports) { InitWriteFlags(exports); InitLogConstants(exports); -#ifdef GRPC_UV grpc_pollset_work_run_loop = 0; -#endif grpc::node::Call::Init(exports); grpc::node::CallCredentials::Init(exports); diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 1871a32452..962a25d12a 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -41,7 +41,6 @@ #include <vector> #include "call.h" #include "completion_queue.h" -#include "completion_queue_async_worker.h" #include "grpc/grpc.h" #include "grpc/grpc_security.h" #include "grpc/support/log.h" @@ -78,6 +77,38 @@ using v8::Value; Nan::Callback *Server::constructor; Persistent<FunctionTemplate> Server::fun_tpl; +static Callback *shutdown_callback = NULL; + +class ServerShutdownOp : public Op { + public: + ServerShutdownOp(grpc_server *server): server(server) { + } + + ~ServerShutdownOp() { + } + + Local<Value> GetNodeValue() const { + return Nan::Null(); + } + + bool ParseOp(Local<Value> value, grpc_op *out) { + return true; + } + bool IsFinalOp() { + return false; + } + void OnComplete(bool success) { + /* Because cancel_all_calls was called, we assume that shutdown_and_notify + completes successfully */ + grpc_server_destroy(server); + } + + grpc_server *server; + + protected: + std::string GetTypeString() const { return "shutdown"; } +}; + class NewCallOp : public Op { public: NewCallOp() { @@ -149,6 +180,13 @@ class TryShutdownOp : public Op { server_persist; }; +Server::Server(grpc_server *server) : wrapped_server(server) { +} + +Server::~Server() { + this->ShutdownServer(); +} + void Server::Init(Local<Object> exports) { HandleScope scope; Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); @@ -177,6 +215,36 @@ void Server::DestroyWrappedServer() { } } +NAN_METHOD(ServerShutdownCallback) { + if (!info[0]->IsNull()) { + return Nan::ThrowError("forceShutdown failed somehow"); + } +} + +void Server::ShutdownServer() { + Nan::HandleScope scope; + if (this->wrapped_server != NULL) { + if (shutdown_callback == NULL) { + Local<FunctionTemplate>callback_tpl = + Nan::New<FunctionTemplate>(ServerShutdownCallback); + shutdown_callback = new Callback( + Nan::GetFunction(callback_tpl).ToLocalChecked()); + } + + ServerShutdownOp *op = new ServerShutdownOp(this->wrapped_server); + unique_ptr<OpVec> ops(new OpVec()); + ops->push_back(unique_ptr<Op>(op)); + + grpc_server_shutdown_and_notify( + this->wrapped_server, GetCompletionQueue(), + new struct tag(new Callback(**shutdown_callback), ops.release(), NULL, + Nan::Null())); + grpc_server_cancel_all_calls(this->wrapped_server); + CompletionQueueNext(); + this->wrapped_server = NULL; + } +} + NAN_METHOD(Server::New) { /* If this is not a constructor call, make a constructor call and return the result */ diff --git a/src/node/ext/server_generic.cc b/src/node/ext/server_generic.cc deleted file mode 100644 index 088273d527..0000000000 --- a/src/node/ext/server_generic.cc +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Copyright 2017, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_UV - -#include "server.h" - -#include <nan.h> -#include <node.h> -#include "grpc/grpc.h" -#include "grpc/support/time.h" - -namespace grpc { -namespace node { - -Server::Server(grpc_server *server) : wrapped_server(server) { - grpc_completion_queue_attributes attrs = { - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_NON_LISTENING}; - shutdown_queue = grpc_completion_queue_create( - grpc_completion_queue_factory_lookup(&attrs), &attrs, NULL); - grpc_server_register_completion_queue(server, shutdown_queue, NULL); -} - -Server::~Server() { - this->ShutdownServer(); - grpc_completion_queue_shutdown(this->shutdown_queue); - grpc_completion_queue_destroy(this->shutdown_queue); -} - -void Server::ShutdownServer() { - if (this->wrapped_server != NULL) { - grpc_server_shutdown_and_notify(this->wrapped_server, this->shutdown_queue, - NULL); - grpc_server_cancel_all_calls(this->wrapped_server); - grpc_completion_queue_pluck(this->shutdown_queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); - grpc_server_destroy(this->wrapped_server); - this->wrapped_server = NULL; - } -} - -} // namespace grpc -} // namespace node - -#endif /* GRPC_UV */ diff --git a/src/node/ext/server_uv.cc b/src/node/ext/server_uv.cc deleted file mode 100644 index 709921b7fc..0000000000 --- a/src/node/ext/server_uv.cc +++ /dev/null @@ -1,120 +0,0 @@ -/* - * - * Copyright 2017, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifdef GRPC_UV - -#include "server.h" - -#include <nan.h> -#include <node.h> -#include "grpc/grpc.h" -#include "grpc/support/time.h" - -#include "call.h" -#include "completion_queue.h" - -namespace grpc { -namespace node { - -using Nan::Callback; -using Nan::MaybeLocal; - -using v8::External; -using v8::Function; -using v8::FunctionTemplate; -using v8::Local; -using v8::Object; -using v8::Value; - -static Callback *shutdown_callback = NULL; - -class ServerShutdownOp : public Op { - public: - ServerShutdownOp(grpc_server *server) : server(server) {} - - ~ServerShutdownOp() {} - - Local<Value> GetNodeValue() const { return Nan::Null(); } - - bool ParseOp(Local<Value> value, grpc_op *out) { return true; } - bool IsFinalOp() { return false; } - void OnComplete(bool success) { - /* Because cancel_all_calls was called, we assume that shutdown_and_notify - completes successfully */ - grpc_server_destroy(server); - } - - grpc_server *server; - - protected: - std::string GetTypeString() const { return "shutdown"; } -}; - -Server::Server(grpc_server *server) : wrapped_server(server) {} - -Server::~Server() { this->ShutdownServer(); } - -NAN_METHOD(ServerShutdownCallback) { - if (!info[0]->IsNull()) { - return Nan::ThrowError("forceShutdown failed somehow"); - } -} - -void Server::ShutdownServer() { - Nan::HandleScope scope; - if (this->wrapped_server != NULL) { - if (shutdown_callback == NULL) { - Local<FunctionTemplate> callback_tpl = - Nan::New<FunctionTemplate>(ServerShutdownCallback); - shutdown_callback = - new Callback(Nan::GetFunction(callback_tpl).ToLocalChecked()); - } - - ServerShutdownOp *op = new ServerShutdownOp(this->wrapped_server); - unique_ptr<OpVec> ops(new OpVec()); - ops->push_back(unique_ptr<Op>(op)); - - grpc_server_shutdown_and_notify( - this->wrapped_server, GetCompletionQueue(), - new struct tag(new Callback(**shutdown_callback), ops.release(), NULL, - Nan::Null())); - grpc_server_cancel_all_calls(this->wrapped_server); - CompletionQueueNext(); - this->wrapped_server = NULL; - } -} - -} // namespace grpc -} // namespace node - -#endif /* GRPC_UV */ |