Diffstat (limited to 'src/node')
-rw-r--r--  src/node/ext/byte_buffer.cc                       |  14
-rw-r--r--  src/node/ext/call.cc                              |  81
-rw-r--r--  src/node/ext/call.h                               |  68
-rw-r--r--  src/node/ext/channel.cc                           |  12
-rw-r--r--  src/node/ext/completion_queue.cc                  | 114
-rw-r--r--  src/node/ext/completion_queue.h                   |  46
-rw-r--r--  src/node/ext/completion_queue_async_worker.cc     |   2
-rw-r--r--  src/node/ext/node_grpc.cc                         |  23
-rw-r--r--  src/node/ext/server.cc                            |  93
-rw-r--r--  src/node/ext/server.h                             |   1
-rw-r--r--  src/node/index.js                                 |   4
-rw-r--r--  src/node/interop/interop_client.js                |  15
-rw-r--r--  src/node/performance/benchmark_client_express.js  | 291
-rw-r--r--  src/node/performance/benchmark_server.js          |   5
-rw-r--r--  src/node/performance/benchmark_server_express.js  | 109
-rw-r--r--  src/node/performance/worker.js                    |  10
-rw-r--r--  src/node/performance/worker_service_impl.js       | 228
-rw-r--r--  src/node/src/client.js                            |   1
-rw-r--r--  src/node/src/common.js                            |   2
-rw-r--r--  src/node/src/grpc_extension.js                    |   2
-rw-r--r--  src/node/test/async_test.js                       |   1
-rw-r--r--  src/node/test/interop_sanity_test.js              |   4
22 files changed, 937 insertions, 189 deletions
diff --git a/src/node/ext/byte_buffer.cc b/src/node/ext/byte_buffer.cc
index a3f678f32c..ad7d0ec8c8 100644
--- a/src/node/ext/byte_buffer.cc
+++ b/src/node/ext/byte_buffer.cc
@@ -44,8 +44,8 @@
namespace grpc {
namespace node {
+using Nan::MaybeLocal;
-using v8::Context;
using v8::Function;
using v8::Local;
using v8::Object;
@@ -89,15 +89,19 @@ Local<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) {
Local<Value> MakeFastBuffer(Local<Value> slowBuffer) {
Nan::EscapableHandleScope scope;
Local<Object> globalObj = Nan::GetCurrentContext()->Global();
+ MaybeLocal<Value> constructorValue = Nan::Get(
+ globalObj, Nan::New("Buffer").ToLocalChecked());
Local<Function> bufferConstructor = Local<Function>::Cast(
- globalObj->Get(Nan::New("Buffer").ToLocalChecked()));
- Local<Value> consArgs[3] = {
+ constructorValue.ToLocalChecked());
+ const int argc = 3;
+ Local<Value> consArgs[argc] = {
slowBuffer,
Nan::New<Number>(::node::Buffer::Length(slowBuffer)),
Nan::New<Number>(0)
};
- Local<Object> fastBuffer = bufferConstructor->NewInstance(3, consArgs);
- return scope.Escape(fastBuffer);
+ MaybeLocal<Object> fastBuffer = Nan::NewInstance(bufferConstructor,
+ argc, consArgs);
+ return scope.Escape(fastBuffer.ToLocalChecked());
}
} // namespace node
} // namespace grpc
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index 9f023b5883..191e763e0e 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -45,6 +45,7 @@
#include "byte_buffer.h"
#include "call.h"
#include "channel.h"
+#include "completion_queue.h"
#include "completion_queue_async_worker.h"
#include "call_credentials.h"
#include "timeval.h"
@@ -222,6 +223,9 @@ class SendMetadataOp : public Op {
out->data.send_initial_metadata.metadata = array.metadata;
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
return "send_metadata";
@@ -263,6 +267,9 @@ class SendMessageOp : public Op {
resources->handles.push_back(unique_ptr<PersistentValue>(handle));
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
return "send_message";
@@ -281,6 +288,9 @@ class SendClientCloseOp : public Op {
shared_ptr<Resources> resources) {
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
return "client_close";
@@ -341,6 +351,9 @@ class SendServerStatusOp : public Op {
out->data.send_status_from_server.status_details = **str;
return true;
}
+ bool IsFinalOp() {
+ return true;
+ }
protected:
std::string GetTypeString() const {
return "send_status";
@@ -367,6 +380,9 @@ class GetMetadataOp : public Op {
out->data.recv_initial_metadata = &recv_metadata;
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
@@ -397,6 +413,9 @@ class ReadMessageOp : public Op {
out->data.recv_message = &recv_message;
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
@@ -442,6 +461,9 @@ class ClientStatusOp : public Op {
ParseMetadata(&metadata_array));
return scope.Escape(status_obj);
}
+ bool IsFinalOp() {
+ return true;
+ }
protected:
std::string GetTypeString() const {
return "status";
@@ -465,6 +487,9 @@ class ServerCloseResponseOp : public Op {
out->data.recv_close_on_server.cancelled = &cancelled;
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
@@ -476,8 +501,8 @@ class ServerCloseResponseOp : public Op {
};
tag::tag(Callback *callback, OpVec *ops,
- shared_ptr<Resources> resources) :
- callback(callback), ops(ops), resources(resources){
+ shared_ptr<Resources> resources, Call *call) :
+ callback(callback), ops(ops), resources(resources), call(call){
}
tag::~tag() {
@@ -502,16 +527,36 @@ Callback *GetTagCallback(void *tag) {
return tag_struct->callback;
}
+void CompleteTag(void *tag) {
+ struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
+ bool is_final_op = false;
+ if (tag_struct->call == NULL) {
+ return;
+ }
+ for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
+ it != tag_struct->ops->end(); ++it) {
+ Op *op_ptr = it->get();
+ if (op_ptr->IsFinalOp()) {
+ is_final_op = true;
+ }
+ }
+ tag_struct->call->CompleteBatch(is_final_op);
+}
+
void DestroyTag(void *tag) {
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
delete tag_struct;
}
-Call::Call(grpc_call *call) : wrapped_call(call) {
+Call::Call(grpc_call *call) : wrapped_call(call),
+ pending_batches(0),
+ has_final_op_completed(false) {
}
Call::~Call() {
- grpc_call_destroy(wrapped_call);
+ if (wrapped_call != NULL) {
+ grpc_call_destroy(wrapped_call);
+ }
}
void Call::Init(Local<Object> exports) {
@@ -552,6 +597,17 @@ Local<Value> Call::WrapStruct(grpc_call *call) {
}
}
+void Call::CompleteBatch(bool is_final_op) {
+ if (is_final_op) {
+ this->has_final_op_completed = true;
+ }
+ this->pending_batches--;
+ if (this->has_final_op_completed && this->pending_batches == 0) {
+ grpc_call_destroy(this->wrapped_call);
+ this->wrapped_call = NULL;
+ }
+}
+
NAN_METHOD(Call::New) {
if (info.IsConstructCall()) {
Call *call;
@@ -602,27 +658,27 @@ NAN_METHOD(Call::New) {
Utf8String host_override(info[3]);
wrapped_call = grpc_channel_create_call(
wrapped_channel, parent_call, propagate_flags,
- CompletionQueueAsyncWorker::GetQueue(), *method,
+ GetCompletionQueue(), *method,
*host_override, MillisecondsToTimespec(deadline), NULL);
} else if (info[3]->IsUndefined() || info[3]->IsNull()) {
wrapped_call = grpc_channel_create_call(
wrapped_channel, parent_call, propagate_flags,
- CompletionQueueAsyncWorker::GetQueue(), *method,
+ GetCompletionQueue(), *method,
NULL, MillisecondsToTimespec(deadline), NULL);
} else {
return Nan::ThrowTypeError("Call's fourth argument must be a string");
}
call = new Call(wrapped_call);
- info.This()->SetHiddenValue(Nan::New("channel_").ToLocalChecked(),
- channel_object);
+ Nan::Set(info.This(), Nan::New("channel_").ToLocalChecked(),
+ channel_object);
}
call->Wrap(info.This());
info.GetReturnValue().Set(info.This());
} else {
const int argc = 4;
Local<Value> argv[argc] = {info[0], info[1], info[2], info[3]};
- MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance(
- argc, argv);
+ MaybeLocal<Object> maybe_instance = Nan::NewInstance(
+ constructor->GetFunction(), argc, argv);
if (maybe_instance.IsEmpty()) {
// There's probably a pending exception
return;
@@ -697,11 +753,12 @@ NAN_METHOD(Call::StartBatch) {
Callback *callback = new Callback(callback_func);
grpc_call_error error = grpc_call_start_batch(
call->wrapped_call, &ops[0], nops, new struct tag(
- callback, op_vector.release(), resources), NULL);
+ callback, op_vector.release(), resources, call), NULL);
if (error != GRPC_CALL_OK) {
return Nan::ThrowError(nanErrorWithCode("startBatch failed", error));
}
- CompletionQueueAsyncWorker::Next();
+ call->pending_batches++;
+ CompletionQueueNext();
}
NAN_METHOD(Call::Cancel) {
diff --git a/src/node/ext/call.h b/src/node/ext/call.h
index 1e3c3ba18d..31c6566d14 100644
--- a/src/node/ext/call.h
+++ b/src/node/ext/call.h
@@ -66,34 +66,6 @@ bool CreateMetadataArray(v8::Local<v8::Object> metadata,
grpc_metadata_array *array,
shared_ptr<Resources> resources);
-class Op {
- public:
- virtual v8::Local<v8::Value> GetNodeValue() const = 0;
- virtual bool ParseOp(v8::Local<v8::Value> value, grpc_op *out,
- shared_ptr<Resources> resources) = 0;
- virtual ~Op();
- v8::Local<v8::Value> GetOpType() const;
-
- protected:
- virtual std::string GetTypeString() const = 0;
-};
-
-typedef std::vector<unique_ptr<Op>> OpVec;
-struct tag {
- tag(Nan::Callback *callback, OpVec *ops,
- shared_ptr<Resources> resources);
- ~tag();
- Nan::Callback *callback;
- OpVec *ops;
- shared_ptr<Resources> resources;
-};
-
-v8::Local<v8::Value> GetTagNodeValue(void *tag);
-
-Nan::Callback *GetTagCallback(void *tag);
-
-void DestroyTag(void *tag);
-
/* Wrapper class for grpc_call structs. */
class Call : public Nan::ObjectWrap {
public:
@@ -102,6 +74,8 @@ class Call : public Nan::ObjectWrap {
/* Wrap a grpc_call struct in a javascript object */
static v8::Local<v8::Value> WrapStruct(grpc_call *call);
+ void CompleteBatch(bool is_final_op);
+
private:
explicit Call(grpc_call *call);
~Call();
@@ -121,8 +95,46 @@ class Call : public Nan::ObjectWrap {
static Nan::Persistent<v8::FunctionTemplate> fun_tpl;
grpc_call *wrapped_call;
+ // The number of ops that were started but not completed on this call
+ int pending_batches;
+ /* Indicates whether the "final" op on a call has completed. For a client
+ call, this is GRPC_OP_RECV_STATUS_ON_CLIENT and for a server call, this
+ is GRPC_OP_SEND_STATUS_FROM_SERVER */
+ bool has_final_op_completed;
};
+class Op {
+ public:
+ virtual v8::Local<v8::Value> GetNodeValue() const = 0;
+ virtual bool ParseOp(v8::Local<v8::Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) = 0;
+ virtual ~Op();
+ v8::Local<v8::Value> GetOpType() const;
+ virtual bool IsFinalOp() = 0;
+
+ protected:
+ virtual std::string GetTypeString() const = 0;
+};
+
+typedef std::vector<unique_ptr<Op>> OpVec;
+struct tag {
+ tag(Nan::Callback *callback, OpVec *ops,
+ shared_ptr<Resources> resources, Call *call);
+ ~tag();
+ Nan::Callback *callback;
+ OpVec *ops;
+ shared_ptr<Resources> resources;
+ Call *call;
+};
+
+v8::Local<v8::Value> GetTagNodeValue(void *tag);
+
+Nan::Callback *GetTagCallback(void *tag);
+
+void DestroyTag(void *tag);
+
+void CompleteTag(void *tag);
+
} // namespace node
} // namespace grpc
diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc
index 00fcca6dc8..5bc58b9b32 100644
--- a/src/node/ext/channel.cc
+++ b/src/node/ext/channel.cc
@@ -41,6 +41,7 @@
#include "grpc/grpc_security.h"
#include "call.h"
#include "channel.h"
+#include "completion_queue.h"
#include "completion_queue_async_worker.h"
#include "channel_credentials.h"
#include "timeval.h"
@@ -140,6 +141,7 @@ void DeallocateChannelArgs(grpc_channel_args *channel_args) {
Channel::Channel(grpc_channel *channel) : wrapped_channel(channel) {}
Channel::~Channel() {
+ gpr_log(GPR_DEBUG, "Destroying channel");
if (wrapped_channel != NULL) {
grpc_channel_destroy(wrapped_channel);
}
@@ -206,8 +208,8 @@ NAN_METHOD(Channel::New) {
} else {
const int argc = 3;
Local<Value> argv[argc] = {info[0], info[1], info[2]};
- MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance(
- argc, argv);
+ MaybeLocal<Object> maybe_instance = Nan::NewInstance(
+ constructor->GetFunction(), argc, argv);
if (maybe_instance.IsEmpty()) {
// There's probably a pending exception
return;
@@ -276,11 +278,11 @@ NAN_METHOD(Channel::WatchConnectivityState) {
unique_ptr<OpVec> ops(new OpVec());
grpc_channel_watch_connectivity_state(
channel->wrapped_channel, last_state, MillisecondsToTimespec(deadline),
- CompletionQueueAsyncWorker::GetQueue(),
+ GetCompletionQueue(),
new struct tag(callback,
ops.release(),
- shared_ptr<Resources>(nullptr)));
- CompletionQueueAsyncWorker::Next();
+ shared_ptr<Resources>(nullptr), NULL));
+ CompletionQueueNext();
}
} // namespace node
diff --git a/src/node/ext/completion_queue.cc b/src/node/ext/completion_queue.cc
new file mode 100644
index 0000000000..fcfa77b39c
--- /dev/null
+++ b/src/node/ext/completion_queue.cc
@@ -0,0 +1,114 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <uv.h>
+#include <node.h>
+#include <v8.h>
+#include <grpc/grpc.h>
+
+#include "call.h"
+#include "completion_queue.h"
+#include "completion_queue_async_worker.h"
+
+namespace grpc {
+namespace node {
+
+using v8::Local;
+using v8::Object;
+using v8::Value;
+
+grpc_completion_queue *queue;
+uv_prepare_t prepare;
+int pending_batches;
+
+void drain_completion_queue(uv_prepare_t *handle) {
+ Nan::HandleScope scope;
+ grpc_event event;
+ (void)handle;
+ do {
+ event = grpc_completion_queue_next(
+ queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL);
+
+ if (event.type == GRPC_OP_COMPLETE) {
+ Nan::Callback *callback = grpc::node::GetTagCallback(event.tag);
+ if (event.success) {
+ Local<Value> argv[] = {Nan::Null(),
+ grpc::node::GetTagNodeValue(event.tag)};
+ callback->Call(2, argv);
+ } else {
+ Local<Value> argv[] = {Nan::Error(
+ "The async function encountered an error")};
+ callback->Call(1, argv);
+ }
+ grpc::node::CompleteTag(event.tag);
+ grpc::node::DestroyTag(event.tag);
+ pending_batches--;
+ if (pending_batches == 0) {
+ uv_prepare_stop(&prepare);
+ }
+ }
+ } while (event.type != GRPC_QUEUE_TIMEOUT);
+}
+
+grpc_completion_queue *GetCompletionQueue() {
+#ifdef GRPC_UV
+ return queue;
+#else
+ return CompletionQueueAsyncWorker::GetQueue();
+#endif
+}
+
+void CompletionQueueNext() {
+#ifdef GRPC_UV
+ if (pending_batches == 0) {
+ GPR_ASSERT(!uv_is_active((uv_handle_t *)&prepare));
+ uv_prepare_start(&prepare, drain_completion_queue);
+ }
+ pending_batches++;
+#else
+ CompletionQueueAsyncWorker::Next();
+#endif
+}
+
+void CompletionQueueInit(Local<Object> exports) {
+#ifdef GRPC_UV
+ queue = grpc_completion_queue_create(NULL);
+ uv_prepare_init(uv_default_loop(), &prepare);
+ pending_batches = 0;
+#else
+ CompletionQueueAsyncWorker::Init(exports);
+#endif
+}
+
+} // namespace node
+} // namespace grpc
diff --git a/src/node/ext/completion_queue.h b/src/node/ext/completion_queue.h
new file mode 100644
index 0000000000..bf280f768b
--- /dev/null
+++ b/src/node/ext/completion_queue.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <v8.h>
+
+namespace grpc {
+namespace node {
+
+grpc_completion_queue *GetCompletionQueue();
+
+void CompletionQueueNext();
+
+void CompletionQueueInit(v8::Local<v8::Object> exports);
+
+} // namespace node
+} // namespace grpc
diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc
index 619ea41515..f5e03b277b 100644
--- a/src/node/ext/completion_queue_async_worker.cc
+++ b/src/node/ext/completion_queue_async_worker.cc
@@ -74,6 +74,7 @@ void CompletionQueueAsyncWorker::Execute() {
grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; }
void CompletionQueueAsyncWorker::Next() {
+#ifndef GRPC_UV
Nan::HandleScope scope;
if (current_threads < max_queue_threads) {
current_threads += 1;
@@ -85,6 +86,7 @@ void CompletionQueueAsyncWorker::Next() {
GPR_ASSERT(current_threads <= max_queue_threads);
GPR_ASSERT((current_threads == max_queue_threads) ||
(waiting_next_calls == 0));
+#endif
}
void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc
index 745b5023d5..9b9eee85b7 100644
--- a/src/node/ext/node_grpc.cc
+++ b/src/node/ext/node_grpc.cc
@@ -42,6 +42,13 @@
#include "grpc/support/log.h"
#include "grpc/support/time.h"
+// TODO(murgatroid99): Remove this when the endpoint API becomes public
+#ifdef GRPC_UV
+extern "C" {
+#include "src/core/lib/iomgr/pollset_uv.h"
+}
+#endif
+
#include "call.h"
#include "call_credentials.h"
#include "channel.h"
@@ -50,6 +57,7 @@
#include "completion_queue_async_worker.h"
#include "server_credentials.h"
#include "timeval.h"
+#include "completion_queue.h"
using v8::FunctionTemplate;
using v8::Local;
@@ -261,10 +269,10 @@ void InitLogConstants(Local<Object> exports) {
Nan::HandleScope scope;
Local<Object> log_verbosity = Nan::New<Object>();
Nan::Set(exports, Nan::New("logVerbosity").ToLocalChecked(), log_verbosity);
- Local<Value> DEBUG(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_DEBUG));
- Nan::Set(log_verbosity, Nan::New("DEBUG").ToLocalChecked(), DEBUG);
- Local<Value> INFO(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_INFO));
- Nan::Set(log_verbosity, Nan::New("INFO").ToLocalChecked(), INFO);
+ Local<Value> LOG_DEBUG(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_DEBUG));
+ Nan::Set(log_verbosity, Nan::New("DEBUG").ToLocalChecked(), LOG_DEBUG);
+ Local<Value> LOG_INFO(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_INFO));
+ Nan::Set(log_verbosity, Nan::New("INFO").ToLocalChecked(), LOG_INFO);
Local<Value> LOG_ERROR(Nan::New<Uint32, uint32_t>(GPR_LOG_SEVERITY_ERROR));
Nan::Set(log_verbosity, Nan::New("ERROR").ToLocalChecked(), LOG_ERROR);
}
@@ -428,14 +436,19 @@ void init(Local<Object> exports) {
InitWriteFlags(exports);
InitLogConstants(exports);
+#ifdef GRPC_UV
+ grpc_pollset_work_run_loop = 0;
+#endif
+
grpc::node::Call::Init(exports);
grpc::node::CallCredentials::Init(exports);
grpc::node::Channel::Init(exports);
grpc::node::ChannelCredentials::Init(exports);
grpc::node::Server::Init(exports);
- grpc::node::CompletionQueueAsyncWorker::Init(exports);
grpc::node::ServerCredentials::Init(exports);
+ grpc::node::CompletionQueueInit(exports);
+
// Attach a few utility functions directly to the module
Nan::Set(exports, Nan::New("metadataKeyIsLegal").ToLocalChecked(),
Nan::GetFunction(
diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc
index dd1b777ac8..70d5b96f39 100644
--- a/src/node/ext/server.cc
+++ b/src/node/ext/server.cc
@@ -40,6 +40,7 @@
#include <vector>
#include "call.h"
+#include "completion_queue.h"
#include "completion_queue_async_worker.h"
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
@@ -64,6 +65,7 @@ using v8::Array;
using v8::Boolean;
using v8::Date;
using v8::Exception;
+using v8::External;
using v8::Function;
using v8::FunctionTemplate;
using v8::Local;
@@ -75,6 +77,8 @@ using v8::Value;
Nan::Callback *Server::constructor;
Persistent<FunctionTemplate> Server::fun_tpl;
+static Callback *shutdown_callback;
+
class NewCallOp : public Op {
public:
NewCallOp() {
@@ -111,6 +115,9 @@ class NewCallOp : public Op {
shared_ptr<Resources> resources) {
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
grpc_call *call;
grpc_call_details details;
@@ -120,17 +127,50 @@ class NewCallOp : public Op {
std::string GetTypeString() const { return "new_call"; }
};
+class ServerShutdownOp : public Op {
+ public:
+ ServerShutdownOp(grpc_server *server): server(server) {
+ }
+
+ ~ServerShutdownOp() {
+ }
+
+ Local<Value> GetNodeValue() const {
+ return Nan::New<External>(reinterpret_cast<void *>(server));
+ }
+
+ bool ParseOp(Local<Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) {
+ return true;
+ }
+ bool IsFinalOp() {
+ return false;
+ }
+
+ grpc_server *server;
+
+ protected:
+ std::string GetTypeString() const { return "shutdown"; }
+};
+
+NAN_METHOD(ServerShutdownCallback) {
+ if (!info[0]->IsNull()) {
+ return Nan::ThrowError("forceShutdown failed somehow");
+ }
+ MaybeLocal<Object> maybe_result = Nan::To<Object>(info[1]);
+ Local<Object> result = maybe_result.ToLocalChecked();
+ Local<Value> server_val = Nan::Get(
+ result, Nan::New("shutdown").ToLocalChecked()).ToLocalChecked();
+ Local<External> server_extern = server_val.As<External>();
+ grpc_server *server = reinterpret_cast<grpc_server *>(server_extern->Value());
+ grpc_server_destroy(server);
+}
+
Server::Server(grpc_server *server) : wrapped_server(server) {
- shutdown_queue = grpc_completion_queue_create(NULL);
- grpc_server_register_non_listening_completion_queue(server, shutdown_queue,
- NULL);
}
Server::~Server() {
this->ShutdownServer();
- grpc_completion_queue_shutdown(this->shutdown_queue);
- grpc_server_destroy(this->wrapped_server);
- grpc_completion_queue_destroy(this->shutdown_queue);
}
void Server::Init(Local<Object> exports) {
@@ -147,6 +187,11 @@ void Server::Init(Local<Object> exports) {
Local<Function> ctr = Nan::GetFunction(tpl).ToLocalChecked();
Nan::Set(exports, Nan::New("Server").ToLocalChecked(), ctr);
constructor = new Callback(ctr);
+
+ Local<FunctionTemplate> callback_tpl =
+ Nan::New<FunctionTemplate>(ServerShutdownCallback);
+ shutdown_callback = new Callback(
+ Nan::GetFunction(callback_tpl).ToLocalChecked());
}
bool Server::HasInstance(Local<Value> val) {
@@ -155,11 +200,19 @@ bool Server::HasInstance(Local<Value> val) {
}
void Server::ShutdownServer() {
- grpc_server_shutdown_and_notify(this->wrapped_server, this->shutdown_queue,
- NULL);
- grpc_server_cancel_all_calls(this->wrapped_server);
- grpc_completion_queue_pluck(this->shutdown_queue, NULL,
- gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ if (this->wrapped_server != NULL) {
+ ServerShutdownOp *op = new ServerShutdownOp(this->wrapped_server);
+ unique_ptr<OpVec> ops(new OpVec());
+ ops->push_back(unique_ptr<Op>(op));
+
+ grpc_server_shutdown_and_notify(
+ this->wrapped_server, GetCompletionQueue(),
+ new struct tag(new Callback(**shutdown_callback), ops.release(),
+ shared_ptr<Resources>(nullptr), NULL));
+ grpc_server_cancel_all_calls(this->wrapped_server);
+ CompletionQueueNext();
+ this->wrapped_server = NULL;
+ }
}
NAN_METHOD(Server::New) {
@@ -169,7 +222,7 @@ NAN_METHOD(Server::New) {
const int argc = 1;
Local<Value> argv[argc] = {info[0]};
MaybeLocal<Object> maybe_instance =
- constructor->GetFunction()->NewInstance(argc, argv);
+ Nan::NewInstance(constructor->GetFunction(), argc, argv);
if (maybe_instance.IsEmpty()) {
// There's probably a pending exception
return;
@@ -179,7 +232,7 @@ NAN_METHOD(Server::New) {
}
}
grpc_server *wrapped_server;
- grpc_completion_queue *queue = CompletionQueueAsyncWorker::GetQueue();
+ grpc_completion_queue *queue = GetCompletionQueue();
grpc_channel_args *channel_args;
if (!ParseChannelArgs(info[0], &channel_args)) {
DeallocateChannelArgs(channel_args);
@@ -205,14 +258,14 @@ NAN_METHOD(Server::RequestCall) {
ops->push_back(unique_ptr<Op>(op));
grpc_call_error error = grpc_server_request_call(
server->wrapped_server, &op->call, &op->details, &op->request_metadata,
- CompletionQueueAsyncWorker::GetQueue(),
- CompletionQueueAsyncWorker::GetQueue(),
+ GetCompletionQueue(),
+ GetCompletionQueue(),
new struct tag(new Callback(info[0].As<Function>()), ops.release(),
- shared_ptr<Resources>(nullptr)));
+ shared_ptr<Resources>(nullptr), NULL));
if (error != GRPC_CALL_OK) {
return Nan::ThrowError(nanErrorWithCode("requestCall failed", error));
}
- CompletionQueueAsyncWorker::Next();
+ CompletionQueueNext();
}
NAN_METHOD(Server::AddHttp2Port) {
@@ -259,10 +312,10 @@ NAN_METHOD(Server::TryShutdown) {
Server *server = ObjectWrap::Unwrap<Server>(info.This());
unique_ptr<OpVec> ops(new OpVec());
grpc_server_shutdown_and_notify(
- server->wrapped_server, CompletionQueueAsyncWorker::GetQueue(),
+ server->wrapped_server, GetCompletionQueue(),
new struct tag(new Nan::Callback(info[0].As<Function>()), ops.release(),
- shared_ptr<Resources>(nullptr)));
- CompletionQueueAsyncWorker::Next();
+ shared_ptr<Resources>(nullptr), NULL));
+ CompletionQueueNext();
}
NAN_METHOD(Server::ForceShutdown) {
diff --git a/src/node/ext/server.h b/src/node/ext/server.h
index ab5fc210e8..9e6a7bd1e0 100644
--- a/src/node/ext/server.h
+++ b/src/node/ext/server.h
@@ -73,7 +73,6 @@ class Server : public Nan::ObjectWrap {
static Nan::Persistent<v8::FunctionTemplate> fun_tpl;
grpc_server *wrapped_server;
- grpc_completion_queue *shutdown_queue;
};
} // namespace node
diff --git a/src/node/index.js b/src/node/index.js
index 9fb6faa5d7..a294aad8ee 100644
--- a/src/node/index.js
+++ b/src/node/index.js
@@ -219,3 +219,7 @@ exports.getClientChannel = client.getClientChannel;
* @see module:src/client.waitForClientReady
*/
exports.waitForClientReady = client.waitForClientReady;
+
+exports.closeClient = function closeClient(client_obj) {
+ client.getClientChannel(client_obj).close();
+};
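
A minimal usage sketch of the new closeClient helper (not part of this diff); the proto path, service name, and address are placeholders:

    var grpc = require('grpc');

    // Hypothetical generated client; any client produced by grpc.load works.
    var math_proto = grpc.load(__dirname + '/math.proto').math;
    var math_client = new math_proto.Math('localhost:50051',
                                          grpc.credentials.createInsecure());

    // ... issue RPCs ...

    // Closing the client closes its underlying channel (via getClientChannel),
    // letting the process exit instead of hanging on an open channel.
    grpc.closeClient(math_client);
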
diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js
index e8f2d37bd8..46ddecfb1f 100644
--- a/src/node/interop/interop_client.js
+++ b/src/node/interop/interop_client.js
@@ -375,11 +375,20 @@ function statusCodeAndMessage(client, done) {
duplex.end();
}
+// NOTE: the client param to this function is from UnimplementedService
+function unimplementedService(client, done) {
+ client.unimplementedCall({}, function(err, resp) {
+ assert(err);
+ assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED);
+ done();
+ });
+}
+
+// NOTE: the client param to this function is from TestService
function unimplementedMethod(client, done) {
client.unimplementedCall({}, function(err, resp) {
assert(err);
assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED);
- assert(!err.message);
done();
});
}
@@ -527,8 +536,10 @@ var test_cases = {
Client: testProto.TestService},
status_code_and_message: {run: statusCodeAndMessage,
Client: testProto.TestService},
- unimplemented_method: {run: unimplementedMethod,
+ unimplemented_service: {run: unimplementedService,
Client: testProto.UnimplementedService},
+ unimplemented_method: {run: unimplementedMethod,
+ Client: testProto.TestService},
compute_engine_creds: {run: computeEngineCreds,
Client: testProto.TestService,
getCreds: getApplicationCreds},
diff --git a/src/node/performance/benchmark_client_express.js b/src/node/performance/benchmark_client_express.js
new file mode 100644
index 0000000000..675eb5f288
--- /dev/null
+++ b/src/node/performance/benchmark_client_express.js
@@ -0,0 +1,291 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * Benchmark client module
+ * @module
+ */
+
+'use strict';
+
+var fs = require('fs');
+var path = require('path');
+var util = require('util');
+var EventEmitter = require('events');
+var http = require('http');
+var https = require('https');
+
+var async = require('async');
+var _ = require('lodash');
+var PoissonProcess = require('poisson-process');
+var Histogram = require('./histogram');
+
+/**
+ * Convert a time difference, as returned by process.hrtime, to a number of
+ * nanoseconds.
+ * @param {Array.<number>} time_diff The time diff, represented as
+ * [seconds, nanoseconds]
+ * @return {number} The total number of nanoseconds
+ */
+function timeDiffToNanos(time_diff) {
+ return time_diff[0] * 1e9 + time_diff[1];
+}
+
+function BenchmarkClient(server_targets, channels, histogram_params,
+ security_params) {
+ var options = {
+ method: 'PUT',
+ headers: {
+ 'Content-Type': 'application/json'
+ }
+ };
+ var protocol;
+ if (security_params) {
+ var ca_path;
+ protocol = https;
+ this.request = _.bind(https.request, https);
+ if (security_params.use_test_ca) {
+ ca_path = path.join(__dirname, '../test/data/ca.pem');
+ var ca_data = fs.readFileSync(ca_path);
+ options.ca = ca_data;
+ }
+ if (security_params.server_host_override) {
+ var host_override = security_params.server_host_override;
+ options.servername = host_override;
+ }
+ } else {
+ protocol = http;
+ }
+
+ this.request = _.bind(protocol.request, protocol);
+
+ this.client_options = [];
+
+ for (var i = 0; i < channels; i++) {
+ var host_port;
+ host_port = server_targets[i % server_targets.length].split(':')
+ var new_options = _.assign({hostname: host_port[0], port: +host_port[1]}, options);
+ new_options.agent = new protocol.Agent(new_options);
+ this.client_options[i] = new_options;
+ }
+
+ this.histogram = new Histogram(histogram_params.resolution,
+ histogram_params.max_possible);
+
+ this.running = false;
+
+ this.pending_calls = 0;
+}
+
+util.inherits(BenchmarkClient, EventEmitter);
+
+function startAllClients(client_options_list, outstanding_rpcs_per_channel,
+ makeCall, emitter) {
+ _.each(client_options_list, function(client_options) {
+ _.times(outstanding_rpcs_per_channel, function() {
+ makeCall(client_options);
+ });
+ });
+}
+
+BenchmarkClient.prototype.startClosedLoop = function(
+ outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, generic) {
+ var self = this;
+
+ var options = {};
+
+ self.running = true;
+
+ if (rpc_type == 'UNARY') {
+ options.path = '/serviceProto.BenchmarkService.service/unaryCall';
+ } else {
+ self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
+ }
+
+ if (generic) {
+ self.emit('error', new Error('Generic client not supported'));
+ }
+
+ self.last_wall_time = process.hrtime();
+
+ var argument = {
+ response_size: resp_size,
+ payload: {
+ body: '0'.repeat(req_size)
+ }
+ };
+
+ function makeCall(client_options) {
+ if (self.running) {
+ self.pending_calls++;
+ var start_time = process.hrtime();
+ var req = self.request(client_options, function(res) {
+ var res_data = '';
+ res.on('data', function(data) {
+ res_data += data;
+ });
+ res.on('end', function() {
+ JSON.parse(res_data);
+ var time_diff = process.hrtime(start_time);
+ self.histogram.add(timeDiffToNanos(time_diff));
+ makeCall(client_options);
+ self.pending_calls--;
+ if ((!self.running) && self.pending_calls == 0) {
+ self.emit('finished');
+ }
+ });
+ });
+ req.write(JSON.stringify(argument));
+ req.end();
+ req.on('error', function(error) {
+ self.emit('error', new Error('Client error: ' + error.message));
+ self.running = false;
+ });
+ }
+ }
+
+ startAllClients(_.map(self.client_options, _.partial(_.assign, options)),
+ outstanding_rpcs_per_channel, makeCall, self);
+};
+
+BenchmarkClient.prototype.startPoisson = function(
+ outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load,
+ generic) {
+ var self = this;
+
+ var options = {};
+
+ self.running = true;
+
+ if (rpc_type == 'UNARY') {
+ options.path = '/serviceProto.BenchmarkService.service/unaryCall';
+ } else {
+ self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
+ }
+
+ if (generic) {
+ self.emit('error', new Error('Generic client not supported'));
+ }
+
+ self.last_wall_time = process.hrtime();
+
+ var argument = {
+ response_size: resp_size,
+ payload: {
+ body: '0'.repeat(req_size)
+ }
+ };
+
+ function makeCall(client_options, poisson) {
+ if (self.running) {
+ self.pending_calls++;
+ var start_time = process.hrtime();
+ var req = self.request(client_options, function(res) {
+ var res_data = '';
+ res.on('data', function(data) {
+ res_data += data;
+ });
+ res.on('end', function() {
+ JSON.parse(res_data);
+ var time_diff = process.hrtime(start_time);
+ self.histogram.add(timeDiffToNanos(time_diff));
+ self.pending_calls--;
+ if ((!self.running) && self.pending_calls == 0) {
+ self.emit('finished');
+ }
+ });
+ });
+ req.write(JSON.stringify(argument));
+ req.end();
+ req.on('error', function(error) {
+ self.emit('error', new Error('Client error: ' + error.message));
+ self.running = false;
+ });
+ } else {
+ poisson.stop();
+ }
+ }
+
+ var averageIntervalMs = (1 / offered_load) * 1000;
+
+ startAllClients(_.map(self.client_options, _.partial(_.assign, options)),
+ outstanding_rpcs_per_channel, function(opts){
+ var p = PoissonProcess.create(averageIntervalMs, function() {
+ makeCall(opts, p);
+ });
+ p.start();
+ }, self);
+};
+
+/**
+ * Return current statistics for the client. If reset is set, restart
+ * statistics collection.
+ * @param {boolean} reset Indicates that statistics should be reset
+ * @return {object} Client statistics
+ */
+BenchmarkClient.prototype.mark = function(reset) {
+ var wall_time_diff = process.hrtime(this.last_wall_time);
+ var histogram = this.histogram;
+ if (reset) {
+ this.last_wall_time = process.hrtime();
+ this.histogram = new Histogram(histogram.resolution,
+ histogram.max_possible);
+ }
+
+ return {
+ latencies: {
+ bucket: histogram.getContents(),
+ min_seen: histogram.minimum(),
+ max_seen: histogram.maximum(),
+ sum: histogram.getSum(),
+ sum_of_squares: histogram.sumOfSquares(),
+ count: histogram.getCount()
+ },
+ time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9,
+ // Not sure how to measure these values
+ time_user: 0,
+ time_system: 0
+ };
+};
+
+/**
+ * Stop the clients.
+ * @param {function} callback Called when the clients have finished shutting
+ * down
+ */
+BenchmarkClient.prototype.stop = function(callback) {
+ this.running = false;
+ this.on('finished', callback);
+};
+
+module.exports = BenchmarkClient;
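
A minimal driver sketch for the express-based benchmark client (not part of this diff); the target address, histogram parameters, payload sizes, and timeout are illustrative:

    var BenchmarkClient = require('./benchmark_client_express');

    // One HTTP channel to a local benchmark server; null security_params means
    // plain HTTP. Histogram resolution / max_possible values are illustrative.
    var client = new BenchmarkClient(['localhost:8080'], 1,
                                     {resolution: 10, max_possible: 60e9},
                                     null);
    client.on('error', function(err) { console.error(err); });

    // Closed-loop unary load: 10 outstanding RPCs per channel, 100-byte
    // request and response payloads, non-generic (JSON) requests.
    client.startClosedLoop(10, 'UNARY', 100, 100, false);

    setTimeout(function() {
      console.log(client.mark());         // latency histogram + elapsed time
      client.stop(function() { console.log('done'); });
    }, 10000);
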
diff --git a/src/node/performance/benchmark_server.js b/src/node/performance/benchmark_server.js
index 70cee9979b..6abde2e17a 100644
--- a/src/node/performance/benchmark_server.js
+++ b/src/node/performance/benchmark_server.js
@@ -40,6 +40,8 @@
var fs = require('fs');
var path = require('path');
+var EventEmitter = require('events');
+var util = require('util');
var genericService = require('./generic_service');
@@ -138,12 +140,15 @@ function BenchmarkServer(host, port, tls, generic, response_size) {
this.server = server;
}
+util.inherits(BenchmarkServer, EventEmitter);
+
/**
* Start the benchmark server.
*/
BenchmarkServer.prototype.start = function() {
this.server.start();
this.last_wall_time = process.hrtime();
+ this.emit('started');
};
/**
diff --git a/src/node/performance/benchmark_server_express.js b/src/node/performance/benchmark_server_express.js
new file mode 100644
index 0000000000..065bcf660b
--- /dev/null
+++ b/src/node/performance/benchmark_server_express.js
@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * Benchmark server module
+ * @module
+ */
+
+'use strict';
+
+var fs = require('fs');
+var path = require('path');
+var http = require('http');
+var https = require('https');
+var EventEmitter = require('events');
+var util = require('util');
+
+var express = require('express');
+var bodyParser = require('body-parser')
+
+function unaryCall(req, res) {
+ var reqObj = req.body;
+ var payload = {body: '0'.repeat(reqObj.response_size)};
+ res.json(payload);
+}
+
+function BenchmarkServer(host, port, tls, generic, response_size) {
+ var app = express();
+ app.use(bodyParser.json())
+ app.put('/serviceProto.BenchmarkService.service/unaryCall', unaryCall);
+ this.input_host = host;
+ this.input_port = port;
+ if (tls) {
+ var credentials = {};
+ var key_path = path.join(__dirname, '../test/data/server1.key');
+ var pem_path = path.join(__dirname, '../test/data/server1.pem');
+
+ var key_data = fs.readFileSync(key_path);
+ var pem_data = fs.readFileSync(pem_path);
+ credentials['key'] = key_data;
+ credentials['cert'] = pem_data;
+ this.server = https.createServer(credentials, app);
+ } else {
+ this.server = http.createServer(app);
+ }
+}
+
+util.inherits(BenchmarkServer, EventEmitter);
+
+BenchmarkServer.prototype.start = function() {
+ var self = this;
+ this.server.listen(this.input_port, this.input_host, function() {
+ self.last_wall_time = process.hrtime();
+ self.emit('started');
+ });
+};
+
+BenchmarkServer.prototype.getPort = function() {
+ return this.server.address().port;
+};
+
+BenchmarkServer.prototype.mark = function(reset) {
+ var wall_time_diff = process.hrtime(this.last_wall_time);
+ if (reset) {
+ this.last_wall_time = process.hrtime();
+ }
+ return {
+ time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9,
+ // Not sure how to measure these values
+ time_user: 0,
+ time_system: 0
+ };
+};
+
+BenchmarkServer.prototype.stop = function(callback) {
+ this.server.close(callback);
+};
+
+module.exports = BenchmarkServer;
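
A minimal sketch of running the express-based benchmark server standalone (not part of this diff); port 0 requests an ephemeral port from the OS:

    var BenchmarkServer = require('./benchmark_server_express');

    // Plain-HTTP server (tls = false); the generic/response_size arguments are
    // not used by the express implementation.
    var server = new BenchmarkServer('localhost', 0, false);
    server.on('started', function() {
      console.log('listening on port ' + server.getPort());
    });
    server.start();

    // Later: report elapsed wall time, then shut down.
    // console.log(server.mark());
    // server.stop(function() { console.log('stopped'); });
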
diff --git a/src/node/performance/worker.js b/src/node/performance/worker.js
index 7ef9b84fe7..030bf7d7ba 100644
--- a/src/node/performance/worker.js
+++ b/src/node/performance/worker.js
@@ -34,18 +34,18 @@
'use strict';
var console = require('console');
-var worker_service_impl = require('./worker_service_impl');
+var WorkerServiceImpl = require('./worker_service_impl');
var grpc = require('../../../');
var serviceProto = grpc.load({
root: __dirname + '/../../..',
file: 'src/proto/grpc/testing/services.proto'}).grpc.testing;
-function runServer(port) {
+function runServer(port, benchmark_impl) {
var server_creds = grpc.ServerCredentials.createInsecure();
var server = new grpc.Server();
server.addProtoService(serviceProto.WorkerService.service,
- worker_service_impl);
+ new WorkerServiceImpl(benchmark_impl, server));
var address = '0.0.0.0:' + port;
server.bind(address, server_creds);
server.start();
@@ -57,9 +57,9 @@ if (require.main === module) {
Error.stackTraceLimit = Infinity;
var parseArgs = require('minimist');
var argv = parseArgs(process.argv, {
- string: ['driver_port']
+ string: ['driver_port', 'benchmark_impl']
});
- runServer(argv.driver_port);
+ runServer(argv.driver_port, argv.benchmark_impl);
}
exports.runServer = runServer;
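
A minimal sketch of starting the worker with the new benchmark_impl selector (not part of this diff); the port value is illustrative:

    var worker = require('./worker');

    // Serve the WorkerService on port 8080, backed by the express-based
    // benchmark client/server implementations ('grpc' selects the originals).
    worker.runServer(8080, 'express');
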
diff --git a/src/node/performance/worker_service_impl.js b/src/node/performance/worker_service_impl.js
index 4b5cb8f9c2..3f317f6429 100644
--- a/src/node/performance/worker_service_impl.js
+++ b/src/node/performance/worker_service_impl.js
@@ -38,121 +38,141 @@ var console = require('console');
var BenchmarkClient = require('./benchmark_client');
var BenchmarkServer = require('./benchmark_server');
-exports.quitWorker = function quitWorker(call, callback) {
- callback(null, {});
- process.exit(0);
-}
+module.exports = function WorkerServiceImpl(benchmark_impl, server) {
+ var BenchmarkClient;
+ var BenchmarkServer;
+ switch (benchmark_impl) {
+ case 'grpc':
+ BenchmarkClient = require('./benchmark_client');
+ BenchmarkServer = require('./benchmark_server');
+ break;
+ case 'express':
+ BenchmarkClient = require('./benchmark_client_express');
+ BenchmarkServer = require('./benchmark_server_express');
+ break;
+ default:
+ throw new Error('Unrecognized benchmark impl: ' + benchmark_impl);
+ }
-exports.runClient = function runClient(call) {
- var client;
- call.on('data', function(request) {
- var stats;
- switch (request.argtype) {
- case 'setup':
- var setup = request.setup;
- console.log('ClientConfig %j', setup);
- client = new BenchmarkClient(setup.server_targets,
- setup.client_channels,
- setup.histogram_params,
- setup.security_params);
- client.on('error', function(error) {
- call.emit('error', error);
- });
- var req_size, resp_size, generic;
- switch (setup.payload_config.payload) {
- case 'bytebuf_params':
- req_size = setup.payload_config.bytebuf_params.req_size;
- resp_size = setup.payload_config.bytebuf_params.resp_size;
- generic = true;
+ this.quitWorker = function quitWorker(call, callback) {
+ server.tryShutdown(function() {
+ callback(null, {});
+ });
+ };
+
+ this.runClient = function runClient(call) {
+ var client;
+ call.on('data', function(request) {
+ var stats;
+ switch (request.argtype) {
+ case 'setup':
+ var setup = request.setup;
+ console.log('ClientConfig %j', setup);
+ client = new BenchmarkClient(setup.server_targets,
+ setup.client_channels,
+ setup.histogram_params,
+ setup.security_params);
+ client.on('error', function(error) {
+ call.emit('error', error);
+ });
+ var req_size, resp_size, generic;
+ switch (setup.payload_config.payload) {
+ case 'bytebuf_params':
+ req_size = setup.payload_config.bytebuf_params.req_size;
+ resp_size = setup.payload_config.bytebuf_params.resp_size;
+ generic = true;
+ break;
+ case 'simple_params':
+ req_size = setup.payload_config.simple_params.req_size;
+ resp_size = setup.payload_config.simple_params.resp_size;
+ generic = false;
+ break;
+ default:
+ call.emit('error', new Error('Unsupported PayloadConfig type' +
+ setup.payload_config.payload));
+ }
+ switch (setup.load_params.load) {
+ case 'closed_loop':
+ client.startClosedLoop(setup.outstanding_rpcs_per_channel,
+ setup.rpc_type, req_size, resp_size, generic);
+ break;
+ case 'poisson':
+ client.startPoisson(setup.outstanding_rpcs_per_channel,
+ setup.rpc_type, req_size, resp_size,
+ setup.load_params.poisson.offered_load, generic);
+ break;
+ default:
+ call.emit('error', new Error('Unsupported LoadParams type' +
+ setup.load_params.load));
+ }
+ stats = client.mark();
+ call.write({
+ stats: stats
+ });
break;
- case 'simple_params':
- req_size = setup.payload_config.simple_params.req_size;
- resp_size = setup.payload_config.simple_params.resp_size;
- generic = false;
+ case 'mark':
+ if (client) {
+ stats = client.mark(request.mark.reset);
+ call.write({
+ stats: stats
+ });
+ } else {
+ call.emit('error', new Error('Got Mark before ClientConfig'));
+ }
break;
default:
- call.emit('error', new Error('Unsupported PayloadConfig type' +
- setup.payload_config.payload));
+ throw new Error('Nonexistent client argtype option: ' + request.argtype);
}
- switch (setup.load_params.load) {
- case 'closed_loop':
- client.startClosedLoop(setup.outstanding_rpcs_per_channel,
- setup.rpc_type, req_size, resp_size, generic);
+ });
+ call.on('end', function() {
+ client.stop(function() {
+ call.end();
+ });
+ });
+ };
+
+ this.runServer = function runServer(call) {
+ var server;
+ call.on('data', function(request) {
+ var stats;
+ switch (request.argtype) {
+ case 'setup':
+ console.log('ServerConfig %j', request.setup);
+ server = new BenchmarkServer('[::]', request.setup.port,
+ request.setup.security_params);
+ server.on('started', function() {
+ stats = server.mark();
+ call.write({
+ stats: stats,
+ port: server.getPort()
+ });
+ });
+ server.start();
break;
- case 'poisson':
- client.startPoisson(setup.outstanding_rpcs_per_channel,
- setup.rpc_type, req_size, resp_size,
- setup.load_params.poisson.offered_load, generic);
+ case 'mark':
+ if (server) {
+ stats = server.mark(request.mark.reset);
+ call.write({
+ stats: stats,
+ port: server.getPort(),
+ cores: 1
+ });
+ } else {
+ call.emit('error', new Error('Got Mark before ServerConfig'));
+ }
break;
default:
- call.emit('error', new Error('Unsupported LoadParams type' +
- setup.load_params.load));
+ throw new Error('Nonexistent server argtype option');
}
- stats = client.mark();
- call.write({
- stats: stats
- });
- break;
- case 'mark':
- if (client) {
- stats = client.mark(request.mark.reset);
- call.write({
- stats: stats
- });
- } else {
- call.emit('error', new Error('Got Mark before ClientConfig'));
- }
- break;
- default:
- throw new Error('Nonexistent client argtype option: ' + request.argtype);
- }
- });
- call.on('end', function() {
- client.stop(function() {
- call.end();
});
- });
-};
-
-exports.runServer = function runServer(call) {
- var server;
- call.on('data', function(request) {
- var stats;
- switch (request.argtype) {
- case 'setup':
- console.log('ServerConfig %j', request.setup);
- server = new BenchmarkServer('[::]', request.setup.port,
- request.setup.security_params);
- server.start();
- stats = server.mark();
- call.write({
- stats: stats,
- port: server.getPort()
+ call.on('end', function() {
+ server.stop(function() {
+ call.end();
});
- break;
- case 'mark':
- if (server) {
- stats = server.mark(request.mark.reset);
- call.write({
- stats: stats,
- port: server.getPort(),
- cores: 1
- });
- } else {
- call.emit('error', new Error('Got Mark before ServerConfig'));
- }
- break;
- default:
- throw new Error('Nonexistent server argtype option');
- }
- });
- call.on('end', function() {
- server.stop(function() {
- call.end();
});
- });
-};
+ };
-exports.coreCount = function coreCount(call, callback) {
- callback(null, {cores: os.cpus().length});
+ this.coreCount = function coreCount(call, callback) {
+ callback(null, {cores: os.cpus().length});
+ };
};
diff --git a/src/node/src/client.js b/src/node/src/client.js
index f75f951eb8..9c1562e8b8 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -382,6 +382,7 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
if (args.options) {
message.grpcWriteFlags = args.options.flags;
}
+
client_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
client_batch[grpc.opType.SEND_MESSAGE] = message;
diff --git a/src/node/src/common.js b/src/node/src/common.js
index 22159dd39f..c6c6d597a8 100644
--- a/src/node/src/common.js
+++ b/src/node/src/common.js
@@ -141,7 +141,7 @@ exports.getProtobufServiceAttrs = function getProtobufServiceAttrs(service,
binaryAsBase64 = options.binaryAsBase64;
longsAsStrings = options.longsAsStrings;
}
- return _.object(_.map(service.children, function(method) {
+ return _.fromPairs(_.map(service.children, function(method) {
return [_.camelCase(method.name), {
path: prefix + method.name,
requestStream: method.requestStream,
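
For reference (not part of this diff): lodash 4 removed the _.object alias used here, and _.fromPairs is its replacement for turning a list of [key, value] pairs into an object. The key and value below are illustrative:

    var _ = require('lodash');

    // lodash 4.x equivalent of the lodash 3.x _.object call being replaced.
    _.fromPairs([['echo', {path: '/math.Math/Echo'}]]);
    // => { echo: { path: '/math.Math/Echo' } }
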
diff --git a/src/node/src/grpc_extension.js b/src/node/src/grpc_extension.js
index 6a8fe2c03c..63a281ddbc 100644
--- a/src/node/src/grpc_extension.js
+++ b/src/node/src/grpc_extension.js
@@ -31,7 +31,7 @@
*
*/
-var binary = require('node-pre-gyp');
+var binary = require('node-pre-gyp/lib/pre-binding');
var path = require('path');
var binding_path =
binary.find(path.resolve(path.join(__dirname, '../../../package.json')));
diff --git a/src/node/test/async_test.js b/src/node/test/async_test.js
index c46e745116..7b467e5475 100644
--- a/src/node/test/async_test.js
+++ b/src/node/test/async_test.js
@@ -61,6 +61,7 @@ describe('Async functionality', function() {
done();
});
after(function() {
+ grpc.closeClient(math_client);
server.forceShutdown();
});
it('should not hang', function(done) {
diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js
index f008a87585..58f8842c0d 100644
--- a/src/node/test/interop_sanity_test.js
+++ b/src/node/test/interop_sanity_test.js
@@ -98,6 +98,10 @@ describe('Interop tests', function() {
interop_client.runTest(port, name_override, 'status_code_and_message',
true, true, done);
});
+ it('should pass unimplemented_service', function(done) {
+ interop_client.runTest(port, name_override, 'unimplemented_service',
+ true, true, done);
+ });
it('should pass unimplemented_method', function(done) {
interop_client.runTest(port, name_override, 'unimplemented_method',
true, true, done);