diff options
author | 2016-10-26 23:34:16 +0200 | |
---|---|---|
committer | 2016-10-26 23:34:16 +0200 | |
commit | 6ab40a5fd395ec0db6231a1c1ffd4dd468b61bd0 (patch) | |
tree | b5104f137d8b4f4b54318b7ecf12c037abac62da /src/node/ext/server.cc | |
parent | 89e23643538cd632e3f0e78671795ab42bf3371c (diff) | |
parent | 832b685740f52dd1ac04fe7cdda4cc5ad1e7fa92 (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into bazel-take-2
Diffstat (limited to 'src/node/ext/server.cc')
-rw-r--r-- | src/node/ext/server.cc | 91 |
1 files changed, 72 insertions, 19 deletions
diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index dd1b777ac8..29f31ff15e 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -40,6 +40,7 @@ #include <vector> #include "call.h" +#include "completion_queue.h" #include "completion_queue_async_worker.h" #include "grpc/grpc.h" #include "grpc/grpc_security.h" @@ -64,6 +65,7 @@ using v8::Array; using v8::Boolean; using v8::Date; using v8::Exception; +using v8::External; using v8::Function; using v8::FunctionTemplate; using v8::Local; @@ -75,6 +77,8 @@ using v8::Value; Nan::Callback *Server::constructor; Persistent<FunctionTemplate> Server::fun_tpl; +static Callback *shutdown_callback; + class NewCallOp : public Op { public: NewCallOp() { @@ -111,6 +115,9 @@ class NewCallOp : public Op { shared_ptr<Resources> resources) { return true; } + bool IsFinalOp() { + return false; + } grpc_call *call; grpc_call_details details; @@ -120,17 +127,50 @@ class NewCallOp : public Op { std::string GetTypeString() const { return "new_call"; } }; +class ServerShutdownOp : public Op { + public: + ServerShutdownOp(grpc_server *server): server(server) { + } + + ~ServerShutdownOp() { + } + + Local<Value> GetNodeValue() const { + return Nan::New<External>(reinterpret_cast<void *>(server)); + } + + bool ParseOp(Local<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { + return true; + } + bool IsFinalOp() { + return false; + } + + grpc_server *server; + + protected: + std::string GetTypeString() const { return "shutdown"; } +}; + +NAN_METHOD(ServerShutdownCallback) { + if (!info[0]->IsNull()) { + return Nan::ThrowError("forceShutdown failed somehow"); + } + MaybeLocal<Object> maybe_result = Nan::To<Object>(info[1]); + Local<Object> result = maybe_result.ToLocalChecked(); + Local<Value> server_val = Nan::Get( + result, Nan::New("shutdown").ToLocalChecked()).ToLocalChecked(); + Local<External> server_extern = server_val.As<External>(); + grpc_server *server = reinterpret_cast<grpc_server *>(server_extern->Value()); + grpc_server_destroy(server); +} + Server::Server(grpc_server *server) : wrapped_server(server) { - shutdown_queue = grpc_completion_queue_create(NULL); - grpc_server_register_non_listening_completion_queue(server, shutdown_queue, - NULL); } Server::~Server() { this->ShutdownServer(); - grpc_completion_queue_shutdown(this->shutdown_queue); - grpc_server_destroy(this->wrapped_server); - grpc_completion_queue_destroy(this->shutdown_queue); } void Server::Init(Local<Object> exports) { @@ -147,6 +187,11 @@ void Server::Init(Local<Object> exports) { Local<Function> ctr = Nan::GetFunction(tpl).ToLocalChecked(); Nan::Set(exports, Nan::New("Server").ToLocalChecked(), ctr); constructor = new Callback(ctr); + + Local<FunctionTemplate>callback_tpl = + Nan::New<FunctionTemplate>(ServerShutdownCallback); + shutdown_callback = new Callback( + Nan::GetFunction(callback_tpl).ToLocalChecked()); } bool Server::HasInstance(Local<Value> val) { @@ -155,11 +200,19 @@ bool Server::HasInstance(Local<Value> val) { } void Server::ShutdownServer() { - grpc_server_shutdown_and_notify(this->wrapped_server, this->shutdown_queue, - NULL); - grpc_server_cancel_all_calls(this->wrapped_server); - grpc_completion_queue_pluck(this->shutdown_queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + if (this->wrapped_server != NULL) { + ServerShutdownOp *op = new ServerShutdownOp(this->wrapped_server); + unique_ptr<OpVec> ops(new OpVec()); + ops->push_back(unique_ptr<Op>(op)); + + grpc_server_shutdown_and_notify( + this->wrapped_server, GetCompletionQueue(), + new struct tag(new Callback(**shutdown_callback), ops.release(), + shared_ptr<Resources>(nullptr), NULL)); + grpc_server_cancel_all_calls(this->wrapped_server); + CompletionQueueNext(); + this->wrapped_server = NULL; + } } NAN_METHOD(Server::New) { @@ -179,7 +232,7 @@ NAN_METHOD(Server::New) { } } grpc_server *wrapped_server; - grpc_completion_queue *queue = CompletionQueueAsyncWorker::GetQueue(); + grpc_completion_queue *queue = GetCompletionQueue(); grpc_channel_args *channel_args; if (!ParseChannelArgs(info[0], &channel_args)) { DeallocateChannelArgs(channel_args); @@ -205,14 +258,14 @@ NAN_METHOD(Server::RequestCall) { ops->push_back(unique_ptr<Op>(op)); grpc_call_error error = grpc_server_request_call( server->wrapped_server, &op->call, &op->details, &op->request_metadata, - CompletionQueueAsyncWorker::GetQueue(), - CompletionQueueAsyncWorker::GetQueue(), + GetCompletionQueue(), + GetCompletionQueue(), new struct tag(new Callback(info[0].As<Function>()), ops.release(), - shared_ptr<Resources>(nullptr))); + shared_ptr<Resources>(nullptr), NULL)); if (error != GRPC_CALL_OK) { return Nan::ThrowError(nanErrorWithCode("requestCall failed", error)); } - CompletionQueueAsyncWorker::Next(); + CompletionQueueNext(); } NAN_METHOD(Server::AddHttp2Port) { @@ -259,10 +312,10 @@ NAN_METHOD(Server::TryShutdown) { Server *server = ObjectWrap::Unwrap<Server>(info.This()); unique_ptr<OpVec> ops(new OpVec()); grpc_server_shutdown_and_notify( - server->wrapped_server, CompletionQueueAsyncWorker::GetQueue(), + server->wrapped_server, GetCompletionQueue(), new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(), - shared_ptr<Resources>(nullptr))); - CompletionQueueAsyncWorker::Next(); + shared_ptr<Resources>(nullptr), NULL)); + CompletionQueueNext(); } NAN_METHOD(Server::ForceShutdown) { |