aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node/ext/server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/node/ext/server.cc')
-rw-r--r--src/node/ext/server.cc118
1 files changed, 106 insertions, 12 deletions
diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc
index ccb55aa54c..a885a9f268 100644
--- a/src/node/ext/server.cc
+++ b/src/node/ext/server.cc
@@ -41,7 +41,6 @@
#include <vector>
#include "call.h"
#include "completion_queue.h"
-#include "completion_queue_async_worker.h"
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
#include "grpc/support/log.h"
@@ -78,6 +77,30 @@ using v8::Value;
Nan::Callback *Server::constructor;
Persistent<FunctionTemplate> Server::fun_tpl;
+static Callback *shutdown_callback = NULL;
+
+class ServerShutdownOp : public Op {
+ public:
+ ServerShutdownOp(grpc_server *server) : server(server) {}
+
+ ~ServerShutdownOp() {}
+
+ Local<Value> GetNodeValue() const { return Nan::Null(); }
+
+ bool ParseOp(Local<Value> value, grpc_op *out) { return true; }
+ bool IsFinalOp() { return false; }
+ void OnComplete(bool success) {
+ /* Because cancel_all_calls was called, we assume that shutdown_and_notify
+ completes successfully */
+ grpc_server_destroy(server);
+ }
+
+ grpc_server *server;
+
+ protected:
+ std::string GetTypeString() const { return "shutdown"; }
+};
+
class NewCallOp : public Op {
public:
NewCallOp() {
@@ -111,12 +134,9 @@ class NewCallOp : public Op {
return scope.Escape(obj);
}
- bool ParseOp(Local<Value> value, grpc_op *out) {
- return true;
- }
- bool IsFinalOp() {
- return false;
- }
+ bool ParseOp(Local<Value> value, grpc_op *out) { return true; }
+ bool IsFinalOp() { return false; }
+ void OnComplete(bool success) {}
grpc_call *call;
grpc_call_details details;
@@ -126,6 +146,36 @@ class NewCallOp : public Op {
std::string GetTypeString() const { return "new_call"; }
};
+class TryShutdownOp : public Op {
+ public:
+ TryShutdownOp(Server *server, Local<Value> server_value) : server(server) {
+ server_persist.Reset(server_value);
+ }
+ Local<Value> GetNodeValue() const {
+ EscapableHandleScope scope;
+ return scope.Escape(Nan::New(server_persist));
+ }
+ bool ParseOp(Local<Value> value, grpc_op *out) { return true; }
+ bool IsFinalOp() { return false; }
+ void OnComplete(bool success) {
+ if (success) {
+ server->DestroyWrappedServer();
+ }
+ }
+
+ protected:
+ std::string GetTypeString() const { return "try_shutdown"; }
+
+ private:
+ Server *server;
+ Nan::Persistent<v8::Value, Nan::CopyablePersistentTraits<v8::Value>>
+ server_persist;
+};
+
+Server::Server(grpc_server *server) : wrapped_server(server) {}
+
+Server::~Server() { this->ShutdownServer(); }
+
void Server::Init(Local<Object> exports) {
HandleScope scope;
Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
@@ -147,6 +197,43 @@ bool Server::HasInstance(Local<Value> val) {
return Nan::New(fun_tpl)->HasInstance(val);
}
+void Server::DestroyWrappedServer() {
+ if (this->wrapped_server != NULL) {
+ grpc_server_destroy(this->wrapped_server);
+ this->wrapped_server = NULL;
+ }
+}
+
+NAN_METHOD(ServerShutdownCallback) {
+ if (!info[0]->IsNull()) {
+ return Nan::ThrowError("forceShutdown failed somehow");
+ }
+}
+
+void Server::ShutdownServer() {
+ Nan::HandleScope scope;
+ if (this->wrapped_server != NULL) {
+ if (shutdown_callback == NULL) {
+ Local<FunctionTemplate> callback_tpl =
+ Nan::New<FunctionTemplate>(ServerShutdownCallback);
+ shutdown_callback =
+ new Callback(Nan::GetFunction(callback_tpl).ToLocalChecked());
+ }
+
+ ServerShutdownOp *op = new ServerShutdownOp(this->wrapped_server);
+ unique_ptr<OpVec> ops(new OpVec());
+ ops->push_back(unique_ptr<Op>(op));
+
+ grpc_server_shutdown_and_notify(
+ this->wrapped_server, GetCompletionQueue(),
+ new struct tag(new Callback(**shutdown_callback), ops.release(), NULL,
+ Nan::Null()));
+ grpc_server_cancel_all_calls(this->wrapped_server);
+ CompletionQueueNext();
+ this->wrapped_server = NULL;
+ }
+}
+
NAN_METHOD(Server::New) {
/* If this is not a constructor call, make a constructor call and return
the result */
@@ -190,10 +277,9 @@ NAN_METHOD(Server::RequestCall) {
ops->push_back(unique_ptr<Op>(op));
grpc_call_error error = grpc_server_request_call(
server->wrapped_server, &op->call, &op->details, &op->request_metadata,
- GetCompletionQueue(),
- GetCompletionQueue(),
- new struct tag(new Callback(info[0].As<Function>()), ops.release(),
- NULL));
+ GetCompletionQueue(), GetCompletionQueue(),
+ new struct tag(new Callback(info[0].As<Function>()), ops.release(), NULL,
+ Nan::Null()));
if (error != GRPC_CALL_OK) {
return Nan::ThrowError(nanErrorWithCode("requestCall failed", error));
}
@@ -242,11 +328,19 @@ NAN_METHOD(Server::TryShutdown) {
return Nan::ThrowTypeError("tryShutdown can only be called on a Server");
}
Server *server = ObjectWrap::Unwrap<Server>(info.This());
+ if (server->wrapped_server == NULL) {
+ // Server is already shut down. Call callback immediately.
+ Nan::Callback callback(info[0].As<Function>());
+ callback.Call(0, {});
+ return;
+ }
+ TryShutdownOp *op = new TryShutdownOp(server, info.This());
unique_ptr<OpVec> ops(new OpVec());
+ ops->push_back(unique_ptr<Op>(op));
grpc_server_shutdown_and_notify(
server->wrapped_server, GetCompletionQueue(),
new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(),
- NULL));
+ NULL, Nan::Null()));
CompletionQueueNext();
}