diff options
Diffstat (limited to 'src/node')
-rw-r--r-- | src/node/ext/call.cc | 56 | ||||
-rw-r--r-- | src/node/ext/call.h | 7 | ||||
-rw-r--r-- | src/node/ext/completion_queue_threadpool.cc | 9 | ||||
-rw-r--r-- | src/node/ext/completion_queue_uv.cc | 12 | ||||
-rw-r--r-- | src/node/ext/server.cc | 45 | ||||
-rw-r--r-- | src/node/ext/server.h | 2 | ||||
-rw-r--r-- | src/node/ext/server_uv.cc | 15 | ||||
-rw-r--r-- | src/node/test/test_messages.proto | 2 |
8 files changed, 99 insertions, 49 deletions
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 5d573110da..bd60775aad 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -217,6 +217,8 @@ class SendMetadataOp : public Op { bool IsFinalOp() { return false; } + void OnComplete(bool success) { + } protected: std::string GetTypeString() const { return "send_metadata"; @@ -260,6 +262,8 @@ class SendMessageOp : public Op { bool IsFinalOp() { return false; } + void OnComplete(bool success) { + } protected: std::string GetTypeString() const { return "send_message"; @@ -280,6 +284,8 @@ class SendClientCloseOp : public Op { bool IsFinalOp() { return false; } + void OnComplete(bool success) { + } protected: std::string GetTypeString() const { return "client_close"; @@ -349,6 +355,8 @@ class SendServerStatusOp : public Op { bool IsFinalOp() { return true; } + void OnComplete(bool success) { + } protected: std::string GetTypeString() const { return "send_status"; @@ -381,6 +389,8 @@ class GetMetadataOp : public Op { bool IsFinalOp() { return false; } + void OnComplete(bool success) { + } protected: std::string GetTypeString() const { @@ -413,6 +423,8 @@ class ReadMessageOp : public Op { bool IsFinalOp() { return false; } + void OnComplete(bool success) { + } protected: std::string GetTypeString() const { @@ -454,6 +466,8 @@ class ClientStatusOp : public Op { bool IsFinalOp() { return true; } + void OnComplete(bool success) { + } protected: std::string GetTypeString() const { return "status"; @@ -478,6 +492,8 @@ class ServerCloseResponseOp : public Op { bool IsFinalOp() { return false; } + void OnComplete(bool success) { + } protected: std::string GetTypeString() const { @@ -499,36 +515,36 @@ tag::~tag() { delete ops; } -Local<Value> GetTagNodeValue(void *tag) { - EscapableHandleScope scope; +void CompleteTag(void *tag, const char *error_message) { + HandleScope scope; struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); - Local<Object> tag_obj = Nan::New<Object>(); - for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin(); - it != tag_struct->ops->end(); ++it) { - Op *op_ptr = it->get(); - Nan::Set(tag_obj, op_ptr->GetOpType(), op_ptr->GetNodeValue()); + Callback *callback = tag_struct->callback; + if (error_message == NULL) { + Local<Object> tag_obj = Nan::New<Object>(); + for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin(); + it != tag_struct->ops->end(); ++it) { + Op *op_ptr = it->get(); + Nan::Set(tag_obj, op_ptr->GetOpType(), op_ptr->GetNodeValue()); + } + Local<Value> argv[] = {Nan::Null(), tag_obj}; + callback->Call(2, argv); + } else { + Local<Value> argv[] = {Nan::Error(error_message)}; + callback->Call(1, argv); } - return scope.Escape(tag_obj); -} - -Callback *GetTagCallback(void *tag) { - struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); - return tag_struct->callback; -} - -void CompleteTag(void *tag) { - struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); + bool success = (error_message == NULL); bool is_final_op = false; - if (tag_struct->call == NULL) { - return; - } for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin(); it != tag_struct->ops->end(); ++it) { Op *op_ptr = it->get(); + op_ptr->OnComplete(success); if (op_ptr->IsFinalOp()) { is_final_op = true; } } + if (tag_struct->call == NULL) { + return; + } tag_struct->call->CompleteBatch(is_final_op); } diff --git a/src/node/ext/call.h b/src/node/ext/call.h index 53a5e4ab67..340e32682b 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -106,6 +106,7 @@ class Op { virtual ~Op(); v8::Local<v8::Value> GetOpType() const; virtual bool IsFinalOp() = 0; + virtual void OnComplete(bool success) = 0; protected: virtual std::string GetTypeString() const = 0; @@ -123,13 +124,9 @@ struct tag { call_persist; }; -v8::Local<v8::Value> GetTagNodeValue(void *tag); - -Nan::Callback *GetTagCallback(void *tag); - void DestroyTag(void *tag); -void CompleteTag(void *tag); +void CompleteTag(void *tag, const char *error_message); } // namespace node } // namespace grpc diff --git a/src/node/ext/completion_queue_threadpool.cc b/src/node/ext/completion_queue_threadpool.cc index 1917074dc2..7b1bdda033 100644 --- a/src/node/ext/completion_queue_threadpool.cc +++ b/src/node/ext/completion_queue_threadpool.cc @@ -148,9 +148,7 @@ void CompletionQueueAsyncWorker::HandleOKCallback() { Nan::HandleScope scope; current_threads -= 1; TryAddWorker(); - Nan::Callback *callback = GetTagCallback(result.tag); - Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)}; - callback->Call(2, argv); + CompleteTag(result.tag, NULL); DestroyTag(result.tag); } @@ -159,10 +157,7 @@ void CompletionQueueAsyncWorker::HandleErrorCallback() { Nan::HandleScope scope; current_threads -= 1; TryAddWorker(); - Nan::Callback *callback = GetTagCallback(result.tag); - Local<Value> argv[] = {Nan::Error(ErrorMessage())}; - - callback->Call(1, argv); + CompleteTag(result.tag, ErrorMessage()); DestroyTag(result.tag); } diff --git a/src/node/ext/completion_queue_uv.cc b/src/node/ext/completion_queue_uv.cc index 615973a6c9..0f6f7da460 100644 --- a/src/node/ext/completion_queue_uv.cc +++ b/src/node/ext/completion_queue_uv.cc @@ -61,17 +61,13 @@ void drain_completion_queue(uv_prepare_t *handle) { queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL); if (event.type == GRPC_OP_COMPLETE) { - Nan::Callback *callback = grpc::node::GetTagCallback(event.tag); + const char *error_message; if (event.success) { - Local<Value> argv[] = {Nan::Null(), - grpc::node::GetTagNodeValue(event.tag)}; - callback->Call(2, argv); + error_message = NULL; } else { - Local<Value> argv[] = {Nan::Error( - "The async function encountered an error")}; - callback->Call(1, argv); + error_message = "The async function encountered an error"; } - grpc::node::CompleteTag(event.tag); + CompleteTag(event.tag, error_message); grpc::node::DestroyTag(event.tag); pending_batches--; if (pending_batches == 0) { diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index f0920c842a..5384305631 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -117,6 +117,8 @@ class NewCallOp : public Op { bool IsFinalOp() { return false; } + void OnComplete(bool success) { + } grpc_call *call; grpc_call_details details; @@ -126,6 +128,34 @@ class NewCallOp : public Op { std::string GetTypeString() const { return "new_call"; } }; +class TryShutdownOp: public Op { + public: + TryShutdownOp(Server *server, Local<Value> server_value) : server(server) { + server_persist.Reset(server_value); + } + Local<Value> GetNodeValue() const { + EscapableHandleScope scope; + return scope.Escape(Nan::New(server_persist)); + } + bool ParseOp(Local<Value> value, grpc_op *out) { + return true; + } + bool IsFinalOp() { + return false; + } + void OnComplete(bool success) { + if (success) { + server->DestroyWrappedServer(); + } + } + protected: + std::string GetTypeString() const { return "try_shutdown"; } + private: + Server *server; + Nan::Persistent<v8::Value, Nan::CopyablePersistentTraits<v8::Value>> + server_persist; +}; + void Server::Init(Local<Object> exports) { HandleScope scope; Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); @@ -147,6 +177,13 @@ bool Server::HasInstance(Local<Value> val) { return Nan::New(fun_tpl)->HasInstance(val); } +void Server::DestroyWrappedServer() { + if (this->wrapped_server != NULL) { + grpc_server_destroy(this->wrapped_server); + this->wrapped_server = NULL; + } +} + NAN_METHOD(Server::New) { /* If this is not a constructor call, make a constructor call and return the result */ @@ -242,7 +279,15 @@ NAN_METHOD(Server::TryShutdown) { return Nan::ThrowTypeError("tryShutdown can only be called on a Server"); } Server *server = ObjectWrap::Unwrap<Server>(info.This()); + if (server->wrapped_server == NULL) { + // Server is already shut down. Call callback immediately. + Nan::Callback callback(info[0].As<Function>()); + callback.Call(0, {}); + return; + } + TryShutdownOp *op = new TryShutdownOp(server, info.This()); unique_ptr<OpVec> ops(new OpVec()); + ops->push_back(unique_ptr<Op>(op)); grpc_server_shutdown_and_notify( server->wrapped_server, GetCompletionQueue(), new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(), diff --git a/src/node/ext/server.h b/src/node/ext/server.h index ab5fc210e8..c0f2e86554 100644 --- a/src/node/ext/server.h +++ b/src/node/ext/server.h @@ -53,6 +53,8 @@ class Server : public Nan::ObjectWrap { JavaScript constructor */ static bool HasInstance(v8::Local<v8::Value> val); + void DestroyWrappedServer(); + private: explicit Server(grpc_server *server); ~Server(); diff --git a/src/node/ext/server_uv.cc b/src/node/ext/server_uv.cc index 82e7589fc8..789938318e 100644 --- a/src/node/ext/server_uv.cc +++ b/src/node/ext/server_uv.cc @@ -67,7 +67,7 @@ class ServerShutdownOp : public Op { } Local<Value> GetNodeValue() const { - return Nan::New<External>(reinterpret_cast<void *>(server)); + return Nan::Null(); } bool ParseOp(Local<Value> value, grpc_op *out) { @@ -76,6 +76,11 @@ class ServerShutdownOp : public Op { bool IsFinalOp() { return false; } + void OnComplete(bool success) { + /* Because cancel_all_calls was called, we assume that shutdown_and_notify + completes successfully */ + grpc_server_destroy(server); + } grpc_server *server; @@ -94,16 +99,10 @@ NAN_METHOD(ServerShutdownCallback) { if (!info[0]->IsNull()) { return Nan::ThrowError("forceShutdown failed somehow"); } - MaybeLocal<Object> maybe_result = Nan::To<Object>(info[1]); - Local<Object> result = maybe_result.ToLocalChecked(); - Local<Value> server_val = Nan::Get( - result, Nan::New("shutdown").ToLocalChecked()).ToLocalChecked(); - Local<External> server_extern = server_val.As<External>(); - grpc_server *server = reinterpret_cast<grpc_server *>(server_extern->Value()); - grpc_server_destroy(server); } void Server::ShutdownServer() { + Nan::HandleScope scope; if (this->wrapped_server != NULL) { if (shutdown_callback == NULL) { Local<FunctionTemplate>callback_tpl = diff --git a/src/node/test/test_messages.proto b/src/node/test/test_messages.proto index ae70f6e152..43c213dabb 100644 --- a/src/node/test/test_messages.proto +++ b/src/node/test/test_messages.proto @@ -57,4 +57,4 @@ enum TestEnum { message EnumValues { TestEnum enum_value = 1; -}
\ No newline at end of file +} |