aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-04-14 13:14:10 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-04-14 13:14:10 -0700
commit2c6f63731c7ddc761eac80be7609a9e80116ddee (patch)
tree02896c6cbbacb3e8931a347166abf7538187d255 /src/node
parent694cd708313945c31e4b2c1b518d3cff80f8b031 (diff)
parente412a180602753972ac496560322e224a5db987f (diff)
Merge github.com:grpc/grpc into cpparena
Diffstat (limited to 'src/node')
-rw-r--r--src/node/ext/call.cc81
-rw-r--r--src/node/ext/call.h7
-rw-r--r--src/node/ext/completion_queue_threadpool.cc27
-rw-r--r--src/node/ext/completion_queue_uv.cc26
-rw-r--r--src/node/ext/server.cc45
-rw-r--r--src/node/ext/server.h2
-rw-r--r--src/node/ext/server_generic.cc4
-rw-r--r--src/node/ext/server_uv.cc15
-rw-r--r--src/node/health_check/package.json4
-rw-r--r--src/node/test/test_messages.proto2
-rw-r--r--src/node/tools/package.json2
11 files changed, 143 insertions, 72 deletions
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index 7f3cbb8ed1..a51ad836cd 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -209,6 +209,7 @@ class SendMetadataOp : public Op {
return true;
}
bool IsFinalOp() { return false; }
+ void OnComplete(bool success) {}
protected:
std::string GetTypeString() const { return "send_metadata"; }
@@ -247,8 +248,14 @@ class SendMessageOp : public Op {
out->data.send_message.send_message = send_message;
return true;
}
+<<<<<<< HEAD
bool IsFinalOp() { return false; }
+=======
+ bool IsFinalOp() { return false; }
+ void OnComplete(bool success) {}
+>>>>>>> e412a180602753972ac496560322e224a5db987f
+
protected:
std::string GetTypeString() const { return "send_message"; }
@@ -262,8 +269,15 @@ class SendClientCloseOp : public Op {
EscapableHandleScope scope;
return scope.Escape(Nan::True());
}
+<<<<<<< HEAD
+ bool ParseOp(Local<Value> value, grpc_op *out) { return true; }
+ bool IsFinalOp() { return false; }
+
+=======
bool ParseOp(Local<Value> value, grpc_op *out) { return true; }
bool IsFinalOp() { return false; }
+ void OnComplete(bool success) {}
+>>>>>>> e412a180602753972ac496560322e224a5db987f
protected:
std::string GetTypeString() const { return "client_close"; }
@@ -327,7 +341,13 @@ class SendServerStatusOp : public Op {
out->data.send_status_from_server.status_details = &this->details;
return true;
}
+<<<<<<< HEAD
+ bool IsFinalOp() { return true; }
+
+=======
bool IsFinalOp() { return true; }
+ void OnComplete(bool success) {}
+>>>>>>> e412a180602753972ac496560322e224a5db987f
protected:
std::string GetTypeString() const { return "send_status"; }
@@ -352,7 +372,12 @@ class GetMetadataOp : public Op {
out->data.recv_initial_metadata.recv_initial_metadata = &recv_metadata;
return true;
}
+<<<<<<< HEAD
+ bool IsFinalOp() { return false; }
+=======
bool IsFinalOp() { return false; }
+ void OnComplete(bool success) {}
+>>>>>>> e412a180602753972ac496560322e224a5db987f
protected:
std::string GetTypeString() const { return "metadata"; }
@@ -378,7 +403,12 @@ class ReadMessageOp : public Op {
out->data.recv_message.recv_message = &recv_message;
return true;
}
+<<<<<<< HEAD
bool IsFinalOp() { return false; }
+=======
+ bool IsFinalOp() { return false; }
+ void OnComplete(bool success) {}
+>>>>>>> e412a180602753972ac496560322e224a5db987f
protected:
std::string GetTypeString() const { return "read"; }
@@ -411,7 +441,13 @@ class ClientStatusOp : public Op {
ParseMetadata(&metadata_array));
return scope.Escape(status_obj);
}
+<<<<<<< HEAD
+ bool IsFinalOp() { return true; }
+
+=======
bool IsFinalOp() { return true; }
+ void OnComplete(bool success) {}
+>>>>>>> e412a180602753972ac496560322e224a5db987f
protected:
std::string GetTypeString() const { return "status"; }
@@ -433,7 +469,12 @@ class ServerCloseResponseOp : public Op {
out->data.recv_close_on_server.cancelled = &cancelled;
return true;
}
+<<<<<<< HEAD
+ bool IsFinalOp() { return false; }
+=======
bool IsFinalOp() { return false; }
+ void OnComplete(bool success) {}
+>>>>>>> e412a180602753972ac496560322e224a5db987f
protected:
std::string GetTypeString() const { return "cancelled"; }
@@ -453,36 +494,36 @@ tag::~tag() {
delete ops;
}
-Local<Value> GetTagNodeValue(void *tag) {
- EscapableHandleScope scope;
+void CompleteTag(void *tag, const char *error_message) {
+ HandleScope scope;
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
- Local<Object> tag_obj = Nan::New<Object>();
- for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
- it != tag_struct->ops->end(); ++it) {
- Op *op_ptr = it->get();
- Nan::Set(tag_obj, op_ptr->GetOpType(), op_ptr->GetNodeValue());
+ Callback *callback = tag_struct->callback;
+ if (error_message == NULL) {
+ Local<Object> tag_obj = Nan::New<Object>();
+ for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
+ it != tag_struct->ops->end(); ++it) {
+ Op *op_ptr = it->get();
+ Nan::Set(tag_obj, op_ptr->GetOpType(), op_ptr->GetNodeValue());
+ }
+ Local<Value> argv[] = {Nan::Null(), tag_obj};
+ callback->Call(2, argv);
+ } else {
+ Local<Value> argv[] = {Nan::Error(error_message)};
+ callback->Call(1, argv);
}
- return scope.Escape(tag_obj);
-}
-
-Callback *GetTagCallback(void *tag) {
- struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
- return tag_struct->callback;
-}
-
-void CompleteTag(void *tag) {
- struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
+ bool success = (error_message == NULL);
bool is_final_op = false;
- if (tag_struct->call == NULL) {
- return;
- }
for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
it != tag_struct->ops->end(); ++it) {
Op *op_ptr = it->get();
+ op_ptr->OnComplete(success);
if (op_ptr->IsFinalOp()) {
is_final_op = true;
}
}
+ if (tag_struct->call == NULL) {
+ return;
+ }
tag_struct->call->CompleteBatch(is_final_op);
}
diff --git a/src/node/ext/call.h b/src/node/ext/call.h
index 53a5e4ab67..340e32682b 100644
--- a/src/node/ext/call.h
+++ b/src/node/ext/call.h
@@ -106,6 +106,7 @@ class Op {
virtual ~Op();
v8::Local<v8::Value> GetOpType() const;
virtual bool IsFinalOp() = 0;
+ virtual void OnComplete(bool success) = 0;
protected:
virtual std::string GetTypeString() const = 0;
@@ -123,13 +124,9 @@ struct tag {
call_persist;
};
-v8::Local<v8::Value> GetTagNodeValue(void *tag);
-
-Nan::Callback *GetTagCallback(void *tag);
-
void DestroyTag(void *tag);
-void CompleteTag(void *tag);
+void CompleteTag(void *tag, const char *error_message);
} // namespace node
} // namespace grpc
diff --git a/src/node/ext/completion_queue_threadpool.cc b/src/node/ext/completion_queue_threadpool.cc
index 1917074dc2..72df5d1d65 100644
--- a/src/node/ext/completion_queue_threadpool.cc
+++ b/src/node/ext/completion_queue_threadpool.cc
@@ -34,14 +34,14 @@
/* I don't like using #ifndef, but I don't see a better way to do this */
#ifndef GRPC_UV
-#include <node.h>
#include <nan.h>
+#include <node.h>
+#include "call.h"
+#include "completion_queue.h"
#include "grpc/grpc.h"
#include "grpc/support/log.h"
#include "grpc/support/time.h"
-#include "completion_queue.h"
-#include "call.h"
namespace grpc {
namespace node {
@@ -111,8 +111,8 @@ CompletionQueueAsyncWorker::CompletionQueueAsyncWorker()
CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {}
void CompletionQueueAsyncWorker::Execute() {
- result =
- grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ result = grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME),
+ NULL);
if (!result.success) {
SetErrorMessage("The async function encountered an error");
}
@@ -141,16 +141,14 @@ void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
Nan::HandleScope scope;
current_threads = 0;
waiting_next_calls = 0;
- queue = grpc_completion_queue_create(NULL);
+ queue = grpc_completion_queue_create_for_next(NULL);
}
void CompletionQueueAsyncWorker::HandleOKCallback() {
Nan::HandleScope scope;
current_threads -= 1;
TryAddWorker();
- Nan::Callback *callback = GetTagCallback(result.tag);
- Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)};
- callback->Call(2, argv);
+ CompleteTag(result.tag, NULL);
DestroyTag(result.tag);
}
@@ -159,10 +157,7 @@ void CompletionQueueAsyncWorker::HandleErrorCallback() {
Nan::HandleScope scope;
current_threads -= 1;
TryAddWorker();
- Nan::Callback *callback = GetTagCallback(result.tag);
- Local<Value> argv[] = {Nan::Error(ErrorMessage())};
-
- callback->Call(1, argv);
+ CompleteTag(result.tag, ErrorMessage());
DestroyTag(result.tag);
}
@@ -173,9 +168,7 @@ grpc_completion_queue *GetCompletionQueue() {
return CompletionQueueAsyncWorker::GetQueue();
}
-void CompletionQueueNext() {
- CompletionQueueAsyncWorker::Next();
-}
+void CompletionQueueNext() { CompletionQueueAsyncWorker::Next(); }
void CompletionQueueInit(Local<Object> exports) {
CompletionQueueAsyncWorker::Init(exports);
@@ -184,4 +177,4 @@ void CompletionQueueInit(Local<Object> exports) {
} // namespace node
} // namespace grpc
-#endif /* GRPC_UV */
+#endif /* GRPC_UV */
diff --git a/src/node/ext/completion_queue_uv.cc b/src/node/ext/completion_queue_uv.cc
index 615973a6c9..9b60911d1e 100644
--- a/src/node/ext/completion_queue_uv.cc
+++ b/src/node/ext/completion_queue_uv.cc
@@ -33,10 +33,10 @@
#ifdef GRPC_UV
-#include <uv.h>
+#include <grpc/grpc.h>
#include <node.h>
+#include <uv.h>
#include <v8.h>
-#include <grpc/grpc.h>
#include "call.h"
#include "completion_queue.h"
@@ -57,21 +57,17 @@ void drain_completion_queue(uv_prepare_t *handle) {
grpc_event event;
(void)handle;
do {
- event = grpc_completion_queue_next(
- queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL);
+ event = grpc_completion_queue_next(queue, gpr_inf_past(GPR_CLOCK_MONOTONIC),
+ NULL);
if (event.type == GRPC_OP_COMPLETE) {
- Nan::Callback *callback = grpc::node::GetTagCallback(event.tag);
+ const char *error_message;
if (event.success) {
- Local<Value> argv[] = {Nan::Null(),
- grpc::node::GetTagNodeValue(event.tag)};
- callback->Call(2, argv);
+ error_message = NULL;
} else {
- Local<Value> argv[] = {Nan::Error(
- "The async function encountered an error")};
- callback->Call(1, argv);
+ error_message = "The async function encountered an error";
}
- grpc::node::CompleteTag(event.tag);
+ CompleteTag(event.tag, error_message);
grpc::node::DestroyTag(event.tag);
pending_batches--;
if (pending_batches == 0) {
@@ -81,9 +77,7 @@ void drain_completion_queue(uv_prepare_t *handle) {
} while (event.type != GRPC_QUEUE_TIMEOUT);
}
-grpc_completion_queue *GetCompletionQueue() {
- return queue;
-}
+grpc_completion_queue *GetCompletionQueue() { return queue; }
void CompletionQueueNext() {
if (pending_batches == 0) {
@@ -94,7 +88,7 @@ void CompletionQueueNext() {
}
void CompletionQueueInit(Local<Object> exports) {
- queue = grpc_completion_queue_create(NULL);
+ queue = grpc_completion_queue_create_for_next(NULL);
uv_prepare_init(uv_default_loop(), &prepare);
pending_batches = 0;
}
diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc
index f0920c842a..5384305631 100644
--- a/src/node/ext/server.cc
+++ b/src/node/ext/server.cc
@@ -117,6 +117,8 @@ class NewCallOp : public Op {
bool IsFinalOp() {
return false;
}
+ void OnComplete(bool success) {
+ }
grpc_call *call;
grpc_call_details details;
@@ -126,6 +128,34 @@ class NewCallOp : public Op {
std::string GetTypeString() const { return "new_call"; }
};
+class TryShutdownOp: public Op {
+ public:
+ TryShutdownOp(Server *server, Local<Value> server_value) : server(server) {
+ server_persist.Reset(server_value);
+ }
+ Local<Value> GetNodeValue() const {
+ EscapableHandleScope scope;
+ return scope.Escape(Nan::New(server_persist));
+ }
+ bool ParseOp(Local<Value> value, grpc_op *out) {
+ return true;
+ }
+ bool IsFinalOp() {
+ return false;
+ }
+ void OnComplete(bool success) {
+ if (success) {
+ server->DestroyWrappedServer();
+ }
+ }
+ protected:
+ std::string GetTypeString() const { return "try_shutdown"; }
+ private:
+ Server *server;
+ Nan::Persistent<v8::Value, Nan::CopyablePersistentTraits<v8::Value>>
+ server_persist;
+};
+
void Server::Init(Local<Object> exports) {
HandleScope scope;
Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
@@ -147,6 +177,13 @@ bool Server::HasInstance(Local<Value> val) {
return Nan::New(fun_tpl)->HasInstance(val);
}
+void Server::DestroyWrappedServer() {
+ if (this->wrapped_server != NULL) {
+ grpc_server_destroy(this->wrapped_server);
+ this->wrapped_server = NULL;
+ }
+}
+
NAN_METHOD(Server::New) {
/* If this is not a constructor call, make a constructor call and return
the result */
@@ -242,7 +279,15 @@ NAN_METHOD(Server::TryShutdown) {
return Nan::ThrowTypeError("tryShutdown can only be called on a Server");
}
Server *server = ObjectWrap::Unwrap<Server>(info.This());
+ if (server->wrapped_server == NULL) {
+ // Server is already shut down. Call callback immediately.
+ Nan::Callback callback(info[0].As<Function>());
+ callback.Call(0, {});
+ return;
+ }
+ TryShutdownOp *op = new TryShutdownOp(server, info.This());
unique_ptr<OpVec> ops(new OpVec());
+ ops->push_back(unique_ptr<Op>(op));
grpc_server_shutdown_and_notify(
server->wrapped_server, GetCompletionQueue(),
new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(),
diff --git a/src/node/ext/server.h b/src/node/ext/server.h
index ab5fc210e8..c0f2e86554 100644
--- a/src/node/ext/server.h
+++ b/src/node/ext/server.h
@@ -53,6 +53,8 @@ class Server : public Nan::ObjectWrap {
JavaScript constructor */
static bool HasInstance(v8::Local<v8::Value> val);
+ void DestroyWrappedServer();
+
private:
explicit Server(grpc_server *server);
~Server();
diff --git a/src/node/ext/server_generic.cc b/src/node/ext/server_generic.cc
index 0cf20f754a..24573bd52f 100644
--- a/src/node/ext/server_generic.cc
+++ b/src/node/ext/server_generic.cc
@@ -35,8 +35,8 @@
#include "server.h"
-#include <node.h>
#include <nan.h>
+#include <node.h>
#include "grpc/grpc.h"
#include "grpc/support/time.h"
@@ -44,7 +44,7 @@ namespace grpc {
namespace node {
Server::Server(grpc_server *server) : wrapped_server(server) {
- shutdown_queue = grpc_completion_queue_create(NULL);
+ shutdown_queue = grpc_completion_queue_create_for_pluck(NULL);
grpc_server_register_non_listening_completion_queue(server, shutdown_queue,
NULL);
}
diff --git a/src/node/ext/server_uv.cc b/src/node/ext/server_uv.cc
index 82e7589fc8..789938318e 100644
--- a/src/node/ext/server_uv.cc
+++ b/src/node/ext/server_uv.cc
@@ -67,7 +67,7 @@ class ServerShutdownOp : public Op {
}
Local<Value> GetNodeValue() const {
- return Nan::New<External>(reinterpret_cast<void *>(server));
+ return Nan::Null();
}
bool ParseOp(Local<Value> value, grpc_op *out) {
@@ -76,6 +76,11 @@ class ServerShutdownOp : public Op {
bool IsFinalOp() {
return false;
}
+ void OnComplete(bool success) {
+ /* Because cancel_all_calls was called, we assume that shutdown_and_notify
+ completes successfully */
+ grpc_server_destroy(server);
+ }
grpc_server *server;
@@ -94,16 +99,10 @@ NAN_METHOD(ServerShutdownCallback) {
if (!info[0]->IsNull()) {
return Nan::ThrowError("forceShutdown failed somehow");
}
- MaybeLocal<Object> maybe_result = Nan::To<Object>(info[1]);
- Local<Object> result = maybe_result.ToLocalChecked();
- Local<Value> server_val = Nan::Get(
- result, Nan::New("shutdown").ToLocalChecked()).ToLocalChecked();
- Local<External> server_extern = server_val.As<External>();
- grpc_server *server = reinterpret_cast<grpc_server *>(server_extern->Value());
- grpc_server_destroy(server);
}
void Server::ShutdownServer() {
+ Nan::HandleScope scope;
if (this->wrapped_server != NULL) {
if (shutdown_callback == NULL) {
Local<FunctionTemplate>callback_tpl =
diff --git a/src/node/health_check/package.json b/src/node/health_check/package.json
index e218f5a406..37c9b7a54f 100644
--- a/src/node/health_check/package.json
+++ b/src/node/health_check/package.json
@@ -1,6 +1,6 @@
{
"name": "grpc-health-check",
- "version": "1.3.0-dev",
+ "version": "1.4.0-dev",
"author": "Google Inc.",
"description": "Health check service for use with gRPC",
"repository": {
@@ -15,7 +15,7 @@
}
],
"dependencies": {
- "grpc": "^1.3.0-dev",
+ "grpc": "^1.4.0-dev",
"lodash": "^3.9.3",
"google-protobuf": "^3.0.0"
},
diff --git a/src/node/test/test_messages.proto b/src/node/test/test_messages.proto
index ae70f6e152..43c213dabb 100644
--- a/src/node/test/test_messages.proto
+++ b/src/node/test/test_messages.proto
@@ -57,4 +57,4 @@ enum TestEnum {
message EnumValues {
TestEnum enum_value = 1;
-} \ No newline at end of file
+}
diff --git a/src/node/tools/package.json b/src/node/tools/package.json
index 3096c6e42a..a81aa87f4b 100644
--- a/src/node/tools/package.json
+++ b/src/node/tools/package.json
@@ -1,6 +1,6 @@
{
"name": "grpc-tools",
- "version": "1.3.0-dev",
+ "version": "1.4.0-dev",
"author": "Google Inc.",
"description": "Tools for developing with gRPC on Node.js",
"homepage": "http://www.grpc.io/",