diff options
Diffstat (limited to 'src/node/ext/server.cc')
-rw-r--r-- | src/node/ext/server.cc | 118 |
1 files changed, 106 insertions, 12 deletions
diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index ccb55aa54c..a885a9f268 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -41,7 +41,6 @@ #include <vector> #include "call.h" #include "completion_queue.h" -#include "completion_queue_async_worker.h" #include "grpc/grpc.h" #include "grpc/grpc_security.h" #include "grpc/support/log.h" @@ -78,6 +77,30 @@ using v8::Value; Nan::Callback *Server::constructor; Persistent<FunctionTemplate> Server::fun_tpl; +static Callback *shutdown_callback = NULL; + +class ServerShutdownOp : public Op { + public: + ServerShutdownOp(grpc_server *server) : server(server) {} + + ~ServerShutdownOp() {} + + Local<Value> GetNodeValue() const { return Nan::Null(); } + + bool ParseOp(Local<Value> value, grpc_op *out) { return true; } + bool IsFinalOp() { return false; } + void OnComplete(bool success) { + /* Because cancel_all_calls was called, we assume that shutdown_and_notify + completes successfully */ + grpc_server_destroy(server); + } + + grpc_server *server; + + protected: + std::string GetTypeString() const { return "shutdown"; } +}; + class NewCallOp : public Op { public: NewCallOp() { @@ -111,12 +134,9 @@ class NewCallOp : public Op { return scope.Escape(obj); } - bool ParseOp(Local<Value> value, grpc_op *out) { - return true; - } - bool IsFinalOp() { - return false; - } + bool ParseOp(Local<Value> value, grpc_op *out) { return true; } + bool IsFinalOp() { return false; } + void OnComplete(bool success) {} grpc_call *call; grpc_call_details details; @@ -126,6 +146,36 @@ class NewCallOp : public Op { std::string GetTypeString() const { return "new_call"; } }; +class TryShutdownOp : public Op { + public: + TryShutdownOp(Server *server, Local<Value> server_value) : server(server) { + server_persist.Reset(server_value); + } + Local<Value> GetNodeValue() const { + EscapableHandleScope scope; + return scope.Escape(Nan::New(server_persist)); + } + bool ParseOp(Local<Value> value, grpc_op *out) { return true; } + bool IsFinalOp() { return false; } + void OnComplete(bool success) { + if (success) { + server->DestroyWrappedServer(); + } + } + + protected: + std::string GetTypeString() const { return "try_shutdown"; } + + private: + Server *server; + Nan::Persistent<v8::Value, Nan::CopyablePersistentTraits<v8::Value>> + server_persist; +}; + +Server::Server(grpc_server *server) : wrapped_server(server) {} + +Server::~Server() { this->ShutdownServer(); } + void Server::Init(Local<Object> exports) { HandleScope scope; Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); @@ -147,6 +197,43 @@ bool Server::HasInstance(Local<Value> val) { return Nan::New(fun_tpl)->HasInstance(val); } +void Server::DestroyWrappedServer() { + if (this->wrapped_server != NULL) { + grpc_server_destroy(this->wrapped_server); + this->wrapped_server = NULL; + } +} + +NAN_METHOD(ServerShutdownCallback) { + if (!info[0]->IsNull()) { + return Nan::ThrowError("forceShutdown failed somehow"); + } +} + +void Server::ShutdownServer() { + Nan::HandleScope scope; + if (this->wrapped_server != NULL) { + if (shutdown_callback == NULL) { + Local<FunctionTemplate> callback_tpl = + Nan::New<FunctionTemplate>(ServerShutdownCallback); + shutdown_callback = + new Callback(Nan::GetFunction(callback_tpl).ToLocalChecked()); + } + + ServerShutdownOp *op = new ServerShutdownOp(this->wrapped_server); + unique_ptr<OpVec> ops(new OpVec()); + ops->push_back(unique_ptr<Op>(op)); + + grpc_server_shutdown_and_notify( + this->wrapped_server, GetCompletionQueue(), + new struct tag(new Callback(**shutdown_callback), ops.release(), NULL, + Nan::Null())); + grpc_server_cancel_all_calls(this->wrapped_server); + CompletionQueueNext(); + this->wrapped_server = NULL; + } +} + NAN_METHOD(Server::New) { /* If this is not a constructor call, make a constructor call and return the result */ @@ -190,10 +277,9 @@ NAN_METHOD(Server::RequestCall) { ops->push_back(unique_ptr<Op>(op)); grpc_call_error error = grpc_server_request_call( server->wrapped_server, &op->call, &op->details, &op->request_metadata, - GetCompletionQueue(), - GetCompletionQueue(), - new struct tag(new Callback(info[0].As<Function>()), ops.release(), - NULL)); + GetCompletionQueue(), GetCompletionQueue(), + new struct tag(new Callback(info[0].As<Function>()), ops.release(), NULL, + Nan::Null())); if (error != GRPC_CALL_OK) { return Nan::ThrowError(nanErrorWithCode("requestCall failed", error)); } @@ -242,11 +328,19 @@ NAN_METHOD(Server::TryShutdown) { return Nan::ThrowTypeError("tryShutdown can only be called on a Server"); } Server *server = ObjectWrap::Unwrap<Server>(info.This()); + if (server->wrapped_server == NULL) { + // Server is already shut down. Call callback immediately. + Nan::Callback callback(info[0].As<Function>()); + callback.Call(0, {}); + return; + } + TryShutdownOp *op = new TryShutdownOp(server, info.This()); unique_ptr<OpVec> ops(new OpVec()); + ops->push_back(unique_ptr<Op>(op)); grpc_server_shutdown_and_notify( server->wrapped_server, GetCompletionQueue(), new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(), - NULL)); + NULL, Nan::Null())); CompletionQueueNext(); } |