aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node
diff options
context:
space:
mode:
authorGravatar murgatroid99 <michael.lumish@gmail.com>2015-01-12 18:14:35 -0800
committerGravatar murgatroid99 <michael.lumish@gmail.com>2015-01-12 18:14:35 -0800
commite5061519185627e58495bede780f757339ba07c5 (patch)
tree7bf63f4bcc35d2141351d8cf29fac7a6eb5dfe63 /src/node
parent470a3ea1a192e53a61012c30a6a9a5efcc712948 (diff)
Clean commit of Node.js library source
Diffstat (limited to 'src/node')
-rw-r--r--src/node/README.md12
-rw-r--r--src/node/binding.gyp46
-rw-r--r--src/node/byte_buffer.cc46
-rw-r--r--src/node/byte_buffer.h23
-rw-r--r--src/node/call.cc384
-rw-r--r--src/node/call.h49
-rw-r--r--src/node/channel.cc155
-rw-r--r--src/node/channel.h46
-rw-r--r--src/node/client.js176
-rw-r--r--src/node/common.js29
-rw-r--r--src/node/completion_queue_async_worker.cc62
-rw-r--r--src/node/completion_queue_async_worker.h46
-rw-r--r--src/node/credentials.cc180
-rw-r--r--src/node/credentials.h48
-rw-r--r--src/node/event.cc132
-rw-r--r--src/node/event.h15
-rw-r--r--src/node/examples/math.proto25
-rw-r--r--src/node/examples/math_server.js168
-rw-r--r--src/node/node_grpc.cc151
-rw-r--r--src/node/package.json18
-rw-r--r--src/node/port_picker.js19
-rw-r--r--src/node/server.cc212
-rw-r--r--src/node/server.h46
-rw-r--r--src/node/server.js228
-rw-r--r--src/node/server_credentials.cc131
-rw-r--r--src/node/server_credentials.h44
-rw-r--r--src/node/surface_client.js306
-rw-r--r--src/node/surface_server.js325
-rw-r--r--src/node/tag.cc71
-rw-r--r--src/node/tag.h26
-rw-r--r--src/node/test/byte_buffer_test.js~35
-rw-r--r--src/node/test/call_test.js169
-rw-r--r--src/node/test/call_test.js~6
-rw-r--r--src/node/test/channel_test.js55
-rw-r--r--src/node/test/client_server_test.js150
-rw-r--r--src/node/test/client_server_test.js~59
-rw-r--r--src/node/test/completion_queue_test.js~30
-rw-r--r--src/node/test/constant_test.js97
-rw-r--r--src/node/test/constant_test.js~25
-rw-r--r--src/node/test/data/README1
-rw-r--r--src/node/test/data/ca.pem15
-rw-r--r--src/node/test/data/server1.key16
-rw-r--r--src/node/test/data/server1.pem16
-rw-r--r--src/node/test/end_to_end_test.js168
-rw-r--r--src/node/test/end_to_end_test.js~72
-rw-r--r--src/node/test/math_client_test.js176
-rw-r--r--src/node/test/math_client_test.js~87
-rw-r--r--src/node/test/server_test.js88
-rw-r--r--src/node/test/server_test.js~22
-rw-r--r--src/node/timeval.cc33
-rw-r--r--src/node/timeval.h15
51 files changed, 4554 insertions, 0 deletions
diff --git a/src/node/README.md b/src/node/README.md
new file mode 100644
index 0000000000..55329d8cb2
--- /dev/null
+++ b/src/node/README.md
@@ -0,0 +1,12 @@
+# Node.js GRPC extension
+
+The package is built with
+
+ node-gyp configure
+ node-gyp build
+
+or, for brevity
+
+ node-gyp configure build
+
+The tests can be run with `npm test` on a dev install. \ No newline at end of file
diff --git a/src/node/binding.gyp b/src/node/binding.gyp
new file mode 100644
index 0000000000..4a1fd7aaf0
--- /dev/null
+++ b/src/node/binding.gyp
@@ -0,0 +1,46 @@
+{
+ "targets" : [
+ {
+ 'include_dirs': [
+ "<!(node -e \"require('nan')\")"
+ ],
+ 'cxxflags': [
+ '-Wall',
+ '-pthread',
+ '-pedantic',
+ '-g',
+ '-zdefs'
+ '-Werror',
+ ],
+ 'ldflags': [
+ '-g',
+ '-L/usr/local/google/home/mlumish/grpc_dev/lib'
+ ],
+ 'link_settings': {
+ 'libraries': [
+ '-lgrpc',
+ '-levent',
+ '-levent_pthreads',
+ '-levent_core',
+ '-lrt',
+ '-lgpr',
+ '-lpthread'
+ ],
+ },
+ "target_name": "grpc",
+ "sources": [
+ "byte_buffer.cc",
+ "call.cc",
+ "channel.cc",
+ "completion_queue_async_worker.cc",
+ "credentials.cc",
+ "event.cc",
+ "node_grpc.cc",
+ "server.cc",
+ "server_credentials.cc",
+ "tag.cc",
+ "timeval.cc"
+ ]
+ }
+ ]
+}
diff --git a/src/node/byte_buffer.cc b/src/node/byte_buffer.cc
new file mode 100644
index 0000000000..e8bdc806ec
--- /dev/null
+++ b/src/node/byte_buffer.cc
@@ -0,0 +1,46 @@
+#include <string.h>
+#include <malloc.h>
+
+#include <node.h>
+#include <nan.h>
+#include "grpc/grpc.h"
+#include "grpc/support/slice.h"
+
+namespace grpc {
+namespace node {
+
+#include "byte_buffer.h"
+
+using ::node::Buffer;
+using v8::Handle;
+using v8::Value;
+
+grpc_byte_buffer *BufferToByteBuffer(Handle<Value> buffer) {
+ NanScope();
+ int length = Buffer::Length(buffer);
+ char *data = Buffer::Data(buffer);
+ gpr_slice slice = gpr_slice_malloc(length);
+ memcpy(GPR_SLICE_START_PTR(slice), data, length);
+ grpc_byte_buffer *byte_buffer(grpc_byte_buffer_create(&slice, 1));
+ gpr_slice_unref(slice);
+ return byte_buffer;
+}
+
+Handle<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) {
+ NanEscapableScope();
+ if (buffer == NULL) {
+ NanReturnNull();
+ }
+ size_t length = grpc_byte_buffer_length(buffer);
+ char *result = reinterpret_cast<char*>(calloc(length, sizeof(char)));
+ size_t offset = 0;
+ grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer);
+ gpr_slice next;
+ while (grpc_byte_buffer_reader_next(reader, &next) != 0) {
+ memcpy(result+offset, GPR_SLICE_START_PTR(next), GPR_SLICE_LENGTH(next));
+ offset += GPR_SLICE_LENGTH(next);
+ }
+ return NanEscapeScope(NanNewBufferHandle(result, length));
+}
+} // namespace node
+} // namespace grpc
diff --git a/src/node/byte_buffer.h b/src/node/byte_buffer.h
new file mode 100644
index 0000000000..b439de15b1
--- /dev/null
+++ b/src/node/byte_buffer.h
@@ -0,0 +1,23 @@
+#ifndef NET_GRPC_NODE_BYTE_BUFFER_H_
+#define NET_GRPC_NODE_BYTE_BUFFER_H_
+
+#include <string.h>
+
+#include <node.h>
+#include <nan.h>
+#include "grpc/grpc.h"
+
+namespace grpc {
+namespace node {
+
+/* Convert a Node.js Buffer to grpc_byte_buffer. Requires that
+ ::node::Buffer::HasInstance(buffer) */
+grpc_byte_buffer *BufferToByteBuffer(v8::Handle<v8::Value> buffer);
+
+/* Convert a grpc_byte_buffer to a Node.js Buffer */
+v8::Handle<v8::Value> ByteBufferToBuffer(grpc_byte_buffer *buffer);
+
+} // namespace node
+} // namespace grpc
+
+#endif // NET_GRPC_NODE_BYTE_BUFFER_H_
diff --git a/src/node/call.cc b/src/node/call.cc
new file mode 100644
index 0000000000..c6685101ca
--- /dev/null
+++ b/src/node/call.cc
@@ -0,0 +1,384 @@
+#include <node.h>
+
+#include "grpc/grpc.h"
+#include "grpc/support/time.h"
+#include "byte_buffer.h"
+#include "call.h"
+#include "channel.h"
+#include "completion_queue_async_worker.h"
+#include "timeval.h"
+#include "tag.h"
+
+namespace grpc {
+namespace node {
+
+using ::node::Buffer;
+using v8::Arguments;
+using v8::Array;
+using v8::Exception;
+using v8::External;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Handle;
+using v8::HandleScope;
+using v8::Integer;
+using v8::Local;
+using v8::Number;
+using v8::Object;
+using v8::ObjectTemplate;
+using v8::Persistent;
+using v8::Uint32;
+using v8::String;
+using v8::Value;
+
+Persistent<Function> Call::constructor;
+Persistent<FunctionTemplate> Call::fun_tpl;
+
+Call::Call(grpc_call *call) : wrapped_call(call) {
+}
+
+Call::~Call() {
+ grpc_call_destroy(wrapped_call);
+}
+
+void Call::Init(Handle<Object> exports) {
+ NanScope();
+ Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
+ tpl->SetClassName(NanNew("Call"));
+ tpl->InstanceTemplate()->SetInternalFieldCount(1);
+ NanSetPrototypeTemplate(tpl, "addMetadata",
+ FunctionTemplate::New(AddMetadata)->GetFunction());
+ NanSetPrototypeTemplate(tpl, "startInvoke",
+ FunctionTemplate::New(StartInvoke)->GetFunction());
+ NanSetPrototypeTemplate(tpl, "serverAccept",
+ FunctionTemplate::New(ServerAccept)->GetFunction());
+ NanSetPrototypeTemplate(
+ tpl,
+ "serverEndInitialMetadata",
+ FunctionTemplate::New(ServerEndInitialMetadata)->GetFunction());
+ NanSetPrototypeTemplate(tpl, "cancel",
+ FunctionTemplate::New(Cancel)->GetFunction());
+ NanSetPrototypeTemplate(tpl, "startWrite",
+ FunctionTemplate::New(StartWrite)->GetFunction());
+ NanSetPrototypeTemplate(
+ tpl, "startWriteStatus",
+ FunctionTemplate::New(StartWriteStatus)->GetFunction());
+ NanSetPrototypeTemplate(tpl, "writesDone",
+ FunctionTemplate::New(WritesDone)->GetFunction());
+ NanSetPrototypeTemplate(tpl, "startReadMetadata",
+ FunctionTemplate::New(WritesDone)->GetFunction());
+ NanSetPrototypeTemplate(tpl, "startRead",
+ FunctionTemplate::New(StartRead)->GetFunction());
+ NanAssignPersistent(fun_tpl, tpl);
+ NanAssignPersistent(constructor, tpl->GetFunction());
+ constructor->Set(NanNew("WRITE_BUFFER_HINT"),
+ NanNew<Uint32, uint32_t>(GRPC_WRITE_BUFFER_HINT));
+ constructor->Set(NanNew("WRITE_NO_COMPRESS"),
+ NanNew<Uint32, uint32_t>(GRPC_WRITE_NO_COMPRESS));
+ exports->Set(String::NewSymbol("Call"), constructor);
+}
+
+bool Call::HasInstance(Handle<Value> val) {
+ NanScope();
+ return NanHasInstance(fun_tpl, val);
+}
+
+Handle<Value> Call::WrapStruct(grpc_call *call) {
+ NanEscapableScope();
+ if (call == NULL) {
+ return NanEscapeScope(NanNull());
+ }
+ const int argc = 1;
+ Handle<Value> argv[argc] = { External::New(reinterpret_cast<void*>(call)) };
+ return NanEscapeScope(constructor->NewInstance(argc, argv));
+}
+
+NAN_METHOD(Call::New) {
+ NanScope();
+
+ if (args.IsConstructCall()) {
+ Call *call;
+ if (args[0]->IsExternal()) {
+ // This option is used for wrapping an existing call
+ grpc_call *call_value = reinterpret_cast<grpc_call*>(
+ External::Unwrap(args[0]));
+ call = new Call(call_value);
+ } else {
+ if (!Channel::HasInstance(args[0])) {
+ return NanThrowTypeError("Call's first argument must be a Channel");
+ }
+ if (!args[1]->IsString()) {
+ return NanThrowTypeError("Call's second argument must be a string");
+ }
+ if (!(args[2]->IsNumber() || args[2]->IsDate())) {
+ return NanThrowTypeError(
+ "Call's third argument must be a date or a number");
+ }
+ Handle<Object> channel_object = args[0]->ToObject();
+ Channel *channel = ObjectWrap::Unwrap<Channel>(channel_object);
+ if (channel->GetWrappedChannel() == NULL) {
+ return NanThrowError("Call cannot be created from a closed channel");
+ }
+ NanUtf8String method(args[1]);
+ double deadline = args[2]->NumberValue();
+ grpc_channel *wrapped_channel = channel->GetWrappedChannel();
+ grpc_call *wrapped_call = grpc_channel_create_call(
+ wrapped_channel,
+ *method,
+ channel->GetHost(),
+ MillisecondsToTimespec(deadline));
+ call = new Call(wrapped_call);
+ args.This()->SetHiddenValue(String::NewSymbol("channel_"),
+ channel_object);
+ }
+ call->Wrap(args.This());
+ NanReturnValue(args.This());
+ } else {
+ const int argc = 4;
+ Local<Value> argv[argc] = { args[0], args[1], args[2], args[3] };
+ NanReturnValue(constructor->NewInstance(argc, argv));
+ }
+}
+
+NAN_METHOD(Call::AddMetadata) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError(
+ "addMetadata can only be called on Call objects");
+ }
+ Call *call = ObjectWrap::Unwrap<Call>(args.This());
+ for (int i=0; !args[i]->IsUndefined(); i++) {
+ if (!args[i]->IsObject()) {
+ return NanThrowTypeError(
+ "addMetadata arguments must be objects with key and value");
+ }
+ Handle<Object> item = args[i]->ToObject();
+ Handle<Value> key = item->Get(NanNew("key"));
+ if (!key->IsString()) {
+ return NanThrowTypeError(
+ "objects passed to addMetadata must have key->string");
+ }
+ Handle<Value> value = item->Get(NanNew("value"));
+ if (!Buffer::HasInstance(value)) {
+ return NanThrowTypeError(
+ "objects passed to addMetadata must have value->Buffer");
+ }
+ grpc_metadata metadata;
+ NanUtf8String utf8_key(key);
+ metadata.key = *utf8_key;
+ metadata.value = Buffer::Data(value);
+ metadata.value_length = Buffer::Length(value);
+ grpc_call_error error = grpc_call_add_metadata(call->wrapped_call,
+ &metadata,
+ 0);
+ if (error != GRPC_CALL_OK) {
+ return NanThrowError("addMetadata failed", error);
+ }
+ }
+ NanReturnUndefined();
+}
+
+NAN_METHOD(Call::StartInvoke) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError("startInvoke can only be called on Call objects");
+ }
+ if (!args[0]->IsFunction()) {
+ return NanThrowTypeError(
+ "StartInvoke's first argument must be a function");
+ }
+ if (!args[1]->IsFunction()) {
+ return NanThrowTypeError(
+ "StartInvoke's second argument must be a function");
+ }
+ if (!args[2]->IsFunction()) {
+ return NanThrowTypeError(
+ "StartInvoke's third argument must be a function");
+ }
+ if (!args[3]->IsUint32()) {
+ return NanThrowTypeError(
+ "StartInvoke's fourth argument must be integer flags");
+ }
+ Call *call = ObjectWrap::Unwrap<Call>(args.This());
+ unsigned int flags = args[3]->Uint32Value();
+ grpc_call_error error = grpc_call_start_invoke(
+ call->wrapped_call,
+ CompletionQueueAsyncWorker::GetQueue(),
+ CreateTag(args[0], args.This()),
+ CreateTag(args[1], args.This()),
+ CreateTag(args[2], args.This()),
+ flags);
+ if (error == GRPC_CALL_OK) {
+ CompletionQueueAsyncWorker::Next();
+ CompletionQueueAsyncWorker::Next();
+ CompletionQueueAsyncWorker::Next();
+ } else {
+ return NanThrowError("startInvoke failed", error);
+ }
+ NanReturnUndefined();
+}
+
+NAN_METHOD(Call::ServerAccept) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError("accept can only be called on Call objects");
+ }
+ if (!args[0]->IsFunction()) {
+ return NanThrowTypeError(
+ "accept's first argument must be a function");
+ }
+ Call *call = ObjectWrap::Unwrap<Call>(args.This());
+ grpc_call_error error = grpc_call_server_accept(
+ call->wrapped_call,
+ CompletionQueueAsyncWorker::GetQueue(),
+ CreateTag(args[0], args.This()));
+ if (error == GRPC_CALL_OK) {
+ CompletionQueueAsyncWorker::Next();
+ } else {
+ return NanThrowError("serverAccept failed", error);
+ }
+ NanReturnUndefined();
+}
+
+NAN_METHOD(Call::ServerEndInitialMetadata) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError(
+ "serverEndInitialMetadata can only be called on Call objects");
+ }
+ if (!args[0]->IsUint32()) {
+ return NanThrowTypeError(
+ "serverEndInitialMetadata's second argument must be integer flags");
+ }
+ Call *call = ObjectWrap::Unwrap<Call>(args.This());
+ unsigned int flags = args[1]->Uint32Value();
+ grpc_call_error error = grpc_call_server_end_initial_metadata(
+ call->wrapped_call,
+ flags);
+ if (error != GRPC_CALL_OK) {
+ return NanThrowError("serverEndInitialMetadata failed", error);
+ }
+ NanReturnUndefined();
+}
+
+NAN_METHOD(Call::Cancel) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError("startInvoke can only be called on Call objects");
+ }
+ Call *call = ObjectWrap::Unwrap<Call>(args.This());
+ grpc_call_error error = grpc_call_cancel(call->wrapped_call);
+ if (error != GRPC_CALL_OK) {
+ return NanThrowError("cancel failed", error);
+ }
+ NanReturnUndefined();
+}
+
+NAN_METHOD(Call::StartWrite) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError("startWrite can only be called on Call objects");
+ }
+ if (!Buffer::HasInstance(args[0])) {
+ return NanThrowTypeError(
+ "startWrite's first argument must be a Buffer");
+ }
+ if (!args[1]->IsFunction()) {
+ return NanThrowTypeError(
+ "startWrite's second argument must be a function");
+ }
+ if (!args[2]->IsUint32()) {
+ return NanThrowTypeError(
+ "startWrite's third argument must be integer flags");
+ }
+ Call *call = ObjectWrap::Unwrap<Call>(args.This());
+ grpc_byte_buffer *buffer = BufferToByteBuffer(args[0]);
+ unsigned int flags = args[2]->Uint32Value();
+ grpc_call_error error = grpc_call_start_write(call->wrapped_call,
+ buffer,
+ CreateTag(args[1], args.This()),
+ flags);
+ if (error == GRPC_CALL_OK) {
+ CompletionQueueAsyncWorker::Next();
+ } else {
+ return NanThrowError("startWrite failed", error);
+ }
+ NanReturnUndefined();
+}
+
+NAN_METHOD(Call::StartWriteStatus) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError(
+ "startWriteStatus can only be called on Call objects");
+ }
+ if (!args[0]->IsUint32()) {
+ return NanThrowTypeError(
+ "startWriteStatus's first argument must be a status code");
+ }
+ if (!args[1]->IsString()) {
+ return NanThrowTypeError(
+ "startWriteStatus's second argument must be a string");
+ }
+ if (!args[2]->IsFunction()) {
+ return NanThrowTypeError(
+ "startWriteStatus's third argument must be a function");
+ }
+ Call *call = ObjectWrap::Unwrap<Call>(args.This());
+ NanUtf8String details(args[1]);
+ grpc_call_error error = grpc_call_start_write_status(
+ call->wrapped_call,
+ (grpc_status_code)args[0]->Uint32Value(),
+ *details,
+ CreateTag(args[2], args.This()));
+ if (error == GRPC_CALL_OK) {
+ CompletionQueueAsyncWorker::Next();
+ } else {
+ return NanThrowError("startWriteStatus failed", error);
+ }
+ NanReturnUndefined();
+}
+
+NAN_METHOD(Call::WritesDone) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError("writesDone can only be called on Call objects");
+ }
+ if (!args[0]->IsFunction()) {
+ return NanThrowTypeError(
+ "writesDone's first argument must be a function");
+ }
+ Call *call = ObjectWrap::Unwrap<Call>(args.This());
+ grpc_call_error error = grpc_call_writes_done(
+ call->wrapped_call,
+ CreateTag(args[0], args.This()));
+ if (error == GRPC_CALL_OK) {
+ CompletionQueueAsyncWorker::Next();
+ } else {
+ return NanThrowError("writesDone failed", error);
+ }
+ NanReturnUndefined();
+}
+
+NAN_METHOD(Call::StartRead) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError("startRead can only be called on Call objects");
+ }
+ if (!args[0]->IsFunction()) {
+ return NanThrowTypeError(
+ "startRead's first argument must be a function");
+ }
+ Call *call = ObjectWrap::Unwrap<Call>(args.This());
+ grpc_call_error error = grpc_call_start_read(call->wrapped_call,
+ CreateTag(args[0], args.This()));
+ if (error == GRPC_CALL_OK) {
+ CompletionQueueAsyncWorker::Next();
+ } else {
+ return NanThrowError("startRead failed", error);
+ }
+ NanReturnUndefined();
+}
+
+} // namespace node
+} // namespace grpc
diff --git a/src/node/call.h b/src/node/call.h
new file mode 100644
index 0000000000..1bf8e32aab
--- /dev/null
+++ b/src/node/call.h
@@ -0,0 +1,49 @@
+#ifndef NET_GRPC_NODE_CALL_H_
+#define NET_GRPC_NODE_CALL_H_
+
+#include <node.h>
+#include <nan.h>
+#include "grpc/grpc.h"
+
+#include "channel.h"
+
+namespace grpc {
+namespace node {
+
+/* Wrapper class for grpc_call structs. */
+class Call : public ::node::ObjectWrap {
+ public:
+ static void Init(v8::Handle<v8::Object> exports);
+ static bool HasInstance(v8::Handle<v8::Value> val);
+ /* Wrap a grpc_call struct in a javascript object */
+ static v8::Handle<v8::Value> WrapStruct(grpc_call *call);
+
+ private:
+ explicit Call(grpc_call *call);
+ ~Call();
+
+ // Prevent copying
+ Call(const Call&);
+ Call& operator=(const Call&);
+
+ static NAN_METHOD(New);
+ static NAN_METHOD(AddMetadata);
+ static NAN_METHOD(StartInvoke);
+ static NAN_METHOD(ServerAccept);
+ static NAN_METHOD(ServerEndInitialMetadata);
+ static NAN_METHOD(Cancel);
+ static NAN_METHOD(StartWrite);
+ static NAN_METHOD(StartWriteStatus);
+ static NAN_METHOD(WritesDone);
+ static NAN_METHOD(StartRead);
+ static v8::Persistent<v8::Function> constructor;
+ // Used for typechecking instances of this javascript class
+ static v8::Persistent<v8::FunctionTemplate> fun_tpl;
+
+ grpc_call *wrapped_call;
+};
+
+} // namespace node
+} // namespace grpc
+
+#endif // NET_GRPC_NODE_CALL_H_
diff --git a/src/node/channel.cc b/src/node/channel.cc
new file mode 100644
index 0000000000..222fb3b4e0
--- /dev/null
+++ b/src/node/channel.cc
@@ -0,0 +1,155 @@
+#include <malloc.h>
+
+#include <vector>
+
+#include <node.h>
+#include <nan.h>
+#include "grpc/grpc.h"
+#include "grpc/grpc_security.h"
+#include "channel.h"
+#include "credentials.h"
+
+namespace grpc {
+namespace node {
+
+using v8::Arguments;
+using v8::Array;
+using v8::Exception;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Handle;
+using v8::HandleScope;
+using v8::Integer;
+using v8::Local;
+using v8::Object;
+using v8::Persistent;
+using v8::String;
+using v8::Value;
+
+Persistent<Function> Channel::constructor;
+Persistent<FunctionTemplate> Channel::fun_tpl;
+
+Channel::Channel(grpc_channel *channel, NanUtf8String *host)
+ : wrapped_channel(channel), host(host) {
+}
+
+Channel::~Channel() {
+ if (wrapped_channel != NULL) {
+ grpc_channel_destroy(wrapped_channel);
+ }
+ delete host;
+}
+
+void Channel::Init(Handle<Object> exports) {
+ NanScope();
+ Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
+ tpl->SetClassName(NanNew("Channel"));
+ tpl->InstanceTemplate()->SetInternalFieldCount(1);
+ NanSetPrototypeTemplate(tpl, "close",
+ FunctionTemplate::New(Close)->GetFunction());
+ NanAssignPersistent(fun_tpl, tpl);
+ NanAssignPersistent(constructor, tpl->GetFunction());
+ exports->Set(NanNew("Channel"), constructor);
+}
+
+bool Channel::HasInstance(Handle<Value> val) {
+ NanScope();
+ return NanHasInstance(fun_tpl, val);
+}
+
+grpc_channel *Channel::GetWrappedChannel() {
+ return this->wrapped_channel;
+}
+
+char *Channel::GetHost() {
+ return **this->host;
+}
+
+NAN_METHOD(Channel::New) {
+ NanScope();
+
+ if (args.IsConstructCall()) {
+ if (!args[0]->IsString()) {
+ return NanThrowTypeError("Channel expects a string and an object");
+ }
+ grpc_channel *wrapped_channel;
+ // Owned by the Channel object
+ NanUtf8String *host = new NanUtf8String(args[0]);
+ if (args[1]->IsUndefined()) {
+ wrapped_channel = grpc_channel_create(**host, NULL);
+ } else if (args[1]->IsObject()) {
+ grpc_credentials *creds = NULL;
+ Handle<Object> args_hash(args[1]->ToObject()->Clone());
+ if (args_hash->HasOwnProperty(NanNew("credentials"))) {
+ Handle<Value> creds_value = args_hash->Get(NanNew("credentials"));
+ if (!Credentials::HasInstance(creds_value)) {
+ return NanThrowTypeError(
+ "credentials arg must be a Credentials object");
+ }
+ Credentials *creds_object = ObjectWrap::Unwrap<Credentials>(
+ creds_value->ToObject());
+ creds = creds_object->GetWrappedCredentials();
+ args_hash->Delete(NanNew("credentials"));
+ }
+ Handle<Array> keys(args_hash->GetOwnPropertyNames());
+ grpc_channel_args channel_args;
+ channel_args.num_args = keys->Length();
+ channel_args.args = reinterpret_cast<grpc_arg*>(
+ calloc(channel_args.num_args, sizeof(grpc_arg)));
+ /* These are used to keep all strings until then end of the block, then
+ destroy them */
+ std::vector<NanUtf8String*> key_strings(keys->Length());
+ std::vector<NanUtf8String*> value_strings(keys->Length());
+ for (unsigned int i = 0; i < channel_args.num_args; i++) {
+ Handle<String> current_key(keys->Get(i)->ToString());
+ Handle<Value> current_value(args_hash->Get(current_key));
+ key_strings[i] = new NanUtf8String(current_key);
+ channel_args.args[i].key = **key_strings[i];
+ if (current_value->IsInt32()) {
+ channel_args.args[i].type = GRPC_ARG_INTEGER;
+ channel_args.args[i].value.integer = current_value->Int32Value();
+ } else if (current_value->IsString()) {
+ channel_args.args[i].type = GRPC_ARG_STRING;
+ value_strings[i] = new NanUtf8String(current_value);
+ channel_args.args[i].value.string = **value_strings[i];
+ } else {
+ free(channel_args.args);
+ return NanThrowTypeError("Arg values must be strings");
+ }
+ }
+ if (creds == NULL) {
+ wrapped_channel = grpc_channel_create(**host, &channel_args);
+ } else {
+ wrapped_channel = grpc_secure_channel_create(creds,
+ **host,
+ &channel_args);
+ }
+ free(channel_args.args);
+ } else {
+ return NanThrowTypeError("Channel expects a string and an object");
+ }
+ Channel *channel = new Channel(wrapped_channel, host);
+ channel->Wrap(args.This());
+ NanReturnValue(args.This());
+ } else {
+ const int argc = 2;
+ Local<Value> argv[argc] = { args[0], args[1] };
+ NanReturnValue(constructor->NewInstance(argc, argv));
+ }
+}
+
+NAN_METHOD(Channel::Close) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError("close can only be called on Channel objects");
+ }
+ Channel *channel = ObjectWrap::Unwrap<Channel>(args.This());
+ if (channel->wrapped_channel != NULL) {
+ grpc_channel_destroy(channel->wrapped_channel);
+ channel->wrapped_channel = NULL;
+ }
+ NanReturnUndefined();
+}
+
+} // namespace node
+} // namespace grpc
diff --git a/src/node/channel.h b/src/node/channel.h
new file mode 100644
index 0000000000..fbfb83874c
--- /dev/null
+++ b/src/node/channel.h
@@ -0,0 +1,46 @@
+#ifndef NET_GRPC_NODE_CHANNEL_H_
+#define NET_GRPC_NODE_CHANNEL_H_
+
+#include <node.h>
+#include <nan.h>
+#include "grpc/grpc.h"
+
+namespace grpc {
+namespace node {
+
+/* Wrapper class for grpc_channel structs */
+class Channel : public ::node::ObjectWrap {
+ public:
+ static void Init(v8::Handle<v8::Object> exports);
+ static bool HasInstance(v8::Handle<v8::Value> val);
+ /* This is used to typecheck javascript objects before converting them to
+ this type */
+ static v8::Persistent<v8::Value> prototype;
+
+ /* Returns the grpc_channel struct that this object wraps */
+ grpc_channel *GetWrappedChannel();
+
+ /* Return the hostname that this channel connects to */
+ char *GetHost();
+
+ private:
+ explicit Channel(grpc_channel *channel, NanUtf8String *host);
+ ~Channel();
+
+ // Prevent copying
+ Channel(const Channel&);
+ Channel& operator=(const Channel&);
+
+ static NAN_METHOD(New);
+ static NAN_METHOD(Close);
+ static v8::Persistent<v8::Function> constructor;
+ static v8::Persistent<v8::FunctionTemplate> fun_tpl;
+
+ grpc_channel *wrapped_channel;
+ NanUtf8String *host;
+};
+
+} // namespace node
+} // namespace grpc
+
+#endif // NET_GRPC_NODE_CHANNEL_H_
diff --git a/src/node/client.js b/src/node/client.js
new file mode 100644
index 0000000000..e42e36aad4
--- /dev/null
+++ b/src/node/client.js
@@ -0,0 +1,176 @@
+var grpc = require('bindings')('grpc.node');
+
+var common = require('./common');
+
+var Duplex = require('stream').Duplex;
+var util = require('util');
+
+util.inherits(GrpcClientStream, Duplex);
+
+/**
+ * Class for representing a gRPC client side stream as a Node stream. Extends
+ * from stream.Duplex.
+ * @constructor
+ * @param {grpc.Call} call Call object to proxy
+ * @param {object} options Stream options
+ */
+function GrpcClientStream(call, options) {
+ Duplex.call(this, options);
+ var self = this;
+ // Indicates that we can start reading and have not received a null read
+ var can_read = false;
+ // Indicates that a read is currently pending
+ var reading = false;
+ // Indicates that we can call startWrite
+ var can_write = false;
+ // Indicates that a write is currently pending
+ var writing = false;
+ this._call = call;
+ /**
+ * Callback to handle receiving a READ event. Pushes the data from that event
+ * onto the read queue and starts reading again if applicable.
+ * @param {grpc.Event} event The READ event object
+ */
+ function readCallback(event) {
+ var data = event.data;
+ if (self.push(data)) {
+ if (data == null) {
+ // Disable starting to read after null read was received
+ can_read = false;
+ reading = false;
+ } else {
+ call.startRead(readCallback);
+ }
+ } else {
+ // Indicate that reading can be resumed by calling startReading
+ reading = false;
+ }
+ };
+ /**
+ * Initiate a read, which continues until self.push returns false (indicating
+ * that reading should be paused) or data is null (indicating that there is no
+ * more data to read).
+ */
+ function startReading() {
+ call.startRead(readCallback);
+ }
+ // TODO(mlumish): possibly change queue implementation due to shift slowness
+ var write_queue = [];
+ /**
+ * Write the next chunk of data in the write queue if there is one. Otherwise
+ * indicate that there is no pending write. When the write succeeds, this
+ * function is called again.
+ */
+ function writeNext() {
+ if (write_queue.length > 0) {
+ writing = true;
+ var next = write_queue.shift();
+ var writeCallback = function(event) {
+ next.callback();
+ writeNext();
+ };
+ call.startWrite(next.chunk, writeCallback, 0);
+ } else {
+ writing = false;
+ }
+ }
+ call.startInvoke(function(event) {
+ can_read = true;
+ can_write = true;
+ startReading();
+ writeNext();
+ }, function(event) {
+ self.emit('metadata', event.data);
+ }, function(event) {
+ self.emit('status', event.data);
+ }, 0);
+ this.on('finish', function() {
+ call.writesDone(function() {});
+ });
+ /**
+ * Indicate that reads should start, and start them if the INVOKE_ACCEPTED
+ * event has been received.
+ */
+ this._enableRead = function() {
+ if (!reading) {
+ reading = true;
+ if (can_read) {
+ startReading();
+ }
+ }
+ };
+ /**
+ * Push the chunk onto the write queue, and write from the write queue if
+ * there is not a pending write
+ * @param {Buffer} chunk The chunk of data to write
+ * @param {function(Error=)} callback The callback to call when the write
+ * completes
+ */
+ this._tryWrite = function(chunk, callback) {
+ write_queue.push({chunk: chunk, callback: callback});
+ if (can_write && !writing) {
+ writeNext();
+ }
+ };
+}
+
+/**
+ * Start reading. This is an implementation of a method needed for implementing
+ * stream.Readable.
+ * @param {number} size Ignored
+ */
+GrpcClientStream.prototype._read = function(size) {
+ this._enableRead();
+};
+
+/**
+ * Attempt to write the given chunk. Calls the callback when done. This is an
+ * implementation of a method needed for implementing stream.Writable.
+ * @param {Buffer} chunk The chunk to write
+ * @param {string} encoding Ignored
+ * @param {function(Error=)} callback Ignored
+ */
+GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
+ this._tryWrite(chunk, callback);
+};
+
+/**
+ * Make a request on the channel to the given method with the given arguments
+ * @param {grpc.Channel} channel The channel on which to make the request
+ * @param {string} method The method to request
+ * @param {array=} metadata Array of metadata key/value pairs to add to the call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future.
+ * @return {stream=} The stream of responses
+ */
+function makeRequest(channel,
+ method,
+ metadata,
+ deadline) {
+ if (deadline === undefined) {
+ deadline = Infinity;
+ }
+ var call = new grpc.Call(channel, method, deadline);
+ if (metadata) {
+ call.addMetadata(metadata);
+ }
+ return new GrpcClientStream(call);
+}
+
+/**
+ * See documentation for makeRequest above
+ */
+exports.makeRequest = makeRequest;
+
+/**
+ * Represents a client side gRPC channel associated with a single host.
+ */
+exports.Channel = grpc.Channel;
+/**
+ * Status name to code number mapping
+ */
+exports.status = grpc.status;
+/**
+ * Call error name to code number mapping
+ */
+exports.callError = grpc.callError;
diff --git a/src/node/common.js b/src/node/common.js
new file mode 100644
index 0000000000..b856bf6387
--- /dev/null
+++ b/src/node/common.js
@@ -0,0 +1,29 @@
+var _ = require('highland');
+
+/**
+ * When the given stream finishes without error, call the callback once. This
+ * will not be called until something begins to consume the stream.
+ * @param {function} callback The callback to call at stream end
+ * @param {stream} source The stream to watch
+ * @return {stream} The stream with the callback attached
+ */
+function onSuccessfulStreamEnd(callback, source) {
+ var error = false;
+ return source.consume(function(err, x, push, next) {
+ if (x === _.nil) {
+ if (!error) {
+ callback();
+ }
+ push(null, x);
+ } else if (err) {
+ error = true;
+ push(err);
+ next();
+ } else {
+ push(err, x);
+ next();
+ }
+ });
+}
+
+exports.onSuccessfulStreamEnd = onSuccessfulStreamEnd;
diff --git a/src/node/completion_queue_async_worker.cc b/src/node/completion_queue_async_worker.cc
new file mode 100644
index 0000000000..d8156654cd
--- /dev/null
+++ b/src/node/completion_queue_async_worker.cc
@@ -0,0 +1,62 @@
+#include <node.h>
+#include <nan.h>
+
+#include "grpc/grpc.h"
+#include "grpc/support/time.h"
+#include "completion_queue_async_worker.h"
+#include "event.h"
+#include "tag.h"
+
+namespace grpc {
+namespace node {
+
+using v8::Function;
+using v8::Handle;
+using v8::Object;
+using v8::Persistent;
+using v8::Value;
+
+grpc_completion_queue *CompletionQueueAsyncWorker::queue;
+
+CompletionQueueAsyncWorker::CompletionQueueAsyncWorker() :
+ NanAsyncWorker(NULL) {
+}
+
+CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {
+}
+
+void CompletionQueueAsyncWorker::Execute() {
+ result = grpc_completion_queue_next(queue, gpr_inf_future);
+}
+
+grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() {
+ return queue;
+}
+
+void CompletionQueueAsyncWorker::Next() {
+ NanScope();
+ CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
+ NanAsyncQueueWorker(worker);
+}
+
+void CompletionQueueAsyncWorker::Init(Handle<Object> exports) {
+ NanScope();
+ queue = grpc_completion_queue_create();
+}
+
+void CompletionQueueAsyncWorker::HandleOKCallback() {
+ NanScope();
+ NanCallback event_callback(GetTagHandle(result->tag).As<Function>());
+ Handle<Value> argv[] = {
+ CreateEventObject(result)
+ };
+
+ DestroyTag(result->tag);
+ grpc_event_finish(result);
+ result = NULL;
+
+ event_callback.Call(1, argv);
+}
+
+} // namespace node
+} // namespace grpc
diff --git a/src/node/completion_queue_async_worker.h b/src/node/completion_queue_async_worker.h
new file mode 100644
index 0000000000..9e6d03b552
--- /dev/null
+++ b/src/node/completion_queue_async_worker.h
@@ -0,0 +1,46 @@
+#ifndef NET_GRPC_NODE_COMPLETION_QUEUE_ASYNC_WORKER_H_
+#define NET_GRPC_NODE_COMPLETION_QUEUE_ASYNC_WORKER_H_
+#include <nan.h>
+
+#include "grpc/grpc.h"
+
+namespace grpc {
+namespace node {
+
+/* A worker that asynchronously calls completion_queue_next, and queues onto the
+ node event loop a call to the function stored in the event's tag. */
+class CompletionQueueAsyncWorker : public NanAsyncWorker {
+ public:
+ CompletionQueueAsyncWorker();
+
+ ~CompletionQueueAsyncWorker();
+ /* Calls completion_queue_next with the provided deadline, and stores the
+ event if there was one or sets an error message if there was not */
+ void Execute();
+
+ /* Returns the completion queue attached to this class */
+ static grpc_completion_queue *GetQueue();
+
+ /* Convenience function to create a worker with the given arguments and queue
+ it to run asynchronously */
+ static void Next();
+
+ /* Initialize the CompletionQueueAsyncWorker class */
+ static void Init(v8::Handle<v8::Object> exports);
+
+ protected:
+ /* Called when Execute has succeeded (completed without setting an error
+ message). Calls the saved callback with the event that came from
+ completion_queue_next */
+ void HandleOKCallback();
+
+ private:
+ grpc_event *result;
+
+ static grpc_completion_queue *queue;
+};
+
+} // namespace node
+} // namespace grpc
+
+#endif // NET_GRPC_NODE_COMPLETION_QUEUE_ASYNC_WORKER_H_
diff --git a/src/node/credentials.cc b/src/node/credentials.cc
new file mode 100644
index 0000000000..a5743d7a09
--- /dev/null
+++ b/src/node/credentials.cc
@@ -0,0 +1,180 @@
+#include <node.h>
+
+#include "grpc/grpc.h"
+#include "grpc/grpc_security.h"
+#include "grpc/support/log.h"
+#include "credentials.h"
+
+namespace grpc {
+namespace node {
+
+using ::node::Buffer;
+using v8::Arguments;
+using v8::Exception;
+using v8::External;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Handle;
+using v8::HandleScope;
+using v8::Integer;
+using v8::Local;
+using v8::Object;
+using v8::ObjectTemplate;
+using v8::Persistent;
+using v8::Value;
+
+Persistent<Function> Credentials::constructor;
+Persistent<FunctionTemplate> Credentials::fun_tpl;
+
+Credentials::Credentials(grpc_credentials *credentials)
+ : wrapped_credentials(credentials) {
+}
+
+Credentials::~Credentials() {
+ gpr_log(GPR_DEBUG, "Destroying credentials object");
+ grpc_credentials_release(wrapped_credentials);
+}
+
+void Credentials::Init(Handle<Object> exports) {
+ NanScope();
+ Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
+ tpl->SetClassName(NanNew("Credentials"));
+ tpl->InstanceTemplate()->SetInternalFieldCount(1);
+ NanAssignPersistent(fun_tpl, tpl);
+ NanAssignPersistent(constructor, tpl->GetFunction());
+ constructor->Set(NanNew("createDefault"),
+ FunctionTemplate::New(CreateDefault)->GetFunction());
+ constructor->Set(NanNew("createSsl"),
+ FunctionTemplate::New(CreateSsl)->GetFunction());
+ constructor->Set(NanNew("createComposite"),
+ FunctionTemplate::New(CreateComposite)->GetFunction());
+ constructor->Set(NanNew("createGce"),
+ FunctionTemplate::New(CreateGce)->GetFunction());
+ constructor->Set(NanNew("createFake"),
+ FunctionTemplate::New(CreateFake)->GetFunction());
+ constructor->Set(NanNew("createIam"),
+ FunctionTemplate::New(CreateIam)->GetFunction());
+ exports->Set(NanNew("Credentials"), constructor);
+}
+
+bool Credentials::HasInstance(Handle<Value> val) {
+ NanScope();
+ return NanHasInstance(fun_tpl, val);
+}
+
+Handle<Value> Credentials::WrapStruct(grpc_credentials *credentials) {
+ NanEscapableScope();
+ if (credentials == NULL) {
+ return NanEscapeScope(NanNull());
+ }
+ const int argc = 1;
+ Handle<Value> argv[argc] = {
+ External::New(reinterpret_cast<void*>(credentials)) };
+ return NanEscapeScope(constructor->NewInstance(argc, argv));
+}
+
+grpc_credentials *Credentials::GetWrappedCredentials() {
+ return wrapped_credentials;
+}
+
+NAN_METHOD(Credentials::New) {
+ NanScope();
+
+ if (args.IsConstructCall()) {
+ if (!args[0]->IsExternal()) {
+ return NanThrowTypeError(
+ "Credentials can only be created with the provided functions");
+ }
+ grpc_credentials *creds_value = reinterpret_cast<grpc_credentials*>(
+ External::Unwrap(args[0]));
+ Credentials *credentials = new Credentials(creds_value);
+ credentials->Wrap(args.This());
+ NanReturnValue(args.This());
+ } else {
+ const int argc = 1;
+ Local<Value> argv[argc] = { args[0] };
+ NanReturnValue(constructor->NewInstance(argc, argv));
+ }
+}
+
+NAN_METHOD(Credentials::CreateDefault) {
+ NanScope();
+ NanReturnValue(WrapStruct(grpc_default_credentials_create()));
+}
+
+NAN_METHOD(Credentials::CreateSsl) {
+ NanScope();
+ char *root_certs;
+ char *private_key = NULL;
+ char *cert_chain = NULL;
+ int root_certs_length, private_key_length = 0, cert_chain_length = 0;
+ if (!Buffer::HasInstance(args[0])) {
+ return NanThrowTypeError(
+ "createSsl's first argument must be a Buffer");
+ }
+ root_certs = Buffer::Data(args[0]);
+ root_certs_length = Buffer::Length(args[0]);
+ if (Buffer::HasInstance(args[1])) {
+ private_key = Buffer::Data(args[1]);
+ private_key_length = Buffer::Length(args[1]);
+ } else if (!(args[1]->IsNull() || args[1]->IsUndefined())) {
+ return NanThrowTypeError(
+ "createSSl's second argument must be a Buffer if provided");
+ }
+ if (Buffer::HasInstance(args[2])) {
+ cert_chain = Buffer::Data(args[2]);
+ cert_chain_length = Buffer::Length(args[2]);
+ } else if (!(args[2]->IsNull() || args[2]->IsUndefined())) {
+ return NanThrowTypeError(
+ "createSSl's third argument must be a Buffer if provided");
+ }
+ NanReturnValue(WrapStruct(grpc_ssl_credentials_create(
+ reinterpret_cast<unsigned char*>(root_certs), root_certs_length,
+ reinterpret_cast<unsigned char*>(private_key), private_key_length,
+ reinterpret_cast<unsigned char*>(cert_chain), cert_chain_length)));
+}
+
+NAN_METHOD(Credentials::CreateComposite) {
+ NanScope();
+ if (!HasInstance(args[0])) {
+ return NanThrowTypeError(
+ "createComposite's first argument must be a Credentials object");
+ }
+ if (!HasInstance(args[1])) {
+ return NanThrowTypeError(
+ "createComposite's second argument must be a Credentials object");
+ }
+ Credentials *creds1 = ObjectWrap::Unwrap<Credentials>(args[0]->ToObject());
+ Credentials *creds2 = ObjectWrap::Unwrap<Credentials>(args[1]->ToObject());
+ NanReturnValue(WrapStruct(grpc_composite_credentials_create(
+ creds1->wrapped_credentials, creds2->wrapped_credentials)));
+}
+
+NAN_METHOD(Credentials::CreateGce) {
+ NanScope();
+ NanReturnValue(WrapStruct(grpc_compute_engine_credentials_create()));
+}
+
+NAN_METHOD(Credentials::CreateFake) {
+ NanScope();
+ NanReturnValue(WrapStruct(grpc_fake_transport_security_credentials_create()));
+}
+
+NAN_METHOD(Credentials::CreateIam) {
+ NanScope();
+ if (!args[0]->IsString()) {
+ return NanThrowTypeError(
+ "createIam's first argument must be a string");
+ }
+ if (!args[1]->IsString()) {
+ return NanThrowTypeError(
+ "createIam's second argument must be a string");
+ }
+ NanUtf8String auth_token(args[0]);
+ NanUtf8String auth_selector(args[1]);
+ NanReturnValue(WrapStruct(grpc_iam_credentials_create(*auth_token,
+ *auth_selector)));
+}
+
+} // namespace node
+} // namespace grpc
diff --git a/src/node/credentials.h b/src/node/credentials.h
new file mode 100644
index 0000000000..f30cc7373e
--- /dev/null
+++ b/src/node/credentials.h
@@ -0,0 +1,48 @@
+#ifndef NET_GRPC_NODE_CREDENTIALS_H_
+#define NET_GRPC_NODE_CREDENTIALS_H_
+
+#include <node.h>
+#include <nan.h>
+#include "grpc/grpc.h"
+#include "grpc/grpc_security.h"
+
+namespace grpc {
+namespace node {
+
+/* Wrapper class for grpc_credentials structs */
+class Credentials : public ::node::ObjectWrap {
+ public:
+ static void Init(v8::Handle<v8::Object> exports);
+ static bool HasInstance(v8::Handle<v8::Value> val);
+ /* Wrap a grpc_credentials struct in a javascript object */
+ static v8::Handle<v8::Value> WrapStruct(grpc_credentials *credentials);
+
+ /* Returns the grpc_credentials struct that this object wraps */
+ grpc_credentials *GetWrappedCredentials();
+
+ private:
+ explicit Credentials(grpc_credentials *credentials);
+ ~Credentials();
+
+ // Prevent copying
+ Credentials(const Credentials&);
+ Credentials& operator=(const Credentials&);
+
+ static NAN_METHOD(New);
+ static NAN_METHOD(CreateDefault);
+ static NAN_METHOD(CreateSsl);
+ static NAN_METHOD(CreateComposite);
+ static NAN_METHOD(CreateGce);
+ static NAN_METHOD(CreateFake);
+ static NAN_METHOD(CreateIam);
+ static v8::Persistent<v8::Function> constructor;
+ // Used for typechecking instances of this javascript class
+ static v8::Persistent<v8::FunctionTemplate> fun_tpl;
+
+ grpc_credentials *wrapped_credentials;
+};
+
+} // namespace node
+} // namespace grpc
+
+#endif // NET_GRPC_NODE_CREDENTIALS_H_
diff --git a/src/node/event.cc b/src/node/event.cc
new file mode 100644
index 0000000000..0eb4a54c54
--- /dev/null
+++ b/src/node/event.cc
@@ -0,0 +1,132 @@
+#include <node.h>
+#include <nan.h>
+#include "grpc/grpc.h"
+#include "byte_buffer.h"
+#include "call.h"
+#include "event.h"
+#include "tag.h"
+#include "timeval.h"
+
+namespace grpc {
+namespace node {
+
+using v8::Array;
+using v8::Date;
+using v8::Handle;
+using v8::HandleScope;
+using v8::Number;
+using v8::Object;
+using v8::Persistent;
+using v8::String;
+using v8::Value;
+
+Handle<Value> GetEventData(grpc_event *event) {
+ NanEscapableScope();
+ size_t count;
+ grpc_metadata *items;
+ Handle<Array> metadata;
+ Handle<Object> status;
+ Handle<Object> rpc_new;
+ switch (event->type) {
+ case GRPC_READ:
+ return NanEscapeScope(ByteBufferToBuffer(event->data.read));
+ case GRPC_INVOKE_ACCEPTED:
+ return NanEscapeScope(NanNew<Number>(event->data.invoke_accepted));
+ case GRPC_WRITE_ACCEPTED:
+ return NanEscapeScope(NanNew<Number>(event->data.write_accepted));
+ case GRPC_FINISH_ACCEPTED:
+ return NanEscapeScope(NanNew<Number>(event->data.finish_accepted));
+ case GRPC_CLIENT_METADATA_READ:
+ count = event->data.client_metadata_read.count;
+ items = event->data.client_metadata_read.elements;
+ metadata = NanNew<Array>(static_cast<int>(count));
+ for (unsigned int i = 0; i < count; i++) {
+ Handle<Object> item_obj = NanNew<Object>();
+ item_obj->Set(NanNew<String, const char *>("key"),
+ NanNew<String, char *>(items[i].key));
+ item_obj->Set(NanNew<String, const char *>("value"),
+ NanNew<String, char *>(
+ items[i].value,
+ static_cast<int>(items[i].value_length)));
+ metadata->Set(i, item_obj);
+ }
+ return NanEscapeScope(metadata);
+ case GRPC_FINISHED:
+ status = NanNew<Object>();
+ status->Set(NanNew("code"), NanNew<Number>(
+ event->data.finished.status));
+ if (event->data.finished.details != NULL) {
+ status->Set(NanNew("details"), String::New(
+ event->data.finished.details));
+ }
+ count = event->data.finished.metadata_count;
+ items = event->data.finished.metadata_elements;
+ metadata = NanNew<Array>(static_cast<int>(count));
+ for (unsigned int i = 0; i < count; i++) {
+ Handle<Object> item_obj = NanNew<Object>();
+ item_obj->Set(NanNew<String, const char *>("key"),
+ NanNew<String, char *>(items[i].key));
+ item_obj->Set(NanNew<String, const char *>("value"),
+ NanNew<String, char *>(
+ items[i].value,
+ static_cast<int>(items[i].value_length)));
+ metadata->Set(i, item_obj);
+ }
+ status->Set(NanNew("metadata"), metadata);
+ return NanEscapeScope(status);
+ case GRPC_SERVER_RPC_NEW:
+ rpc_new = NanNew<Object>();
+ if (event->data.server_rpc_new.method == NULL) {
+ return NanEscapeScope(NanNull());
+ }
+ rpc_new->Set(NanNew<String, const char *>("method"),
+ NanNew<String, const char *>(
+ event->data.server_rpc_new.method));
+ rpc_new->Set(NanNew<String, const char *>("host"),
+ NanNew<String, const char *>(
+ event->data.server_rpc_new.host));
+ rpc_new->Set(NanNew<String, const char *>("absolute_deadline"),
+ NanNew<Date>(TimespecToMilliseconds(
+ event->data.server_rpc_new.deadline)));
+ count = event->data.server_rpc_new.metadata_count;
+ items = event->data.server_rpc_new.metadata_elements;
+ metadata = NanNew<Array>(static_cast<int>(count));
+ for (unsigned int i = 0; i < count; i++) {
+ Handle<Object> item_obj = Object::New();
+ item_obj->Set(NanNew<String, const char *>("key"),
+ NanNew<String, char *>(items[i].key));
+ item_obj->Set(NanNew<String, const char *>("value"),
+ NanNew<String, char *>(
+ items[i].value,
+ static_cast<int>(items[i].value_length)));
+ metadata->Set(i, item_obj);
+ }
+ rpc_new->Set(NanNew<String, const char *>("metadata"), metadata);
+ return NanEscapeScope(rpc_new);
+ default:
+ return NanEscapeScope(NanNull());
+ }
+}
+
+Handle<Value> CreateEventObject(grpc_event *event) {
+ NanEscapableScope();
+ if (event == NULL) {
+ return NanEscapeScope(NanNull());
+ }
+ Handle<Object> event_obj = NanNew<Object>();
+ Handle<Value> call;
+ if (TagHasCall(event->tag)) {
+ call = TagGetCall(event->tag);
+ } else {
+ call = Call::WrapStruct(event->call);
+ }
+ event_obj->Set(NanNew<String, const char *>("call"), call);
+ event_obj->Set(NanNew<String, const char *>("type"),
+ NanNew<Number>(event->type));
+ event_obj->Set(NanNew<String, const char *>("data"), GetEventData(event));
+
+ return NanEscapeScope(event_obj);
+}
+
+} // namespace node
+} // namespace grpc
diff --git a/src/node/event.h b/src/node/event.h
new file mode 100644
index 0000000000..cbc4b9c311
--- /dev/null
+++ b/src/node/event.h
@@ -0,0 +1,15 @@
+#ifndef NET_GRPC_NODE_EVENT_H_
+#define NET_GRPC_NODE_EVENT_H_
+
+#include <node.h>
+#include "grpc/grpc.h"
+
+namespace grpc {
+namespace node {
+
+v8::Handle<v8::Value> CreateEventObject(grpc_event *event);
+
+} // namespace node
+} // namespace grpc
+
+#endif // NET_GRPC_NODE_EVENT_H_
diff --git a/src/node/examples/math.proto b/src/node/examples/math.proto
new file mode 100644
index 0000000000..14eff5daaf
--- /dev/null
+++ b/src/node/examples/math.proto
@@ -0,0 +1,25 @@
+syntax = "proto2";
+
+package math;
+
+message DivArgs {
+ required int64 dividend = 1;
+ required int64 divisor = 2;
+}
+
+message DivReply {
+ required int64 quotient = 1;
+ required int64 remainder = 2;
+}
+
+message FibArgs {
+ optional int64 limit = 1;
+}
+
+message Num {
+ required int64 num = 1;
+}
+
+message FibReply {
+ required int64 count = 1;
+} \ No newline at end of file
diff --git a/src/node/examples/math_server.js b/src/node/examples/math_server.js
new file mode 100644
index 0000000000..ebbeeb8763
--- /dev/null
+++ b/src/node/examples/math_server.js
@@ -0,0 +1,168 @@
+var _ = require('underscore');
+var ProtoBuf = require('protobufjs');
+var fs = require('fs');
+var util = require('util');
+
+var Transform = require('stream').Transform;
+
+var builder = ProtoBuf.loadProtoFile(__dirname + '/math.proto');
+var math = builder.build('math');
+
+var makeConstructor = require('../surface_server.js').makeServerConstructor;
+
+/**
+ * Get a function that deserializes a specific type of protobuf.
+ * @param {function()} cls The constructor of the message type to deserialize
+ * @return {function(Buffer):cls} The deserialization function
+ */
+function deserializeCls(cls) {
+ /**
+ * Deserialize a buffer to a message object
+ * @param {Buffer} arg_buf The buffer to deserialize
+ * @return {cls} The resulting object
+ */
+ return function deserialize(arg_buf) {
+ return cls.decode(arg_buf);
+ };
+}
+
+/**
+ * Get a function that serializes objects to a buffer by protobuf class.
+ * @param {function()} Cls The constructor of the message type to serialize
+ * @return {function(Cls):Buffer} The serialization function
+ */
+function serializeCls(Cls) {
+ /**
+ * Serialize an object to a Buffer
+ * @param {Object} arg The object to serialize
+ * @return {Buffer} The serialized object
+ */
+ return function serialize(arg) {
+ return new Buffer(new Cls(arg).encode().toBuffer());
+ };
+}
+
+/* This function call creates a server constructor for servers that that expose
+ * the four specified methods. This specifies how to serialize messages that the
+ * server sends and deserialize messages that the client sends, and whether the
+ * client or the server will send a stream of messages, for each method. This
+ * also specifies a prefix that will be added to method names when sending them
+ * on the wire. This function call and all of the preceding code in this file
+ * are intended to approximate what the generated code will look like for the
+ * math service */
+var Server = makeConstructor({
+ Div: {
+ serialize: serializeCls(math.DivReply),
+ deserialize: deserializeCls(math.DivArgs),
+ client_stream: false,
+ server_stream: false
+ },
+ Fib: {
+ serialize: serializeCls(math.Num),
+ deserialize: deserializeCls(math.FibArgs),
+ client_stream: false,
+ server_stream: true
+ },
+ Sum: {
+ serialize: serializeCls(math.Num),
+ deserialize: deserializeCls(math.Num),
+ client_stream: true,
+ server_stream: false
+ },
+ DivMany: {
+ serialize: serializeCls(math.DivReply),
+ deserialize: deserializeCls(math.DivArgs),
+ client_stream: true,
+ server_stream: true
+ }
+}, '/Math/');
+
+/**
+ * Server function for division. Provides the /Math/DivMany and /Math/Div
+ * functions (Div is just DivMany with only one stream element). For each
+ * DivArgs parameter, responds with a DivReply with the results of the division
+ * @param {Object} call The object containing request and cancellation info
+ * @param {function(Error, *)} cb Response callback
+ */
+function mathDiv(call, cb) {
+ var req = call.request;
+ if (req.divisor == 0) {
+ cb(new Error('cannot divide by zero'));
+ }
+ cb(null, {
+ quotient: req.dividend / req.divisor,
+ remainder: req.dividend % req.divisor
+ });
+}
+
+/**
+ * Server function for Fibonacci numbers. Provides the /Math/Fib function. Reads
+ * a single parameter that indicates the number of responses, and then responds
+ * with a stream of that many Fibonacci numbers.
+ * @param {stream} stream The stream for sending responses.
+ */
+function mathFib(stream) {
+ // Here, call is a standard writable Node object Stream
+ var previous = 0, current = 1;
+ for (var i = 0; i < stream.request.limit; i++) {
+ stream.write({num: current});
+ var temp = current;
+ current += previous;
+ previous = temp;
+ }
+ stream.end();
+}
+
+/**
+ * Server function for summation. Provides the /Math/Sum function. Reads a
+ * stream of number parameters, then responds with their sum.
+ * @param {stream} call The stream of arguments.
+ * @param {function(Error, *)} cb Response callback
+ */
+function mathSum(call, cb) {
+ // Here, call is a standard readable Node object Stream
+ var sum = 0;
+ call.on('data', function(data) {
+ sum += data.num | 0;
+ });
+ call.on('end', function() {
+ cb(null, {num: sum});
+ });
+}
+
+function mathDivMany(stream) {
+ // Here, call is a standard duplex Node object Stream
+ util.inherits(DivTransform, Transform);
+ function DivTransform() {
+ var options = {objectMode: true};
+ Transform.call(this, options);
+ }
+ DivTransform.prototype._transform = function(div_args, encoding, callback) {
+ if (div_args.divisor == 0) {
+ callback(new Error('cannot divide by zero'));
+ }
+ callback(null, {
+ quotient: div_args.dividend / div_args.divisor,
+ remainder: div_args.dividend % div_args.divisor
+ });
+ };
+ var transform = new DivTransform();
+ stream.pipe(transform);
+ transform.pipe(stream);
+}
+
+var server = new Server({
+ Div: mathDiv,
+ Fib: mathFib,
+ Sum: mathSum,
+ DivMany: mathDivMany
+});
+
+if (require.main === module) {
+ server.bind('localhost:7070').listen();
+}
+
+/**
+ * See docs for server
+ */
+module.exports = server;
diff --git a/src/node/node_grpc.cc b/src/node/node_grpc.cc
new file mode 100644
index 0000000000..fb9e060693
--- /dev/null
+++ b/src/node/node_grpc.cc
@@ -0,0 +1,151 @@
+#include <node.h>
+#include <nan.h>
+#include <v8.h>
+#include "grpc/grpc.h"
+
+#include "call.h"
+#include "channel.h"
+#include "event.h"
+#include "server.h"
+#include "completion_queue_async_worker.h"
+#include "credentials.h"
+#include "server_credentials.h"
+
+using v8::Handle;
+using v8::Value;
+using v8::Object;
+using v8::Uint32;
+using v8::String;
+
+void InitStatusConstants(Handle<Object> exports) {
+ NanScope();
+ Handle<Object> status = Object::New();
+ exports->Set(NanNew("status"), status);
+ Handle<Value> OK(NanNew<Uint32, uint32_t>(GRPC_STATUS_OK));
+ status->Set(NanNew("OK"), OK);
+ Handle<Value> CANCELLED(NanNew<Uint32, uint32_t>(GRPC_STATUS_CANCELLED));
+ status->Set(NanNew("CANCELLED"), CANCELLED);
+ Handle<Value> UNKNOWN(NanNew<Uint32, uint32_t>(GRPC_STATUS_UNKNOWN));
+ status->Set(NanNew("UNKNOWN"), UNKNOWN);
+ Handle<Value> INVALID_ARGUMENT(
+ NanNew<Uint32, uint32_t>(GRPC_STATUS_INVALID_ARGUMENT));
+ status->Set(NanNew("INVALID_ARGUMENT"), INVALID_ARGUMENT);
+ Handle<Value> DEADLINE_EXCEEDED(
+ NanNew<Uint32, uint32_t>(GRPC_STATUS_DEADLINE_EXCEEDED));
+ status->Set(NanNew("DEADLINE_EXCEEDED"), DEADLINE_EXCEEDED);
+ Handle<Value> NOT_FOUND(NanNew<Uint32, uint32_t>(GRPC_STATUS_NOT_FOUND));
+ status->Set(NanNew("NOT_FOUND"), NOT_FOUND);
+ Handle<Value> ALREADY_EXISTS(
+ NanNew<Uint32, uint32_t>(GRPC_STATUS_ALREADY_EXISTS));
+ status->Set(NanNew("ALREADY_EXISTS"), ALREADY_EXISTS);
+ Handle<Value> PERMISSION_DENIED(
+ NanNew<Uint32, uint32_t>(GRPC_STATUS_PERMISSION_DENIED));
+ status->Set(NanNew("PERMISSION_DENIED"), PERMISSION_DENIED);
+ Handle<Value> UNAUTHENTICATED(
+ NanNew<Uint32, uint32_t>(GRPC_STATUS_UNAUTHENTICATED));
+ status->Set(NanNew("UNAUTHENTICATED"), UNAUTHENTICATED);
+ Handle<Value> RESOURCE_EXHAUSTED(
+ NanNew<Uint32, uint32_t>(GRPC_STATUS_RESOURCE_EXHAUSTED));
+ status->Set(NanNew("RESOURCE_EXHAUSTED"), RESOURCE_EXHAUSTED);
+ Handle<Value> FAILED_PRECONDITION(
+ NanNew<Uint32, uint32_t>(GRPC_STATUS_FAILED_PRECONDITION));
+ status->Set(NanNew("FAILED_PRECONDITION"), FAILED_PRECONDITION);
+ Handle<Value> ABORTED(NanNew<Uint32, uint32_t>(GRPC_STATUS_ABORTED));
+ status->Set(NanNew("ABORTED"), ABORTED);
+ Handle<Value> OUT_OF_RANGE(
+ NanNew<Uint32, uint32_t>(GRPC_STATUS_OUT_OF_RANGE));
+ status->Set(NanNew("OUT_OF_RANGE"), OUT_OF_RANGE);
+ Handle<Value> UNIMPLEMENTED(
+ NanNew<Uint32, uint32_t>(GRPC_STATUS_UNIMPLEMENTED));
+ status->Set(NanNew("UNIMPLEMENTED"), UNIMPLEMENTED);
+ Handle<Value> INTERNAL(NanNew<Uint32, uint32_t>(GRPC_STATUS_INTERNAL));
+ status->Set(NanNew("INTERNAL"), INTERNAL);
+ Handle<Value> UNAVAILABLE(NanNew<Uint32, uint32_t>(GRPC_STATUS_UNAVAILABLE));
+ status->Set(NanNew("UNAVAILABLE"), UNAVAILABLE);
+ Handle<Value> DATA_LOSS(NanNew<Uint32, uint32_t>(GRPC_STATUS_DATA_LOSS));
+ status->Set(NanNew("DATA_LOSS"), DATA_LOSS);
+}
+
+void InitCallErrorConstants(Handle<Object> exports) {
+ NanScope();
+ Handle<Object> call_error = Object::New();
+ exports->Set(NanNew("callError"), call_error);
+ Handle<Value> OK(NanNew<Uint32, uint32_t>(GRPC_CALL_OK));
+ call_error->Set(NanNew("OK"), OK);
+ Handle<Value> ERROR(NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR));
+ call_error->Set(NanNew("ERROR"), ERROR);
+ Handle<Value> NOT_ON_SERVER(
+ NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_NOT_ON_SERVER));
+ call_error->Set(NanNew("NOT_ON_SERVER"), NOT_ON_SERVER);
+ Handle<Value> NOT_ON_CLIENT(
+ NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_NOT_ON_CLIENT));
+ call_error->Set(NanNew("NOT_ON_CLIENT"), NOT_ON_CLIENT);
+ Handle<Value> ALREADY_INVOKED(
+ NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_ALREADY_INVOKED));
+ call_error->Set(NanNew("ALREADY_INVOKED"), ALREADY_INVOKED);
+ Handle<Value> NOT_INVOKED(
+ NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_NOT_INVOKED));
+ call_error->Set(NanNew("NOT_INVOKED"), NOT_INVOKED);
+ Handle<Value> ALREADY_FINISHED(
+ NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_ALREADY_FINISHED));
+ call_error->Set(NanNew("ALREADY_FINISHED"), ALREADY_FINISHED);
+ Handle<Value> TOO_MANY_OPERATIONS(
+ NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS));
+ call_error->Set(NanNew("TOO_MANY_OPERATIONS"),
+ TOO_MANY_OPERATIONS);
+ Handle<Value> INVALID_FLAGS(
+ NanNew<Uint32, uint32_t>(GRPC_CALL_ERROR_INVALID_FLAGS));
+ call_error->Set(NanNew("INVALID_FLAGS"), INVALID_FLAGS);
+}
+
+void InitOpErrorConstants(Handle<Object> exports) {
+ NanScope();
+ Handle<Object> op_error = Object::New();
+ exports->Set(NanNew("opError"), op_error);
+ Handle<Value> OK(NanNew<Uint32, uint32_t>(GRPC_OP_OK));
+ op_error->Set(NanNew("OK"), OK);
+ Handle<Value> ERROR(NanNew<Uint32, uint32_t>(GRPC_OP_ERROR));
+ op_error->Set(NanNew("ERROR"), ERROR);
+}
+
+void InitCompletionTypeConstants(Handle<Object> exports) {
+ NanScope();
+ Handle<Object> completion_type = Object::New();
+ exports->Set(NanNew("completionType"), completion_type);
+ Handle<Value> QUEUE_SHUTDOWN(NanNew<Uint32, uint32_t>(GRPC_QUEUE_SHUTDOWN));
+ completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN);
+ Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ));
+ completion_type->Set(NanNew("READ"), READ);
+ Handle<Value> INVOKE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_INVOKE_ACCEPTED));
+ completion_type->Set(NanNew("INVOKE_ACCEPTED"), INVOKE_ACCEPTED);
+ Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED));
+ completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED);
+ Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED));
+ completion_type->Set(NanNew("FINISH_ACCEPTED"), FINISH_ACCEPTED);
+ Handle<Value> CLIENT_METADATA_READ(
+ NanNew<Uint32, uint32_t>(GRPC_CLIENT_METADATA_READ));
+ completion_type->Set(NanNew("CLIENT_METADATA_READ"),
+ CLIENT_METADATA_READ);
+ Handle<Value> FINISHED(NanNew<Uint32, uint32_t>(GRPC_FINISHED));
+ completion_type->Set(NanNew("FINISHED"), FINISHED);
+ Handle<Value> SERVER_RPC_NEW(NanNew<Uint32, uint32_t>(GRPC_SERVER_RPC_NEW));
+ completion_type->Set(NanNew("SERVER_RPC_NEW"), SERVER_RPC_NEW);
+}
+
+void init(Handle<Object> exports) {
+ NanScope();
+ grpc_init();
+ InitStatusConstants(exports);
+ InitCallErrorConstants(exports);
+ InitOpErrorConstants(exports);
+ InitCompletionTypeConstants(exports);
+
+ grpc::node::Call::Init(exports);
+ grpc::node::Channel::Init(exports);
+ grpc::node::Server::Init(exports);
+ grpc::node::CompletionQueueAsyncWorker::Init(exports);
+ grpc::node::Credentials::Init(exports);
+ grpc::node::ServerCredentials::Init(exports);
+}
+
+NODE_MODULE(grpc, init)
diff --git a/src/node/package.json b/src/node/package.json
new file mode 100644
index 0000000000..a2940b29bb
--- /dev/null
+++ b/src/node/package.json
@@ -0,0 +1,18 @@
+{
+ "name": "grpc",
+ "version": "0.1.0",
+ "description": "gRPC Library for Node",
+ "scripts": {
+ "test": "./node_modules/mocha/bin/mocha"
+ },
+ "dependencies": {
+ "bindings": "^1.2.1",
+ "nan": "~1.3.0",
+ "underscore": "^1.7.0"
+ },
+ "devDependencies": {
+ "mocha": "~1.21.0",
+ "highland": "~2.0.0",
+ "protobufjs": "~3.8.0"
+ }
+}
diff --git a/src/node/port_picker.js b/src/node/port_picker.js
new file mode 100644
index 0000000000..7b3cfd74ff
--- /dev/null
+++ b/src/node/port_picker.js
@@ -0,0 +1,19 @@
+var net = require('net');
+
+/**
+ * Finds a free port that a server can bind to, in the format
+ * "address:port"
+ * @param {function(string)} cb The callback that should execute when the port
+ * is available
+ */
+function nextAvailablePort(cb) {
+ var server = net.createServer();
+ server.listen(function() {
+ var address = server.address();
+ server.close(function() {
+ cb(address.address + ':' + address.port.toString());
+ });
+ });
+}
+
+exports.nextAvailablePort = nextAvailablePort;
diff --git a/src/node/server.cc b/src/node/server.cc
new file mode 100644
index 0000000000..0e6c59b47c
--- /dev/null
+++ b/src/node/server.cc
@@ -0,0 +1,212 @@
+#include "server.h"
+
+#include <node.h>
+#include <nan.h>
+
+#include <malloc.h>
+
+#include <vector>
+#include "grpc/grpc.h"
+#include "grpc/grpc_security.h"
+#include "call.h"
+#include "completion_queue_async_worker.h"
+#include "tag.h"
+#include "server_credentials.h"
+
+namespace grpc {
+namespace node {
+
+using v8::Arguments;
+using v8::Array;
+using v8::Boolean;
+using v8::Exception;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Handle;
+using v8::HandleScope;
+using v8::Local;
+using v8::Number;
+using v8::Object;
+using v8::Persistent;
+using v8::String;
+using v8::Value;
+
+Persistent<Function> Server::constructor;
+Persistent<FunctionTemplate> Server::fun_tpl;
+
+Server::Server(grpc_server *server) : wrapped_server(server) {
+}
+
+Server::~Server() {
+ grpc_server_destroy(wrapped_server);
+}
+
+void Server::Init(Handle<Object> exports) {
+ NanScope();
+ Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
+ tpl->SetClassName(String::NewSymbol("Server"));
+ tpl->InstanceTemplate()->SetInternalFieldCount(1);
+ NanSetPrototypeTemplate(tpl, "requestCall",
+ FunctionTemplate::New(RequestCall)->GetFunction());
+
+ NanSetPrototypeTemplate(tpl, "addHttp2Port",
+ FunctionTemplate::New(AddHttp2Port)->GetFunction());
+
+ NanSetPrototypeTemplate(tpl, "addSecureHttp2Port",
+ FunctionTemplate::New(
+ AddSecureHttp2Port)->GetFunction());
+
+ NanSetPrototypeTemplate(tpl, "start",
+ FunctionTemplate::New(Start)->GetFunction());
+
+ NanSetPrototypeTemplate(tpl, "shutdown",
+ FunctionTemplate::New(Shutdown)->GetFunction());
+
+ NanAssignPersistent(fun_tpl, tpl);
+ NanAssignPersistent(constructor, tpl->GetFunction());
+ exports->Set(String::NewSymbol("Server"), constructor);
+}
+
+bool Server::HasInstance(Handle<Value> val) {
+ return NanHasInstance(fun_tpl, val);
+}
+
+NAN_METHOD(Server::New) {
+ NanScope();
+
+ /* If this is not a constructor call, make a constructor call and return
+ the result */
+ if (!args.IsConstructCall()) {
+ const int argc = 1;
+ Local<Value> argv[argc] = { args[0] };
+ NanReturnValue(constructor->NewInstance(argc, argv));
+ }
+ grpc_server *wrapped_server;
+ grpc_completion_queue *queue = CompletionQueueAsyncWorker::GetQueue();
+ if (args[0]->IsUndefined()) {
+ wrapped_server = grpc_server_create(queue, NULL);
+ } else if (args[0]->IsObject()) {
+ grpc_server_credentials *creds = NULL;
+ Handle<Object> args_hash(args[0]->ToObject()->Clone());
+ if (args_hash->HasOwnProperty(NanNew("credentials"))) {
+ Handle<Value> creds_value = args_hash->Get(NanNew("credentials"));
+ if (!ServerCredentials::HasInstance(creds_value)) {
+ return NanThrowTypeError(
+ "credentials arg must be a ServerCredentials object");
+ }
+ ServerCredentials *creds_object = ObjectWrap::Unwrap<ServerCredentials>(
+ creds_value->ToObject());
+ creds = creds_object->GetWrappedServerCredentials();
+ args_hash->Delete(NanNew("credentials"));
+ }
+ Handle<Array> keys(args_hash->GetOwnPropertyNames());
+ grpc_channel_args channel_args;
+ channel_args.num_args = keys->Length();
+ channel_args.args = reinterpret_cast<grpc_arg*>(
+ calloc(channel_args.num_args, sizeof(grpc_arg)));
+ /* These are used to keep all strings until then end of the block, then
+ destroy them */
+ std::vector<NanUtf8String*> key_strings(keys->Length());
+ std::vector<NanUtf8String*> value_strings(keys->Length());
+ for (unsigned int i = 0; i < channel_args.num_args; i++) {
+ Handle<String> current_key(keys->Get(i)->ToString());
+ Handle<Value> current_value(args_hash->Get(current_key));
+ key_strings[i] = new NanUtf8String(current_key);
+ channel_args.args[i].key = **key_strings[i];
+ if (current_value->IsInt32()) {
+ channel_args.args[i].type = GRPC_ARG_INTEGER;
+ channel_args.args[i].value.integer = current_value->Int32Value();
+ } else if (current_value->IsString()) {
+ channel_args.args[i].type = GRPC_ARG_STRING;
+ value_strings[i] = new NanUtf8String(current_value);
+ channel_args.args[i].value.string = **value_strings[i];
+ } else {
+ free(channel_args.args);
+ return NanThrowTypeError("Arg values must be strings");
+ }
+ }
+ if (creds == NULL) {
+ wrapped_server = grpc_server_create(queue,
+ &channel_args);
+ } else {
+ wrapped_server = grpc_secure_server_create(creds,
+ queue,
+ &channel_args);
+ }
+ free(channel_args.args);
+ } else {
+ return NanThrowTypeError("Server expects an object");
+ }
+ Server *server = new Server(wrapped_server);
+ server->Wrap(args.This());
+ NanReturnValue(args.This());
+}
+
+NAN_METHOD(Server::RequestCall) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError("requestCall can only be called on a Server");
+ }
+ Server *server = ObjectWrap::Unwrap<Server>(args.This());
+ grpc_call_error error = grpc_server_request_call(
+ server->wrapped_server,
+ CreateTag(args[0], NanNull()));
+ if (error == GRPC_CALL_OK) {
+ CompletionQueueAsyncWorker::Next();
+ } else {
+ return NanThrowError("requestCall failed", error);
+ }
+ NanReturnUndefined();
+}
+
+NAN_METHOD(Server::AddHttp2Port) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError("addHttp2Port can only be called on a Server");
+ }
+ if (!args[0]->IsString()) {
+ return NanThrowTypeError("addHttp2Port's argument must be a String");
+ }
+ Server *server = ObjectWrap::Unwrap<Server>(args.This());
+ NanReturnValue(NanNew<Boolean>(grpc_server_add_http2_port(
+ server->wrapped_server,
+ *NanUtf8String(args[0]))));
+}
+
+NAN_METHOD(Server::AddSecureHttp2Port) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError(
+ "addSecureHttp2Port can only be called on a Server");
+ }
+ if (!args[0]->IsString()) {
+ return NanThrowTypeError("addSecureHttp2Port's argument must be a String");
+ }
+ Server *server = ObjectWrap::Unwrap<Server>(args.This());
+ NanReturnValue(NanNew<Boolean>(grpc_server_add_secure_http2_port(
+ server->wrapped_server,
+ *NanUtf8String(args[0]))));
+}
+
+NAN_METHOD(Server::Start) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError("start can only be called on a Server");
+ }
+ Server *server = ObjectWrap::Unwrap<Server>(args.This());
+ grpc_server_start(server->wrapped_server);
+ NanReturnUndefined();
+}
+
+NAN_METHOD(Server::Shutdown) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError("shutdown can only be called on a Server");
+ }
+ Server *server = ObjectWrap::Unwrap<Server>(args.This());
+ grpc_server_shutdown(server->wrapped_server);
+ NanReturnUndefined();
+}
+
+} // namespace node
+} // namespace grpc
diff --git a/src/node/server.h b/src/node/server.h
new file mode 100644
index 0000000000..5edf9557f5
--- /dev/null
+++ b/src/node/server.h
@@ -0,0 +1,46 @@
+#ifndef NET_GRPC_NODE_SERVER_H_
+#define NET_GRPC_NODE_SERVER_H_
+
+#include <node.h>
+#include <nan.h>
+#include "grpc/grpc.h"
+
+namespace grpc {
+namespace node {
+
+/* Wraps grpc_server as a JavaScript object. Provides a constructor
+ and wrapper methods for grpc_server_create, grpc_server_request_call,
+ grpc_server_add_http2_port, and grpc_server_start. */
+class Server : public ::node::ObjectWrap {
+ public:
+ /* Initializes the Server class and exposes the constructor and
+ wrapper methods to JavaScript */
+ static void Init(v8::Handle<v8::Object> exports);
+ /* Tests whether the given value was constructed by this class's
+ JavaScript constructor */
+ static bool HasInstance(v8::Handle<v8::Value> val);
+
+ private:
+ explicit Server(grpc_server *server);
+ ~Server();
+
+ // Prevent copying
+ Server(const Server&);
+ Server& operator=(const Server&);
+
+ static NAN_METHOD(New);
+ static NAN_METHOD(RequestCall);
+ static NAN_METHOD(AddHttp2Port);
+ static NAN_METHOD(AddSecureHttp2Port);
+ static NAN_METHOD(Start);
+ static NAN_METHOD(Shutdown);
+ static v8::Persistent<v8::Function> constructor;
+ static v8::Persistent<v8::FunctionTemplate> fun_tpl;
+
+ grpc_server *wrapped_server;
+};
+
+} // namespace node
+} // namespace grpc
+
+#endif // NET_GRPC_NODE_SERVER_H_
diff --git a/src/node/server.js b/src/node/server.js
new file mode 100644
index 0000000000..4079b2914e
--- /dev/null
+++ b/src/node/server.js
@@ -0,0 +1,228 @@
+var grpc = require('bindings')('grpc.node');
+
+var common = require('./common');
+
+var Duplex = require('stream').Duplex;
+var util = require('util');
+
+util.inherits(GrpcServerStream, Duplex);
+
+/**
+ * Class for representing a gRPC server side stream as a Node stream. Extends
+ * from stream.Duplex.
+ * @constructor
+ * @param {grpc.Call} call Call object to proxy
+ * @param {object} options Stream options
+ */
+function GrpcServerStream(call, options) {
+ Duplex.call(this, options);
+ this._call = call;
+ // Indicate that a status has been sent
+ var finished = false;
+ var self = this;
+ var status = {
+ 'code' : grpc.status.OK,
+ 'details' : 'OK'
+ };
+ /**
+ * Send the pending status
+ */
+ function sendStatus() {
+ call.startWriteStatus(status.code, status.details, function() {
+ });
+ finished = true;
+ }
+ this.on('finish', sendStatus);
+ /**
+ * Set the pending status to a given error status. If the error does not have
+ * code or details properties, the code will be set to grpc.status.INTERNAL
+ * and the details will be set to 'Unknown Error'.
+ * @param {Error} err The error object
+ */
+ function setStatus(err) {
+ var code = grpc.status.INTERNAL;
+ var details = 'Unknown Error';
+
+ if (err.hasOwnProperty('code')) {
+ code = err.code;
+ if (err.hasOwnProperty('details')) {
+ details = err.details;
+ }
+ }
+ status = {'code': code, 'details': details};
+ }
+ /**
+ * Terminate the call. This includes indicating that reads are done, draining
+ * all pending writes, and sending the given error as a status
+ * @param {Error} err The error object
+ * @this GrpcServerStream
+ */
+ function terminateCall(err) {
+ // Drain readable data
+ this.on('data', function() {});
+ setStatus(err);
+ this.end();
+ }
+ this.on('error', terminateCall);
+ // Indicates that a read is pending
+ var reading = false;
+ /**
+ * Callback to be called when a READ event is received. Pushes the data onto
+ * the read queue and starts reading again if applicable
+ * @param {grpc.Event} event READ event object
+ */
+ function readCallback(event) {
+ if (finished) {
+ self.push(null);
+ return;
+ }
+ var data = event.data;
+ if (self.push(data) && data != null) {
+ self._call.startRead(readCallback);
+ } else {
+ reading = false;
+ }
+ }
+ /**
+ * Start reading if there is not already a pending read. Reading will
+ * continue until self.push returns false (indicating reads should slow
+ * down) or the read data is null (indicating that there is no more data).
+ */
+ this.startReading = function() {
+ if (finished) {
+ self.push(null);
+ } else {
+ if (!reading) {
+ reading = true;
+ self._call.startRead(readCallback);
+ }
+ }
+ };
+}
+
+/**
+ * Start reading from the gRPC data source. This is an implementation of a
+ * method required for implementing stream.Readable
+ * @param {number} size Ignored
+ */
+GrpcServerStream.prototype._read = function(size) {
+ this.startReading();
+};
+
+/**
+ * Start writing a chunk of data. This is an implementation of a method required
+ * for implementing stream.Writable.
+ * @param {Buffer} chunk The chunk of data to write
+ * @param {string} encoding Ignored
+ * @param {function(Error=)} callback Callback to indicate that the write is
+ * complete
+ */
+GrpcServerStream.prototype._write = function(chunk, encoding, callback) {
+ var self = this;
+ self._call.startWrite(chunk, function(event) {
+ callback();
+ }, 0);
+};
+
+/**
+ * Constructs a server object that stores request handlers and delegates
+ * incoming requests to those handlers
+ * @constructor
+ * @param {Array} options Options that should be passed to the internal server
+ * implementation
+ */
+function Server(options) {
+ this.handlers = {};
+ var handlers = this.handlers;
+ var server = new grpc.Server(options);
+ this._server = server;
+ var started = false;
+ /**
+ * Start the server and begin handling requests
+ * @this Server
+ */
+ this.start = function() {
+ if (this.started) {
+ throw 'Server is already running';
+ }
+ server.start();
+ /**
+ * Handles the SERVER_RPC_NEW event. If there is a handler associated with
+ * the requested method, use that handler to respond to the request. Then
+ * wait for the next request
+ * @param {grpc.Event} event The event to handle with tag SERVER_RPC_NEW
+ */
+ function handleNewCall(event) {
+ var call = event.call;
+ var data = event.data;
+ if (data == null) {
+ return;
+ }
+ server.requestCall(handleNewCall);
+ var handler = undefined;
+ var deadline = data.absolute_deadline;
+ var cancelled = false;
+ if (handlers.hasOwnProperty(data.method)) {
+ handler = handlers[data.method];
+ }
+ call.serverAccept(function(event) {
+ if (event.data.code === grpc.status.CANCELLED) {
+ cancelled = true;
+ }
+ }, 0);
+ call.serverEndInitialMetadata(0);
+ var stream = new GrpcServerStream(call);
+ Object.defineProperty(stream, 'cancelled', {
+ get: function() { return cancelled;}
+ });
+ try {
+ handler(stream, data.metadata);
+ } catch (e) {
+ stream.emit('error', e);
+ }
+ }
+ server.requestCall(handleNewCall);
+ };
+ /** Shuts down the server.
+ */
+ this.shutdown = function() {
+ server.shutdown();
+ };
+}
+
+/**
+ * Registers a handler to handle the named method. Fails if there already is
+ * a handler for the given method. Returns true on success
+ * @param {string} name The name of the method that the provided function should
+ * handle/respond to.
+ * @param {function} handler Function that takes a stream of request values and
+ * returns a stream of response values
+ * @return {boolean} True if the handler was set. False if a handler was already
+ * set for that name.
+ */
+Server.prototype.register = function(name, handler) {
+ if (this.handlers.hasOwnProperty(name)) {
+ return false;
+ }
+ this.handlers[name] = handler;
+ return true;
+};
+
+/**
+ * Binds the server to the given port, with SSL enabled if secure is specified
+ * @param {string} port The port that the server should bind on, in the format
+ * "address:port"
+ * @param {boolean=} secure Whether the server should open a secure port
+ */
+Server.prototype.bind = function(port, secure) {
+ if (secure) {
+ this._server.addSecureHttp2Port(port);
+ } else {
+ this._server.addHttp2Port(port);
+ }
+};
+
+/**
+ * See documentation for Server
+ */
+module.exports = Server;
diff --git a/src/node/server_credentials.cc b/src/node/server_credentials.cc
new file mode 100644
index 0000000000..3d32bdafa9
--- /dev/null
+++ b/src/node/server_credentials.cc
@@ -0,0 +1,131 @@
+#include <node.h>
+
+#include "grpc/grpc.h"
+#include "grpc/grpc_security.h"
+#include "grpc/support/log.h"
+#include "server_credentials.h"
+
+namespace grpc {
+namespace node {
+
+using ::node::Buffer;
+using v8::Arguments;
+using v8::Exception;
+using v8::External;
+using v8::Function;
+using v8::FunctionTemplate;
+using v8::Handle;
+using v8::HandleScope;
+using v8::Integer;
+using v8::Local;
+using v8::Object;
+using v8::ObjectTemplate;
+using v8::Persistent;
+using v8::Value;
+
+Persistent<Function> ServerCredentials::constructor;
+Persistent<FunctionTemplate> ServerCredentials::fun_tpl;
+
+ServerCredentials::ServerCredentials(grpc_server_credentials *credentials)
+ : wrapped_credentials(credentials) {
+}
+
+ServerCredentials::~ServerCredentials() {
+ gpr_log(GPR_DEBUG, "Destroying server credentials object");
+ grpc_server_credentials_release(wrapped_credentials);
+}
+
+void ServerCredentials::Init(Handle<Object> exports) {
+ NanScope();
+ Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
+ tpl->SetClassName(NanNew("ServerCredentials"));
+ tpl->InstanceTemplate()->SetInternalFieldCount(1);
+ NanAssignPersistent(fun_tpl, tpl);
+ NanAssignPersistent(constructor, tpl->GetFunction());
+ constructor->Set(NanNew("createSsl"),
+ FunctionTemplate::New(CreateSsl)->GetFunction());
+ constructor->Set(NanNew("createFake"),
+ FunctionTemplate::New(CreateFake)->GetFunction());
+ exports->Set(NanNew("ServerCredentials"), constructor);
+}
+
+bool ServerCredentials::HasInstance(Handle<Value> val) {
+ NanScope();
+ return NanHasInstance(fun_tpl, val);
+}
+
+Handle<Value> ServerCredentials::WrapStruct(
+ grpc_server_credentials *credentials) {
+ NanEscapableScope();
+ if (credentials == NULL) {
+ return NanEscapeScope(NanNull());
+ }
+ const int argc = 1;
+ Handle<Value> argv[argc] = {
+ External::New(reinterpret_cast<void*>(credentials)) };
+ return NanEscapeScope(constructor->NewInstance(argc, argv));
+}
+
+grpc_server_credentials *ServerCredentials::GetWrappedServerCredentials() {
+ return wrapped_credentials;
+}
+
+NAN_METHOD(ServerCredentials::New) {
+ NanScope();
+
+ if (args.IsConstructCall()) {
+ if (!args[0]->IsExternal()) {
+ return NanThrowTypeError(
+ "ServerCredentials can only be created with the provide functions");
+ }
+ grpc_server_credentials *creds_value =
+ reinterpret_cast<grpc_server_credentials*>(External::Unwrap(args[0]));
+ ServerCredentials *credentials = new ServerCredentials(creds_value);
+ credentials->Wrap(args.This());
+ NanReturnValue(args.This());
+ } else {
+ const int argc = 1;
+ Local<Value> argv[argc] = { args[0] };
+ NanReturnValue(constructor->NewInstance(argc, argv));
+ }
+}
+
+NAN_METHOD(ServerCredentials::CreateSsl) {
+ NanScope();
+ char *root_certs = NULL;
+ char *private_key;
+ char *cert_chain;
+ int root_certs_length = 0, private_key_length, cert_chain_length;
+ if (Buffer::HasInstance(args[0])) {
+ root_certs = Buffer::Data(args[0]);
+ root_certs_length = Buffer::Length(args[0]);
+ } else if (!(args[0]->IsNull() || args[0]->IsUndefined())) {
+ return NanThrowTypeError(
+ "createSSl's first argument must be a Buffer if provided");
+ }
+ if (!Buffer::HasInstance(args[1])) {
+ return NanThrowTypeError(
+ "createSsl's second argument must be a Buffer");
+ }
+ private_key = Buffer::Data(args[1]);
+ private_key_length = Buffer::Length(args[1]);
+ if (!Buffer::HasInstance(args[2])) {
+ return NanThrowTypeError(
+ "createSsl's third argument must be a Buffer");
+ }
+ cert_chain = Buffer::Data(args[2]);
+ cert_chain_length = Buffer::Length(args[2]);
+ NanReturnValue(WrapStruct(grpc_ssl_server_credentials_create(
+ reinterpret_cast<unsigned char*>(root_certs), root_certs_length,
+ reinterpret_cast<unsigned char*>(private_key), private_key_length,
+ reinterpret_cast<unsigned char*>(cert_chain), cert_chain_length)));
+}
+
+NAN_METHOD(ServerCredentials::CreateFake) {
+ NanScope();
+ NanReturnValue(WrapStruct(
+ grpc_fake_transport_security_server_credentials_create()));
+}
+
+} // namespace node
+} // namespace grpc
diff --git a/src/node/server_credentials.h b/src/node/server_credentials.h
new file mode 100644
index 0000000000..63220f5eee
--- /dev/null
+++ b/src/node/server_credentials.h
@@ -0,0 +1,44 @@
+#ifndef NET_GRPC_NODE_SERVER_CREDENTIALS_H_
+#define NET_GRPC_NODE_SERVER_CREDENTIALS_H_
+
+#include <node.h>
+#include <nan.h>
+#include "grpc/grpc.h"
+#include "grpc/grpc_security.h"
+
+namespace grpc {
+namespace node {
+
+/* Wrapper class for grpc_server_credentials structs */
+class ServerCredentials : public ::node::ObjectWrap {
+ public:
+ static void Init(v8::Handle<v8::Object> exports);
+ static bool HasInstance(v8::Handle<v8::Value> val);
+ /* Wrap a grpc_server_credentials struct in a javascript object */
+ static v8::Handle<v8::Value> WrapStruct(grpc_server_credentials *credentials);
+
+ /* Returns the grpc_server_credentials struct that this object wraps */
+ grpc_server_credentials *GetWrappedServerCredentials();
+
+ private:
+ explicit ServerCredentials(grpc_server_credentials *credentials);
+ ~ServerCredentials();
+
+ // Prevent copying
+ ServerCredentials(const ServerCredentials&);
+ ServerCredentials& operator=(const ServerCredentials&);
+
+ static NAN_METHOD(New);
+ static NAN_METHOD(CreateSsl);
+ static NAN_METHOD(CreateFake);
+ static v8::Persistent<v8::Function> constructor;
+ // Used for typechecking instances of this javascript class
+ static v8::Persistent<v8::FunctionTemplate> fun_tpl;
+
+ grpc_server_credentials *wrapped_credentials;
+};
+
+} // namespace node
+} // namespace grpc
+
+#endif // NET_GRPC_NODE_SERVER_CREDENTIALS_H_
diff --git a/src/node/surface_client.js b/src/node/surface_client.js
new file mode 100644
index 0000000000..24f634ec65
--- /dev/null
+++ b/src/node/surface_client.js
@@ -0,0 +1,306 @@
+var _ = require('underscore');
+
+var client = require('./client.js');
+
+var EventEmitter = require('events').EventEmitter;
+
+var stream = require('stream');
+
+var Readable = stream.Readable;
+var Writable = stream.Writable;
+var Duplex = stream.Duplex;
+var util = require('util');
+
+function forwardEvent(fromEmitter, toEmitter, event) {
+ fromEmitter.on(event, function forward() {
+ _.partial(toEmitter.emit, event).apply(toEmitter, arguments);
+ });
+}
+
+util.inherits(ClientReadableObjectStream, Readable);
+
+/**
+ * Class for representing a gRPC server streaming call as a Node stream on the
+ * client side. Extends from stream.Readable.
+ * @constructor
+ * @param {stream} stream Underlying binary Duplex stream for the call
+ * @param {function(Buffer)} deserialize Function for deserializing binary data
+ * @param {object} options Stream options
+ */
+function ClientReadableObjectStream(stream, deserialize, options) {
+ options = _.extend(options, {objectMode: true});
+ Readable.call(this, options);
+ this._stream = stream;
+ var self = this;
+ forwardEvent(stream, this, 'status');
+ forwardEvent(stream, this, 'metadata');
+ this._stream.on('data', function forwardData(chunk) {
+ if (!self.push(deserialize(chunk))) {
+ self._stream.pause();
+ }
+ });
+ this._stream.pause();
+}
+
+util.inherits(ClientWritableObjectStream, Writable);
+
+/**
+ * Class for representing a gRPC client streaming call as a Node stream on the
+ * client side. Extends from stream.Writable.
+ * @constructor
+ * @param {stream} stream Underlying binary Duplex stream for the call
+ * @param {function(*):Buffer} serialize Function for serializing objects
+ * @param {object} options Stream options
+ */
+function ClientWritableObjectStream(stream, serialize, options) {
+ options = _.extend(options, {objectMode: true});
+ Writable.call(this, options);
+ this._stream = stream;
+ this._serialize = serialize;
+ forwardEvent(stream, this, 'status');
+ forwardEvent(stream, this, 'metadata');
+ this.on('finish', function() {
+ this._stream.end();
+ });
+}
+
+
+util.inherits(ClientBidiObjectStream, Duplex);
+
+/**
+ * Class for representing a gRPC bidi streaming call as a Node stream on the
+ * client side. Extends from stream.Duplex.
+ * @constructor
+ * @param {stream} stream Underlying binary Duplex stream for the call
+ * @param {function(*):Buffer} serialize Function for serializing objects
+ * @param {function(Buffer)} deserialize Function for deserializing binary data
+ * @param {object} options Stream options
+ */
+function ClientBidiObjectStream(stream, serialize, deserialize, options) {
+ options = _.extend(options, {objectMode: true});
+ Duplex.call(this, options);
+ this._stream = stream;
+ this._serialize = serialize;
+ var self = this;
+ forwardEvent(stream, this, 'status');
+ forwardEvent(stream, this, 'metadata');
+ this._stream.on('data', function forwardData(chunk) {
+ if (!self.push(deserialize(chunk))) {
+ self._stream.pause();
+ }
+ });
+ this._stream.pause();
+ this.on('finish', function() {
+ this._stream.end();
+ });
+}
+
+/**
+ * _read implementation for both types of streams that allow reading.
+ * @this {ClientReadableObjectStream|ClientBidiObjectStream}
+ * @param {number} size Ignored
+ */
+function _read(size) {
+ this._stream.resume();
+}
+
+/**
+ * See docs for _read
+ */
+ClientReadableObjectStream.prototype._read = _read;
+/**
+ * See docs for _read
+ */
+ClientBidiObjectStream.prototype._read = _read;
+
+/**
+ * _write implementation for both types of streams that allow writing
+ * @this {ClientWritableObjectStream|ClientBidiObjectStream}
+ * @param {*} chunk The value to write to the stream
+ * @param {string} encoding Ignored
+ * @param {function(Error)} callback Callback to call when finished writing
+ */
+function _write(chunk, encoding, callback) {
+ this._stream.write(this._serialize(chunk), encoding, callback);
+}
+
+/**
+ * See docs for _write
+ */
+ClientWritableObjectStream.prototype._write = _write;
+/**
+ * See docs for _write
+ */
+ClientBidiObjectStream.prototype._write = _write;
+
+/**
+ * Get a function that can make unary requests to the specified method.
+ * @param {string} method The name of the method to request
+ * @param {function(*):Buffer} serialize The serialization function for inputs
+ * @param {function(Buffer)} deserialize The deserialization function for
+ * outputs
+ * @return {Function} makeUnaryRequest
+ */
+function makeUnaryRequestFunction(method, serialize, deserialize) {
+ /**
+ * Make a unary request with this method on the given channel with the given
+ * argument, callback, etc.
+ * @param {client.Channel} channel The channel on which to make the request
+ * @param {*} argument The argument to the call. Should be serializable with
+ * serialize
+ * @param {function(?Error, value=)} callback The callback to for when the
+ * response is received
+ * @param {array=} metadata Array of metadata key/value pairs to add to the
+ * call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+ function makeUnaryRequest(channel, argument, callback, metadata, deadline) {
+ var stream = client.makeRequest(channel, method, metadata, deadline);
+ var emitter = new EventEmitter();
+ forwardEvent(stream, emitter, 'status');
+ forwardEvent(stream, emitter, 'metadata');
+ stream.write(serialize(argument));
+ stream.end();
+ stream.on('data', function forwardData(chunk) {
+ try {
+ callback(null, deserialize(chunk));
+ } catch (e) {
+ callback(e);
+ }
+ });
+ return emitter;
+ }
+ return makeUnaryRequest;
+}
+
+/**
+ * Get a function that can make client stream requests to the specified method.
+ * @param {string} method The name of the method to request
+ * @param {function(*):Buffer} serialize The serialization function for inputs
+ * @param {function(Buffer)} deserialize The deserialization function for
+ * outputs
+ * @return {Function} makeClientStreamRequest
+ */
+function makeClientStreamRequestFunction(method, serialize, deserialize) {
+ /**
+ * Make a client stream request with this method on the given channel with the
+ * given callback, etc.
+ * @param {client.Channel} channel The channel on which to make the request
+ * @param {function(?Error, value=)} callback The callback to for when the
+ * response is received
+ * @param {array=} metadata Array of metadata key/value pairs to add to the
+ * call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+ function makeClientStreamRequest(channel, callback, metadata, deadline) {
+ var stream = client.makeRequest(channel, method, metadata, deadline);
+ var obj_stream = new ClientWritableObjectStream(stream, serialize, {});
+ stream.on('data', function forwardData(chunk) {
+ try {
+ callback(null, deserialize(chunk));
+ } catch (e) {
+ callback(e);
+ }
+ });
+ return obj_stream;
+ }
+ return makeClientStreamRequest;
+}
+
+/**
+ * Get a function that can make server stream requests to the specified method.
+ * @param {string} method The name of the method to request
+ * @param {function(*):Buffer} serialize The serialization function for inputs
+ * @param {function(Buffer)} deserialize The deserialization function for
+ * outputs
+ * @return {Function} makeServerStreamRequest
+ */
+function makeServerStreamRequestFunction(method, serialize, deserialize) {
+ /**
+ * Make a server stream request with this method on the given channel with the
+ * given argument, etc.
+ * @param {client.Channel} channel The channel on which to make the request
+ * @param {*} argument The argument to the call. Should be serializable with
+ * serialize
+ * @param {array=} metadata Array of metadata key/value pairs to add to the
+ * call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+ function makeServerStreamRequest(channel, argument, metadata, deadline) {
+ var stream = client.makeRequest(channel, method, metadata, deadline);
+ var obj_stream = new ClientReadableObjectStream(stream, deserialize, {});
+ stream.write(serialize(argument));
+ stream.end();
+ return obj_stream;
+ }
+ return makeServerStreamRequest;
+}
+
+/**
+ * Get a function that can make bidirectional stream requests to the specified
+ * method.
+ * @param {string} method The name of the method to request
+ * @param {function(*):Buffer} serialize The serialization function for inputs
+ * @param {function(Buffer)} deserialize The deserialization function for
+ * outputs
+ * @return {Function} makeBidiStreamRequest
+ */
+function makeBidiStreamRequestFunction(method, serialize, deserialize) {
+ /**
+ * Make a bidirectional stream request with this method on the given channel.
+ * @param {client.Channel} channel The channel on which to make the request
+ * @param {array=} metadata Array of metadata key/value pairs to add to the
+ * call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+ function makeBidiStreamRequest(channel, metadata, deadline) {
+ var stream = client.makeRequest(channel, method, metadata, deadline);
+ var obj_stream = new ClientBidiObjectStream(stream,
+ serialize,
+ deserialize,
+ {});
+ return obj_stream;
+ }
+ return makeBidiStreamRequest;
+}
+
+/**
+ * See docs for makeUnaryRequestFunction
+ */
+exports.makeUnaryRequestFunction = makeUnaryRequestFunction;
+
+/**
+ * See docs for makeClientStreamRequestFunction
+ */
+exports.makeClientStreamRequestFunction = makeClientStreamRequestFunction;
+
+/**
+ * See docs for makeServerStreamRequestFunction
+ */
+exports.makeServerStreamRequestFunction = makeServerStreamRequestFunction;
+
+/**
+ * See docs for makeBidiStreamRequestFunction
+ */
+exports.makeBidiStreamRequestFunction = makeBidiStreamRequestFunction;
+
+/**
+ * See docs for client.Channel
+ */
+exports.Channel = client.Channel;
+/**
+ * See docs for client.status
+ */
+exports.status = client.status;
+/**
+ * See docs for client.callError
+ */
+exports.callError = client.callError;
diff --git a/src/node/surface_server.js b/src/node/surface_server.js
new file mode 100644
index 0000000000..38b4233d99
--- /dev/null
+++ b/src/node/surface_server.js
@@ -0,0 +1,325 @@
+var _ = require('underscore');
+
+var Server = require('./server.js');
+
+var stream = require('stream');
+
+var Readable = stream.Readable;
+var Writable = stream.Writable;
+var Duplex = stream.Duplex;
+var util = require('util');
+
+util.inherits(ServerReadableObjectStream, Readable);
+
+/**
+ * Class for representing a gRPC client streaming call as a Node stream on the
+ * server side. Extends from stream.Readable.
+ * @constructor
+ * @param {stream} stream Underlying binary Duplex stream for the call
+ * @param {function(Buffer)} deserialize Function for deserializing binary data
+ * @param {object} options Stream options
+ */
+function ServerReadableObjectStream(stream, deserialize, options) {
+ options = _.extend(options, {objectMode: true});
+ Readable.call(this, options);
+ this._stream = stream;
+ Object.defineProperty(this, 'cancelled', {
+ get: function() { return stream.cancelled; }
+ });
+ var self = this;
+ this._stream.on('data', function forwardData(chunk) {
+ if (!self.push(deserialize(chunk))) {
+ self._stream.pause();
+ }
+ });
+ this._stream.on('end', function forwardEnd() {
+ self.push(null);
+ });
+ this._stream.pause();
+}
+
+util.inherits(ServerWritableObjectStream, Writable);
+
+/**
+ * Class for representing a gRPC server streaming call as a Node stream on the
+ * server side. Extends from stream.Writable.
+ * @constructor
+ * @param {stream} stream Underlying binary Duplex stream for the call
+ * @param {function(*):Buffer} serialize Function for serializing objects
+ * @param {object} options Stream options
+ */
+function ServerWritableObjectStream(stream, serialize, options) {
+ options = _.extend(options, {objectMode: true});
+ Writable.call(this, options);
+ this._stream = stream;
+ this._serialize = serialize;
+ this.on('finish', function() {
+ this._stream.end();
+ });
+}
+
+util.inherits(ServerBidiObjectStream, Duplex);
+
+/**
+ * Class for representing a gRPC bidi streaming call as a Node stream on the
+ * server side. Extends from stream.Duplex.
+ * @constructor
+ * @param {stream} stream Underlying binary Duplex stream for the call
+ * @param {function(*):Buffer} serialize Function for serializing objects
+ * @param {function(Buffer)} deserialize Function for deserializing binary data
+ * @param {object} options Stream options
+ */
+function ServerBidiObjectStream(stream, serialize, deserialize, options) {
+ options = _.extend(options, {objectMode: true});
+ Duplex.call(this, options);
+ this._stream = stream;
+ this._serialize = serialize;
+ var self = this;
+ this._stream.on('data', function forwardData(chunk) {
+ if (!self.push(deserialize(chunk))) {
+ self._stream.pause();
+ }
+ });
+ this._stream.on('end', function forwardEnd() {
+ self.push(null);
+ });
+ this._stream.pause();
+ this.on('finish', function() {
+ this._stream.end();
+ });
+}
+
+/**
+ * _read implementation for both types of streams that allow reading.
+ * @this {ServerReadableObjectStream|ServerBidiObjectStream}
+ * @param {number} size Ignored
+ */
+function _read(size) {
+ this._stream.resume();
+}
+
+/**
+ * See docs for _read
+ */
+ServerReadableObjectStream.prototype._read = _read;
+/**
+ * See docs for _read
+ */
+ServerBidiObjectStream.prototype._read = _read;
+
+/**
+ * _write implementation for both types of streams that allow writing
+ * @this {ServerWritableObjectStream|ServerBidiObjectStream}
+ * @param {*} chunk The value to write to the stream
+ * @param {string} encoding Ignored
+ * @param {function(Error)} callback Callback to call when finished writing
+ */
+function _write(chunk, encoding, callback) {
+ this._stream.write(this._serialize(chunk), encoding, callback);
+}
+
+/**
+ * See docs for _write
+ */
+ServerWritableObjectStream.prototype._write = _write;
+/**
+ * See docs for _write
+ */
+ServerBidiObjectStream.prototype._write = _write;
+
+/**
+ * Creates a binary stream handler function from a unary handler function
+ * @param {function(Object, function(Error, *))} handler Unary call handler
+ * @param {function(*):Buffer} serialize Serialization function
+ * @param {function(Buffer):*} deserialize Deserialization function
+ * @return {function(stream)} Binary stream handler
+ */
+function makeUnaryHandler(handler, serialize, deserialize) {
+ /**
+ * Handles a stream by reading a single data value, passing it to the handler,
+ * and writing the response back to the stream.
+ * @param {stream} stream Binary data stream
+ */
+ return function handleUnaryCall(stream) {
+ stream.on('data', function handleUnaryData(value) {
+ var call = {request: deserialize(value)};
+ Object.defineProperty(call, 'cancelled', {
+ get: function() { return stream.cancelled;}
+ });
+ handler(call, function sendUnaryData(err, value) {
+ if (err) {
+ stream.emit('error', err);
+ } else {
+ stream.write(serialize(value));
+ stream.end();
+ }
+ });
+ });
+ };
+}
+
+/**
+ * Creates a binary stream handler function from a client stream handler
+ * function
+ * @param {function(Readable, function(Error, *))} handler Client stream call
+ * handler
+ * @param {function(*):Buffer} serialize Serialization function
+ * @param {function(Buffer):*} deserialize Deserialization function
+ * @return {function(stream)} Binary stream handler
+ */
+function makeClientStreamHandler(handler, serialize, deserialize) {
+ /**
+ * Handles a stream by passing a deserializing stream to the handler and
+ * writing the response back to the stream.
+ * @param {stream} stream Binary data stream
+ */
+ return function handleClientStreamCall(stream) {
+ var object_stream = new ServerReadableObjectStream(stream, deserialize, {});
+ handler(object_stream, function sendClientStreamData(err, value) {
+ if (err) {
+ stream.emit('error', err);
+ } else {
+ stream.write(serialize(value));
+ stream.end();
+ }
+ });
+ };
+}
+
+/**
+ * Creates a binary stream handler function from a server stream handler
+ * function
+ * @param {function(Writable)} handler Server stream call handler
+ * @param {function(*):Buffer} serialize Serialization function
+ * @param {function(Buffer):*} deserialize Deserialization function
+ * @return {function(stream)} Binary stream handler
+ */
+function makeServerStreamHandler(handler, serialize, deserialize) {
+ /**
+ * Handles a stream by attaching it to a serializing stream, and passing it to
+ * the handler.
+ * @param {stream} stream Binary data stream
+ */
+ return function handleServerStreamCall(stream) {
+ stream.on('data', function handleClientData(value) {
+ var object_stream = new ServerWritableObjectStream(stream,
+ serialize,
+ {});
+ object_stream.request = deserialize(value);
+ handler(object_stream);
+ });
+ };
+}
+
+/**
+ * Creates a binary stream handler function from a bidi stream handler function
+ * @param {function(Duplex)} handler Unary call handler
+ * @param {function(*):Buffer} serialize Serialization function
+ * @param {function(Buffer):*} deserialize Deserialization function
+ * @return {function(stream)} Binary stream handler
+ */
+function makeBidiStreamHandler(handler, serialize, deserialize) {
+ /**
+ * Handles a stream by wrapping it in a serializing and deserializing object
+ * stream, and passing it to the handler.
+ * @param {stream} stream Binary data stream
+ */
+ return function handleBidiStreamCall(stream) {
+ var object_stream = new ServerBidiObjectStream(stream,
+ serialize,
+ deserialize,
+ {});
+ handler(object_stream);
+ };
+}
+
+/**
+ * Map with short names for each of the handler maker functions. Used in
+ * makeServerConstructor
+ */
+var handler_makers = {
+ unary: makeUnaryHandler,
+ server_stream: makeServerStreamHandler,
+ client_stream: makeClientStreamHandler,
+ bidi: makeBidiStreamHandler
+};
+
+/**
+ * Creates a constructor for servers with a service defined by the methods
+ * object. The methods object has string keys and values of this form:
+ * {serialize: function, deserialize: function, client_stream: bool,
+ * server_stream: bool}
+ * @param {Object} methods Method descriptor for each method the server should
+ * expose
+ * @param {string} prefix The prefex to prepend to each method name
+ * @return {function(Object, Object)} New server constructor
+ */
+function makeServerConstructor(methods, prefix) {
+ /**
+ * Create a server with the given handlers for all of the methods.
+ * @constructor
+ * @param {Object} handlers Map from method names to method handlers.
+ * @param {Object} options Options to pass to the underlying server
+ */
+ function SurfaceServer(handlers, options) {
+ var server = new Server(options);
+ this.inner_server = server;
+ _.each(handlers, function(handler, name) {
+ var method = methods[name];
+ var method_type;
+ if (method.client_stream) {
+ if (method.server_stream) {
+ method_type = 'bidi';
+ } else {
+ method_type = 'client_stream';
+ }
+ } else {
+ if (method.server_stream) {
+ method_type = 'server_stream';
+ } else {
+ method_type = 'unary';
+ }
+ }
+ var binary_handler = handler_makers[method_type](handler,
+ method.serialize,
+ method.deserialize);
+ server.register('' + prefix + name, binary_handler);
+ }, this);
+ }
+
+ /**
+ * Binds the server to the given port, with SSL enabled if secure is specified
+ * @param {string} port The port that the server should bind on, in the format
+ * "address:port"
+ * @param {boolean=} secure Whether the server should open a secure port
+ * @return {SurfaceServer} this
+ */
+ SurfaceServer.prototype.bind = function(port, secure) {
+ this.inner_server.bind(port, secure);
+ return this;
+ };
+
+ /**
+ * Starts the server listening on any bound ports
+ * @return {SurfaceServer} this
+ */
+ SurfaceServer.prototype.listen = function() {
+ this.inner_server.start();
+ return this;
+ };
+
+ /**
+ * Shuts the server down; tells it to stop listening for new requests and to
+ * kill old requests.
+ */
+ SurfaceServer.prototype.shutdown = function() {
+ this.inner_server.shutdown();
+ };
+
+ return SurfaceServer;
+}
+
+/**
+ * See documentation for makeServerConstructor
+ */
+exports.makeServerConstructor = makeServerConstructor;
diff --git a/src/node/tag.cc b/src/node/tag.cc
new file mode 100644
index 0000000000..0cebb83d1b
--- /dev/null
+++ b/src/node/tag.cc
@@ -0,0 +1,71 @@
+#include <stdlib.h>
+#include <node.h>
+#include <nan.h>
+#include "tag.h"
+
+namespace grpc {
+namespace node {
+
+using v8::Handle;
+using v8::HandleScope;
+using v8::Persistent;
+using v8::Value;
+
+struct tag {
+ tag(Persistent<Value> *tag, Persistent<Value> *call)
+ : persist_tag(tag), persist_call(call) {
+ }
+
+ ~tag() {
+ persist_tag->Dispose();
+ if (persist_call != NULL) {
+ persist_call->Dispose();
+ }
+ }
+ Persistent<Value> *persist_tag;
+ Persistent<Value> *persist_call;
+};
+
+void *CreateTag(Handle<Value> tag, Handle<Value> call) {
+ NanScope();
+ Persistent<Value> *persist_tag = new Persistent<Value>();
+ NanAssignPersistent(*persist_tag, tag);
+ Persistent<Value> *persist_call;
+ if (call->IsNull() || call->IsUndefined()) {
+ persist_call = NULL;
+ } else {
+ persist_call = new Persistent<Value>();
+ NanAssignPersistent(*persist_call, call);
+ }
+ struct tag *tag_struct = new struct tag(persist_tag, persist_call);
+ return reinterpret_cast<void*>(tag_struct);
+}
+
+Handle<Value> GetTagHandle(void *tag) {
+ NanEscapableScope();
+ struct tag *tag_struct = reinterpret_cast<struct tag*>(tag);
+ Handle<Value> tag_value = NanNew<Value>(*tag_struct->persist_tag);
+ return NanEscapeScope(tag_value);
+}
+
+bool TagHasCall(void *tag) {
+ struct tag *tag_struct = reinterpret_cast<struct tag*>(tag);
+ return tag_struct->persist_call != NULL;
+}
+
+Handle<Value> TagGetCall(void *tag) {
+ NanEscapableScope();
+ struct tag *tag_struct = reinterpret_cast<struct tag*>(tag);
+ if (tag_struct->persist_call == NULL) {
+ return NanEscapeScope(NanNull());
+ }
+ Handle<Value> call_value = NanNew<Value>(*tag_struct->persist_call);
+ return NanEscapeScope(call_value);
+}
+
+void DestroyTag(void *tag) {
+ delete reinterpret_cast<struct tag*>(tag);
+}
+
+} // namespace node
+} // namespace grpc
diff --git a/src/node/tag.h b/src/node/tag.h
new file mode 100644
index 0000000000..46c95de5b4
--- /dev/null
+++ b/src/node/tag.h
@@ -0,0 +1,26 @@
+#ifndef NET_GRPC_NODE_TAG_H_
+#define NET_GRPC_NODE_TAG_H_
+
+#include <node.h>
+
+namespace grpc {
+namespace node {
+
+/* Create a void* tag that can be passed to various grpc_call functions from
+ a javascript value and the javascript wrapper for the call. The call can be
+ null. */
+void *CreateTag(v8::Handle<v8::Value> tag, v8::Handle<v8::Value> call);
+/* Return the javascript value stored in the tag */
+v8::Handle<v8::Value> GetTagHandle(void *tag);
+/* Returns true if the call was set (non-null) when the tag was created */
+bool TagHasCall(void *tag);
+/* Returns the javascript wrapper for the call associated with this tag */
+v8::Handle<v8::Value> TagGetCall(void *call);
+/* Destroy the tag and all resources it is holding. It is illegal to call any
+ of these other functions on a tag after it has been destroyed. */
+void DestroyTag(void *tag);
+
+} // namespace node
+} // namespace grpc
+
+#endif // NET_GRPC_NODE_TAG_H_
diff --git a/src/node/test/byte_buffer_test.js~ b/src/node/test/byte_buffer_test.js~
new file mode 100644
index 0000000000..8f272e5bd6
--- /dev/null
+++ b/src/node/test/byte_buffer_test.js~
@@ -0,0 +1,35 @@
+var assert = require('assert');
+var grpc = require('..build/Release/grpc');
+
+describe('byte buffer', function() {
+ describe('constructor', function() {
+ it('should reject bad constructor calls', function() {
+ it('should require at least one argument', function() {
+ assert.throws(new grpc.ByteBuffer(), TypeError);
+ });
+ it('should reject non-string arguments', function() {
+ assert.throws(new grpc.ByteBuffer(0), TypeError);
+ assert.throws(new grpc.ByteBuffer(1.5), TypeError);
+ assert.throws(new grpc.ByteBuffer(null), TypeError);
+ assert.throws(new grpc.ByteBuffer(Date.now()), TypeError);
+ });
+ it('should accept string arguments', function() {
+ assert.doesNotThrow(new grpc.ByteBuffer(''));
+ assert.doesNotThrow(new grpc.ByteBuffer('test'));
+ assert.doesNotThrow(new grpc.ByteBuffer('\0'));
+ });
+ });
+ });
+ describe('bytes', function() {
+ it('should return the passed string', function() {
+ it('should preserve simple strings', function() {
+ var buffer = new grpc.ByteBuffer('test');
+ assert.strictEqual(buffer.bytes(), 'test');
+ });
+ it('should preserve null characters', function() {
+ var buffer = new grpc.ByteBuffer('test\0test');
+ assert.strictEqual(buffer.bytes(), 'test\0test');
+ });
+ });
+ });
+});
diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js
new file mode 100644
index 0000000000..25860d040d
--- /dev/null
+++ b/src/node/test/call_test.js
@@ -0,0 +1,169 @@
+var assert = require('assert');
+var grpc = require('bindings')('grpc.node');
+
+var channel = new grpc.Channel('localhost:7070');
+
+/**
+ * Helper function to return an absolute deadline given a relative timeout in
+ * seconds.
+ * @param {number} timeout_secs The number of seconds to wait before timing out
+ * @return {Date} A date timeout_secs in the future
+ */
+function getDeadline(timeout_secs) {
+ var deadline = new Date();
+ deadline.setSeconds(deadline.getSeconds() + timeout_secs);
+ return deadline;
+}
+
+describe('call', function() {
+ describe('constructor', function() {
+ it('should reject anything less than 3 arguments', function() {
+ assert.throws(function() {
+ new grpc.Call();
+ }, TypeError);
+ assert.throws(function() {
+ new grpc.Call(channel);
+ }, TypeError);
+ assert.throws(function() {
+ new grpc.Call(channel, 'method');
+ }, TypeError);
+ });
+ it('should succeed with a Channel, a string, and a date or number',
+ function() {
+ assert.doesNotThrow(function() {
+ new grpc.Call(channel, 'method', new Date());
+ });
+ assert.doesNotThrow(function() {
+ new grpc.Call(channel, 'method', 0);
+ });
+ });
+ it('should fail with a closed channel', function() {
+ var local_channel = new grpc.Channel('hostname');
+ local_channel.close();
+ assert.throws(function() {
+ new grpc.Call(channel, 'method');
+ });
+ });
+ it('should fail with other types', function() {
+ assert.throws(function() {
+ new grpc.Call({}, 'method', 0);
+ }, TypeError);
+ assert.throws(function() {
+ new grpc.Call(channel, null, 0);
+ }, TypeError);
+ assert.throws(function() {
+ new grpc.Call(channel, 'method', 'now');
+ }, TypeError);
+ });
+ });
+ describe('addMetadata', function() {
+ it('should succeed with objects containing keys and values', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.doesNotThrow(function() {
+ call.addMetadata();
+ });
+ assert.doesNotThrow(function() {
+ call.addMetadata({'key' : 'key',
+ 'value' : new Buffer('value')});
+ });
+ assert.doesNotThrow(function() {
+ call.addMetadata({'key' : 'key1',
+ 'value' : new Buffer('value1')},
+ {'key' : 'key2',
+ 'value' : new Buffer('value2')});
+ });
+ });
+ it('should fail with other parameter types', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ call.addMetadata(null);
+ }, TypeError);
+ assert.throws(function() {
+ call.addMetadata('value');
+ }, TypeError);
+ assert.throws(function() {
+ call.addMetadata(5);
+ }, TypeError);
+ });
+ it('should fail if startInvoke was already called', function(done) {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ call.startInvoke(function() {},
+ function() {},
+ function() {done();},
+ 0);
+ assert.throws(function() {
+ call.addMetadata({'key' : 'key', 'value' : new Buffer('value') });
+ }, function(err) {
+ return err.code === grpc.callError.ALREADY_INVOKED;
+ });
+ // Cancel to speed up the test
+ call.cancel();
+ });
+ });
+ describe('startInvoke', function() {
+ it('should fail with fewer than 4 arguments', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ call.startInvoke();
+ }, TypeError);
+ assert.throws(function() {
+ call.startInvoke(function() {});
+ }, TypeError);
+ assert.throws(function() {
+ call.startInvoke(function() {},
+ function() {});
+ }, TypeError);
+ assert.throws(function() {
+ call.startInvoke(function() {},
+ function() {},
+ function() {});
+ }, TypeError);
+ });
+ it('should work with 3 args and an int', function(done) {
+ assert.doesNotThrow(function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ call.startInvoke(function() {},
+ function() {},
+ function() {done();},
+ 0);
+ // Cancel to speed up the test
+ call.cancel();
+ });
+ });
+ it('should reject incorrectly typed arguments', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ call.startInvoke(0, 0, 0, 0);
+ }, TypeError);
+ assert.throws(function() {
+ call.startInvoke(function() {},
+ function() {},
+ function() {}, 'test');
+ });
+ });
+ });
+ describe('serverAccept', function() {
+ it('should fail with fewer than 1 argument1', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ call.serverAccept();
+ }, TypeError);
+ });
+ it('should return an error when called on a client Call', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.throws(function() {
+ call.serverAccept(function() {});
+ }, function(err) {
+ return err.code === grpc.callError.NOT_ON_CLIENT;
+ });
+ });
+ });
+ describe('cancel', function() {
+ it('should succeed', function() {
+ var call = new grpc.Call(channel, 'method', getDeadline(1));
+ assert.doesNotThrow(function() {
+ call.cancel();
+ });
+ });
+ });
+});
diff --git a/src/node/test/call_test.js~ b/src/node/test/call_test.js~
new file mode 100644
index 0000000000..9506f209bd
--- /dev/null
+++ b/src/node/test/call_test.js~
@@ -0,0 +1,6 @@
+var assert = require('assert');
+var grpc = require('../build/Release/grpc');
+
+describe('call', function() {
+ describe('constructor', function() {
+ it('should reject anything less than 4 arguments', function() { \ No newline at end of file
diff --git a/src/node/test/channel_test.js b/src/node/test/channel_test.js
new file mode 100644
index 0000000000..61833875b6
--- /dev/null
+++ b/src/node/test/channel_test.js
@@ -0,0 +1,55 @@
+var assert = require('assert');
+var grpc = require('bindings')('grpc.node');
+
+describe('channel', function() {
+ describe('constructor', function() {
+ it('should require a string for the first argument', function() {
+ assert.doesNotThrow(function() {
+ new grpc.Channel('hostname');
+ });
+ assert.throws(function() {
+ new grpc.Channel();
+ }, TypeError);
+ assert.throws(function() {
+ new grpc.Channel(5);
+ });
+ });
+ it('should accept an object for the second parameter', function() {
+ assert.doesNotThrow(function() {
+ new grpc.Channel('hostname', {});
+ });
+ assert.throws(function() {
+ new grpc.Channel('hostname', 5);
+ });
+ });
+ it('should only accept objects with string or int values', function() {
+ assert.doesNotThrow(function() {
+ new grpc.Channel('hostname', {'key' : 'value'});
+ });
+ assert.doesNotThrow(function() {
+ new grpc.Channel('hostname', {'key' : 5});
+ });
+ assert.throws(function() {
+ new grpc.Channel('hostname', {'key' : null});
+ });
+ assert.throws(function() {
+ new grpc.Channel('hostname', {'key' : new Date()});
+ });
+ });
+ });
+ describe('close', function() {
+ it('should succeed silently', function() {
+ var channel = new grpc.Channel('hostname', {});
+ assert.doesNotThrow(function() {
+ channel.close();
+ });
+ });
+ it('should be idempotent', function() {
+ var channel = new grpc.Channel('hostname', {});
+ assert.doesNotThrow(function() {
+ channel.close();
+ channel.close();
+ });
+ });
+ });
+});
diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js
new file mode 100644
index 0000000000..9f15a76520
--- /dev/null
+++ b/src/node/test/client_server_test.js
@@ -0,0 +1,150 @@
+var assert = require('assert');
+var fs = require('fs');
+var path = require('path');
+var grpc = require('bindings')('grpc.node');
+var Server = require('../server');
+var client = require('../client');
+var port_picker = require('../port_picker');
+var common = require('../common');
+var _ = require('highland');
+
+var ca_path = path.join(__dirname, 'data/ca.pem');
+
+var key_path = path.join(__dirname, 'data/server1.key');
+
+var pem_path = path.join(__dirname, 'data/server1.pem');
+
+/**
+ * Helper function to return an absolute deadline given a relative timeout in
+ * seconds.
+ * @param {number} timeout_secs The number of seconds to wait before timing out
+ * @return {Date} A date timeout_secs in the future
+ */
+function getDeadline(timeout_secs) {
+ var deadline = new Date();
+ deadline.setSeconds(deadline.getSeconds() + timeout_secs);
+ return deadline;
+}
+
+/**
+ * Responds to every request with the same data as a response
+ * @param {Stream} stream
+ */
+function echoHandler(stream) {
+ stream.pipe(stream);
+}
+
+/**
+ * Responds to every request with an error status
+ * @param {Stream} stream
+ */
+function errorHandler(stream) {
+ throw {
+ 'code' : grpc.status.UNIMPLEMENTED,
+ 'details' : 'error details'
+ };
+}
+
+describe('echo client', function() {
+ it('should receive echo responses', function(done) {
+ port_picker.nextAvailablePort(function(port) {
+ var server = new Server();
+ server.bind(port);
+ server.register('echo', echoHandler);
+ server.start();
+
+ var messages = ['echo1', 'echo2', 'echo3', 'echo4'];
+ var channel = new grpc.Channel(port);
+ var stream = client.makeRequest(
+ channel,
+ 'echo');
+ _(messages).map(function(val) {
+ return new Buffer(val);
+ }).pipe(stream);
+ var index = 0;
+ stream.on('data', function(chunk) {
+ assert.equal(messages[index], chunk.toString());
+ index += 1;
+ });
+ stream.on('end', function() {
+ server.shutdown();
+ done();
+ });
+ });
+ });
+ it('should get an error status that the server throws', function(done) {
+ port_picker.nextAvailablePort(function(port) {
+ var server = new Server();
+ server.bind(port);
+ server.register('error', errorHandler);
+ server.start();
+
+ var channel = new grpc.Channel(port);
+ var stream = client.makeRequest(
+ channel,
+ 'error',
+ null,
+ getDeadline(1));
+
+ stream.on('data', function() {});
+ stream.write(new Buffer('test'));
+ stream.end();
+ stream.on('status', function(status) {
+ assert.equal(status.code, grpc.status.UNIMPLEMENTED);
+ assert.equal(status.details, 'error details');
+ server.shutdown();
+ done();
+ });
+
+ });
+ });
+});
+/* TODO(mlumish): explore options for reducing duplication between this test
+ * and the insecure echo client test */
+describe('secure echo client', function() {
+ it('should recieve echo responses', function(done) {
+ port_picker.nextAvailablePort(function(port) {
+ fs.readFile(ca_path, function(err, ca_data) {
+ assert.ifError(err);
+ fs.readFile(key_path, function(err, key_data) {
+ assert.ifError(err);
+ fs.readFile(pem_path, function(err, pem_data) {
+ assert.ifError(err);
+ var creds = grpc.Credentials.createSsl(ca_data);
+ var server_creds = grpc.ServerCredentials.createSsl(null,
+ key_data,
+ pem_data);
+
+ var server = new Server({'credentials' : server_creds});
+ server.bind(port, true);
+ server.register('echo', echoHandler);
+ server.start();
+
+ var messages = ['echo1', 'echo2', 'echo3', 'echo4'];
+ var channel = new grpc.Channel(port, {
+ 'grpc.ssl_target_name_override' : 'foo.test.google.com',
+ 'credentials' : creds
+ });
+ var stream = client.makeRequest(
+ channel,
+ 'echo');
+
+ _(messages).map(function(val) {
+ return new Buffer(val);
+ }).pipe(stream);
+ var index = 0;
+ stream.on('data', function(chunk) {
+ assert.equal(messages[index], chunk.toString());
+ index += 1;
+ });
+ stream.on('end', function() {
+ server.shutdown();
+ done();
+ });
+ });
+
+ });
+ });
+ });
+ });
+});
diff --git a/src/node/test/client_server_test.js~ b/src/node/test/client_server_test.js~
new file mode 100644
index 0000000000..22a0523939
--- /dev/null
+++ b/src/node/test/client_server_test.js~
@@ -0,0 +1,59 @@
+var assert = require('assert');
+var grpc = require('../build/Debug/grpc');
+var Server = require('../server');
+var client = require('../client');
+var port_picker = require('../port_picker');
+var iterators = require('async-iterators');
+
+/**
+ * General function to process an event by checking that there was no error and
+ * calling the callback passed as a tag.
+ * @param {*} err Truthy values indicate an error (in this case, that there was
+ * no event available).
+ * @param {grpc.Event} event The event to process.
+ */
+function processEvent(err, event) {
+ assert.ifError(err);
+ assert.notEqual(event, null);
+ event.getTag()(event);
+}
+
+/**
+ * Responds to every request with the same data as a response
+ * @param {{next:function(function(*, Buffer))}} arg_iter The async iterator of
+ * arguments.
+ * @return {{next:function(function(*, Buffer))}} The async iterator of results
+ */
+function echoHandler(arg_iter) {
+ return {
+ 'next' : function(write) {
+ arg_iter.next(function(err, value) {
+ if (value == undefined) {
+ write({
+ 'code' : grpc.status.OK,
+ 'details' : 'OK'
+ });
+ } else {
+ write(err, value);
+ }
+ });
+ }
+ };
+}
+
+describe('echo client server', function() {
+ it('should recieve echo responses', function(done) {
+ port_picker.nextAvailablePort(function(port) {
+ var server = new Server(port);
+ server.register('echo', echoHandler);
+ server.start();
+
+ var messages = ['echo1', 'echo2', 'echo3'];
+ var channel = new grpc.Channel(port);
+ var responses = client.makeRequest(channel,
+ 'echo',
+ iterators.fromArray(messages));
+ assert.equal(messages, iterators.toArray(responses));
+ });
+ });
+});
diff --git a/src/node/test/completion_queue_test.js~ b/src/node/test/completion_queue_test.js~
new file mode 100644
index 0000000000..5d2d509be6
--- /dev/null
+++ b/src/node/test/completion_queue_test.js~
@@ -0,0 +1,30 @@
+var assert = require('assert');
+var grpc = require('../build/Release/grpc');
+
+describe('completion queue', function() {
+ describe('constructor', function() {
+ it('should succeed with now arguments', function() {
+ assert.doesNotThrow(function() {
+ new grpc.CompletionQueue();
+ });
+ });
+ });
+ describe('next', function() {
+ it('should require a date parameter', function() {
+ var queue = new grpc.CompletionQueue();
+ assert.throws(function() {
+ queue->next();
+ }, TypeError);
+ assert.throws(function() {
+ queue->next('test');
+ }, TypeError);
+ assert.doesNotThrow(function() {
+ queue->next(Date.now());
+ });
+ });
+ it('should return null from a new queue', function() {
+ var queue = new grpc.CompletionQueue();
+ assert.strictEqual(queue->next(Date.now()), null);
+ });
+ });
+}); \ No newline at end of file
diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js
new file mode 100644
index 0000000000..b9f8958da9
--- /dev/null
+++ b/src/node/test/constant_test.js
@@ -0,0 +1,97 @@
+var assert = require('assert');
+var grpc = require('bindings')('grpc.node');
+
+/**
+ * List of all status names
+ * @const
+ * @type {Array.<string>}
+ */
+var statusNames = [
+ 'OK',
+ 'CANCELLED',
+ 'UNKNOWN',
+ 'INVALID_ARGUMENT',
+ 'DEADLINE_EXCEEDED',
+ 'NOT_FOUND',
+ 'ALREADY_EXISTS',
+ 'PERMISSION_DENIED',
+ 'UNAUTHENTICATED',
+ 'RESOURCE_EXHAUSTED',
+ 'FAILED_PRECONDITION',
+ 'ABORTED',
+ 'OUT_OF_RANGE',
+ 'UNIMPLEMENTED',
+ 'INTERNAL',
+ 'UNAVAILABLE',
+ 'DATA_LOSS'
+];
+
+/**
+ * List of all call error names
+ * @const
+ * @type {Array.<string>}
+ */
+var callErrorNames = [
+ 'OK',
+ 'ERROR',
+ 'NOT_ON_SERVER',
+ 'NOT_ON_CLIENT',
+ 'ALREADY_INVOKED',
+ 'NOT_INVOKED',
+ 'ALREADY_FINISHED',
+ 'TOO_MANY_OPERATIONS',
+ 'INVALID_FLAGS'
+];
+
+/**
+ * List of all op error names
+ * @const
+ * @type {Array.<string>}
+ */
+var opErrorNames = [
+ 'OK',
+ 'ERROR'
+];
+
+/**
+ * List of all completion type names
+ * @const
+ * @type {Array.<string>}
+ */
+var completionTypeNames = [
+ 'QUEUE_SHUTDOWN',
+ 'READ',
+ 'INVOKE_ACCEPTED',
+ 'WRITE_ACCEPTED',
+ 'FINISH_ACCEPTED',
+ 'CLIENT_METADATA_READ',
+ 'FINISHED',
+ 'SERVER_RPC_NEW'
+];
+
+describe('constants', function() {
+ it('should have all of the status constants', function() {
+ for (var i = 0; i < statusNames.length; i++) {
+ assert(grpc.status.hasOwnProperty(statusNames[i]),
+ 'status missing: ' + statusNames[i]);
+ }
+ });
+ it('should have all of the call errors', function() {
+ for (var i = 0; i < callErrorNames.length; i++) {
+ assert(grpc.callError.hasOwnProperty(callErrorNames[i]),
+ 'call error missing: ' + callErrorNames[i]);
+ }
+ });
+ it('should have all of the op errors', function() {
+ for (var i = 0; i < opErrorNames.length; i++) {
+ assert(grpc.opError.hasOwnProperty(opErrorNames[i]),
+ 'op error missing: ' + opErrorNames[i]);
+ }
+ });
+ it('should have all of the completion types', function() {
+ for (var i = 0; i < completionTypeNames.length; i++) {
+ assert(grpc.completionType.hasOwnProperty(completionTypeNames[i]),
+ 'completion type missing: ' + completionTypeNames[i]);
+ }
+ });
+});
diff --git a/src/node/test/constant_test.js~ b/src/node/test/constant_test.js~
new file mode 100644
index 0000000000..8ad9f81bbe
--- /dev/null
+++ b/src/node/test/constant_test.js~
@@ -0,0 +1,25 @@
+var assert = require("assert");
+var grpc = require("../build/Release");
+
+var status_names = [
+ "OK",
+ "CANCELLED",
+ "UNKNOWN",
+ "INVALID_ARGUMENT",
+ "DEADLINE_EXCEEDED",
+ "NOT_FOUND",
+ "ALREADY_EXISTS",
+ "PERMISSION_DENIED",
+ "UNAUTHENTICATED",
+ "RESOURCE_EXHAUSTED",
+ "FAILED_PRECONDITION",
+ "ABORTED",
+ "OUT_OF_RANGE",
+ "UNIMPLEMENTED",
+ "INTERNAL",
+ "UNAVAILABLE",
+ "DATA_LOSS"
+];
+
+describe("constants", function() {
+ it("should have all of the status constants", function() {
diff --git a/src/node/test/data/README b/src/node/test/data/README
new file mode 100644
index 0000000000..888d95b900
--- /dev/null
+++ b/src/node/test/data/README
@@ -0,0 +1 @@
+CONFIRMEDTESTKEY
diff --git a/src/node/test/data/ca.pem b/src/node/test/data/ca.pem
new file mode 100644
index 0000000000..6c8511a73c
--- /dev/null
+++ b/src/node/test/data/ca.pem
@@ -0,0 +1,15 @@
+-----BEGIN CERTIFICATE-----
+MIICSjCCAbOgAwIBAgIJAJHGGR4dGioHMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMTBnRlc3RjYTAeFw0xNDExMTEyMjMxMjla
+Fw0yNDExMDgyMjMxMjlaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0
+YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMT
+BnRlc3RjYTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwEDfBV5MYdlHVHJ7
++L4nxrZy7mBfAVXpOc5vMYztssUI7mL2/iYujiIXM+weZYNTEpLdjyJdu7R5gGUu
+g1jSVK/EPHfc74O7AyZU34PNIP4Sh33N+/A5YexrNgJlPY+E3GdVYi4ldWJjgkAd
+Qah2PH5ACLrIIC6tRka9hcaBlIECAwEAAaMgMB4wDAYDVR0TBAUwAwEB/zAOBgNV
+HQ8BAf8EBAMCAgQwDQYJKoZIhvcNAQELBQADgYEAHzC7jdYlzAVmddi/gdAeKPau
+sPBG/C2HCWqHzpCUHcKuvMzDVkY/MP2o6JIW2DBbY64bO/FceExhjcykgaYtCH/m
+oIU63+CFOTtR7otyQAWHqXa7q4SbCDlG7DyRFxqG0txPtGvy12lgldA2+RgcigQG
+Dfcog5wrJytaQ6UA0wE=
+-----END CERTIFICATE-----
diff --git a/src/node/test/data/server1.key b/src/node/test/data/server1.key
new file mode 100644
index 0000000000..143a5b8765
--- /dev/null
+++ b/src/node/test/data/server1.key
@@ -0,0 +1,16 @@
+-----BEGIN PRIVATE KEY-----
+MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD
+M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf
+3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY
+AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm
+V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY
+tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p
+dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q
+K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR
+81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff
+DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd
+aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2
+ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3
+XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe
+F98XJ7tIFfJq
+-----END PRIVATE KEY-----
diff --git a/src/node/test/data/server1.pem b/src/node/test/data/server1.pem
new file mode 100644
index 0000000000..8e582e571f
--- /dev/null
+++ b/src/node/test/data/server1.pem
@@ -0,0 +1,16 @@
+-----BEGIN CERTIFICATE-----
+MIICmzCCAgSgAwIBAgIBAzANBgkqhkiG9w0BAQUFADBWMQswCQYDVQQGEwJBVTET
+MBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQ
+dHkgTHRkMQ8wDQYDVQQDDAZ0ZXN0Y2EwHhcNMTQwNzIyMDYwMDU3WhcNMjQwNzE5
+MDYwMDU3WjBkMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV
+BAcTB0NoaWNhZ28xFDASBgNVBAoTC0dvb2dsZSBJbmMuMRowGAYDVQQDFBEqLnRl
+c3QuZ29vZ2xlLmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA4cMVJygs
+JUmlgMMzgdi0h1XoCR7+ww1pop04OMMyy7H/i0PJ2W6Y35+b4CM8QrkYeEafUGDO
+RYX6yV/cHGGsD/x02ye6ey1UDtkGAD/mpDEx8YCrjAc1Vfvt8Fk6Cn1WVIxV/J30
+3xjBsFgByQ55RBp1OLZfVLo6AleBDSbcxaECAwEAAaNrMGkwCQYDVR0TBAIwADAL
+BgNVHQ8EBAMCBeAwTwYDVR0RBEgwRoIQKi50ZXN0Lmdvb2dsZS5mcoIYd2F0ZXJ6
+b29pLnRlc3QuZ29vZ2xlLmJlghIqLnRlc3QueW91dHViZS5jb22HBMCoAQMwDQYJ
+KoZIhvcNAQEFBQADgYEAM2Ii0LgTGbJ1j4oqX9bxVcxm+/R5Yf8oi0aZqTJlnLYS
+wXcBykxTx181s7WyfJ49WwrYXo78zTDAnf1ma0fPq3e4mpspvyndLh1a+OarHa1e
+aT0DIIYk7qeEa1YcVljx2KyLd0r1BBAfrwyGaEPVeJQVYWaOJRU2we/KD4ojf9s=
+-----END CERTIFICATE-----
diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js
new file mode 100644
index 0000000000..64b207cb5c
--- /dev/null
+++ b/src/node/test/end_to_end_test.js
@@ -0,0 +1,168 @@
+var assert = require('assert');
+var grpc = require('bindings')('grpc.node');
+var port_picker = require('../port_picker');
+
+/**
+ * This is used for testing functions with multiple asynchronous calls that
+ * can happen in different orders. This should be passed the number of async
+ * function invocations that can occur last, and each of those should call this
+ * function's return value
+ * @param {function()} done The function that should be called when a test is
+ * complete.
+ * @param {number} count The number of calls to the resulting function if the
+ * test passes.
+ * @return {function()} The function that should be called at the end of each
+ * sequence of asynchronous functions.
+ */
+function multiDone(done, count) {
+ return function() {
+ count -= 1;
+ if (count <= 0) {
+ done();
+ }
+ };
+}
+
+describe('end-to-end', function() {
+ it('should start and end a request without error', function(complete) {
+ port_picker.nextAvailablePort(function(port) {
+ var server = new grpc.Server();
+ var done = multiDone(function() {
+ complete();
+ server.shutdown();
+ }, 2);
+ server.addHttp2Port(port);
+ var channel = new grpc.Channel(port);
+ var deadline = new Date();
+ deadline.setSeconds(deadline.getSeconds() + 3);
+ var status_text = 'xyz';
+ var call = new grpc.Call(channel,
+ 'dummy_method',
+ deadline);
+ call.startInvoke(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.INVOKE_ACCEPTED);
+
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ });
+ },function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.CLIENT_METADATA_READ);
+ },function(event) {
+ assert.strictEqual(event.type, grpc.completionType.FINISHED);
+ var status = event.data;
+ assert.strictEqual(status.code, grpc.status.OK);
+ assert.strictEqual(status.details, status_text);
+ done();
+ }, 0);
+
+ server.start();
+ server.requestCall(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW);
+ var server_call = event.call;
+ assert.notEqual(server_call, null);
+ server_call.serverAccept(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.FINISHED);
+ }, 0);
+ server_call.serverEndInitialMetadata(0);
+ server_call.startWriteStatus(
+ grpc.status.OK,
+ status_text,
+ function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ done();
+ });
+ });
+ });
+ });
+
+ it('should send and receive data without error', function(complete) {
+ port_picker.nextAvailablePort(function(port) {
+ var req_text = 'client_request';
+ var reply_text = 'server_response';
+ var server = new grpc.Server();
+ var done = multiDone(function() {
+ complete();
+ server.shutdown();
+ }, 6);
+ server.addHttp2Port(port);
+ var channel = new grpc.Channel(port);
+ var deadline = new Date();
+ deadline.setSeconds(deadline.getSeconds() + 3);
+ var status_text = 'success';
+ var call = new grpc.Call(channel,
+ 'dummy_method',
+ deadline);
+ call.startInvoke(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.INVOKE_ACCEPTED);
+ call.startWrite(
+ new Buffer(req_text),
+ function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.WRITE_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ done();
+ });
+ }, 0);
+ call.startRead(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.READ);
+ assert.strictEqual(event.data.toString(), reply_text);
+ done();
+ });
+ },function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.CLIENT_METADATA_READ);
+ done();
+ },function(event) {
+ assert.strictEqual(event.type, grpc.completionType.FINISHED);
+ var status = event.data;
+ assert.strictEqual(status.code, grpc.status.OK);
+ assert.strictEqual(status.details, status_text);
+ done();
+ }, 0);
+
+ server.start();
+ server.requestCall(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW);
+ var server_call = event.call;
+ assert.notEqual(server_call, null);
+ server_call.serverAccept(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.FINISHED);
+ done();
+ });
+ server_call.serverEndInitialMetadata(0);
+ server_call.startRead(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.READ);
+ assert.strictEqual(event.data.toString(), req_text);
+ server_call.startWrite(
+ new Buffer(reply_text),
+ function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.WRITE_ACCEPTED);
+ assert.strictEqual(event.data,
+ grpc.opError.OK);
+ server_call.startWriteStatus(
+ grpc.status.OK,
+ status_text,
+ function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ done();
+ });
+ }, 0);
+ });
+ });
+ });
+ });
+});
diff --git a/src/node/test/end_to_end_test.js~ b/src/node/test/end_to_end_test.js~
new file mode 100644
index 0000000000..74184a287f
--- /dev/null
+++ b/src/node/test/end_to_end_test.js~
@@ -0,0 +1,72 @@
+var assert = require('assert');
+var grpc = require('../build/Release/grpc');
+
+describe('end-to-end', function() {
+ it('should start and end a request without error', function() {
+ var event;
+ var client_queue = new grpc.CompletionQueue();
+ var server_queue = new grpc.CompletionQueue();
+ var server = new grpc.Server(server_queue);
+ server.addHttp2Port('localhost:9000');
+ var channel = new grpc.Channel('localhost:9000');
+ var deadline = Infinity;
+ var status_text = 'xyz';
+ var call = new grpc.Call(channel, 'dummy_method', deadline);
+ var tag = 1;
+ assert.strictEqual(call.startInvoke(client_queue, tag, tag, tag),
+ grpc.callError.OK);
+ var server_tag = 2;
+
+ // the client invocation was accepted
+ event = client_queue.next(deadline);
+ assert.notEqual(event, null);
+ assert.strictEqual(event->getType(), grpc.completionType.INVOKE_ACCEPTED);
+
+ assert.strictEqual(call.writesDone(tag), grpc.callError.CALL_OK);
+ event = client_queue.next(deadline);
+ assert.notEqual(event, null);
+ assert.strictEqual(event.getType(), grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.getData(), grpc.opError.OK);
+
+ // check that a server rpc new was recieved
+ assert(server.start());
+ assert.strictEqual(server.requestCall(server_tag, server_tag),
+ grpc.callError.OK);
+ event = server_queue.next(deadline);
+ assert.notEqual(event, null);
+ assert.strictEqual(event.getType(), grpc.completionType.SERVER_RPC_NEW);
+ var server_call = event.getCall();
+ assert.notEqual(server_call, null);
+ assert.strictEqual(server_call.accept(server_queue, server_tag),
+ grpc.callError.OK);
+
+ // the server sends the status
+ assert.strictEqual(server_call.start_write_status(grpc.status.OK,
+ status_text,
+ server_tag),
+ grpc.callError.OK);
+ event = server_queue.next(deadline);
+ assert.notEqual(event, null);
+ assert.strictEqual(event.getType(), grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.getData(), grpc.opError.OK);
+
+ // the client gets CLIENT_METADATA_READ
+ event = client_queue.next(deadline);
+ assert.notEqual(event, null);
+ assert.strictEqual(event.getType(),
+ grpc.completionType.CLIENT_METADATA_READ);
+
+ // the client gets FINISHED
+ event = client_queue.next(deadline);
+ assert.notEqual(event, null);
+ assert.strictEqual(event.getType(), grpc.completionType.FINISHED);
+ var status = event.getData();
+ assert.strictEqual(status.code, grpc.status.OK);
+ assert.strictEqual(status.details, status_text);
+
+ // the server gets FINISHED
+ event = client_queue.next(deadline);
+ assert.notEqual(event, null);
+ assert.strictEqual(event.getType(), grpc.completionType.FINISHED);
+ });
+});
diff --git a/src/node/test/math_client_test.js b/src/node/test/math_client_test.js
new file mode 100644
index 0000000000..dd2e183831
--- /dev/null
+++ b/src/node/test/math_client_test.js
@@ -0,0 +1,176 @@
+var assert = require('assert');
+var client = require('../surface_client.js');
+var ProtoBuf = require('protobufjs');
+var port_picker = require('../port_picker');
+
+var builder = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto');
+var math = builder.build('math');
+
+/**
+ * Get a function that deserializes a specific type of protobuf.
+ * @param {function()} cls The constructor of the message type to deserialize
+ * @return {function(Buffer):cls} The deserialization function
+ */
+function deserializeCls(cls) {
+ /**
+ * Deserialize a buffer to a message object
+ * @param {Buffer} arg_buf The buffer to deserialize
+ * @return {cls} The resulting object
+ */
+ return function deserialize(arg_buf) {
+ return cls.decode(arg_buf);
+ };
+}
+
+/**
+ * Serialize an object to a buffer
+ * @param {*} arg The object to serialize
+ * @return {Buffer} The serialized object
+ */
+function serialize(arg) {
+ return new Buffer(arg.encode().toBuffer());
+}
+
+/**
+ * Sends a Div request on the channel.
+ * @param {client.Channel} channel The channel on which to make the request
+ * @param {DivArg} argument The argument to the call. Should be serializable
+ * with serialize
+ * @param {function(?Error, value=)} The callback to for when the response is
+ * received
+ * @param {array=} Array of metadata key/value pairs to add to the call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+var div = client.makeUnaryRequestFunction(
+ '/Math/Div',
+ serialize,
+ deserializeCls(math.DivReply));
+
+/**
+ * Sends a Fib request on the channel.
+ * @param {client.Channel} channel The channel on which to make the request
+ * @param {*} argument The argument to the call. Should be serializable with
+ * serialize
+ * @param {array=} Array of metadata key/value pairs to add to the call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+var fib = client.makeServerStreamRequestFunction(
+ '/Math/Fib',
+ serialize,
+ deserializeCls(math.Num));
+
+/**
+ * Sends a Sum request on the channel.
+ * @param {client.Channel} channel The channel on which to make the request
+ * @param {function(?Error, value=)} The callback to for when the response is
+ * received
+ * @param {array=} Array of metadata key/value pairs to add to the call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+var sum = client.makeClientStreamRequestFunction(
+ '/Math/Sum',
+ serialize,
+ deserializeCls(math.Num));
+
+/**
+ * Sends a DivMany request on the channel.
+ * @param {client.Channel} channel The channel on which to make the request
+ * @param {array=} Array of metadata key/value pairs to add to the call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+var divMany = client.makeBidiStreamRequestFunction(
+ '/Math/DivMany',
+ serialize,
+ deserializeCls(math.DivReply));
+
+/**
+ * Channel to use to make requests to a running server.
+ */
+var channel;
+
+/**
+ * Server to test against
+ */
+var server = require('../examples/math_server.js');
+
+
+describe('Math client', function() {
+ before(function(done) {
+ port_picker.nextAvailablePort(function(port) {
+ server.bind(port).listen();
+ channel = new client.Channel(port);
+ done();
+ });
+ });
+ after(function() {
+ server.shutdown();
+ });
+ it('should handle a single request', function(done) {
+ var arg = new math.DivArgs({dividend: 7, divisor: 4});
+ var call = div(channel, arg, function handleDivResult(err, value) {
+ assert.ifError(err);
+ assert.equal(value.get('quotient'), 1);
+ assert.equal(value.get('remainder'), 3);
+ });
+ call.on('status', function checkStatus(status) {
+ assert.strictEqual(status.code, client.status.OK);
+ done();
+ });
+ });
+ it('should handle a server streaming request', function(done) {
+ var arg = new math.FibArgs({limit: 7});
+ var call = fib(channel, arg);
+ var expected_results = [1, 1, 2, 3, 5, 8, 13];
+ var next_expected = 0;
+ call.on('data', function checkResponse(value) {
+ assert.equal(value.get('num'), expected_results[next_expected]);
+ next_expected += 1;
+ });
+ call.on('status', function checkStatus(status) {
+ assert.strictEqual(status.code, client.status.OK);
+ done();
+ });
+ });
+ it('should handle a client streaming request', function(done) {
+ var call = sum(channel, function handleSumResult(err, value) {
+ assert.ifError(err);
+ assert.equal(value.get('num'), 21);
+ });
+ for (var i = 0; i < 7; i++) {
+ call.write(new math.Num({'num': i}));
+ }
+ call.end();
+ call.on('status', function checkStatus(status) {
+ assert.strictEqual(status.code, client.status.OK);
+ done();
+ });
+ });
+ it('should handle a bidirectional streaming request', function(done) {
+ function checkResponse(index, value) {
+ assert.equal(value.get('quotient'), index);
+ assert.equal(value.get('remainder'), 1);
+ }
+ var call = divMany(channel);
+ var response_index = 0;
+ call.on('data', function(value) {
+ checkResponse(response_index, value);
+ response_index += 1;
+ });
+ for (var i = 0; i < 7; i++) {
+ call.write(new math.DivArgs({dividend: 2 * i + 1, divisor: 2}));
+ }
+ call.end();
+ call.on('status', function checkStatus(status) {
+ assert.strictEqual(status.code, client.status.OK);
+ done();
+ });
+ });
+});
diff --git a/src/node/test/math_client_test.js~ b/src/node/test/math_client_test.js~
new file mode 100644
index 0000000000..eaa424ad7f
--- /dev/null
+++ b/src/node/test/math_client_test.js~
@@ -0,0 +1,87 @@
+var client = require('../surface_client.js');
+
+var builder = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto');
+var math = builder.build('math');
+
+/**
+ * Get a function that deserializes a specific type of protobuf.
+ * @param {function()} cls The constructor of the message type to deserialize
+ * @return {function(Buffer):cls} The deserialization function
+ */
+function deserializeCls(cls) {
+ /**
+ * Deserialize a buffer to a message object
+ * @param {Buffer} arg_buf The buffer to deserialize
+ * @return {cls} The resulting object
+ */
+ return function deserialize(arg_buf) {
+ return cls.decode(arg_buf);
+ };
+}
+
+/**
+ * Serialize an object to a buffer
+ * @param {*} arg The object to serialize
+ * @return {Buffer} The serialized object
+ */
+function serialize(arg) {
+ return new Buffer(arg.encode.toBuffer());
+}
+
+/**
+ * Sends a Div request on the channel.
+ * @param {client.Channel} channel The channel on which to make the request
+ * @param {*} argument The argument to the call. Should be serializable with
+ * serialize
+ * @param {function(?Error, value=)} The callback to for when the response is
+ * received
+ * @param {array=} Array of metadata key/value pairs to add to the call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+var div = client.makeUnaryRequestFunction('/Math/Div',
+ serialize,
+ deserialize(math.DivReply));
+
+/**
+ * Sends a Fib request on the channel.
+ * @param {client.Channel} channel The channel on which to make the request
+ * @param {*} argument The argument to the call. Should be serializable with
+ * serialize
+ * @param {array=} Array of metadata key/value pairs to add to the call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+var fib = client.makeServerStreamRequestFunction('/Math/Fib',
+ serialize,
+ deserialize(math.Num));
+
+/**
+ * Sends a Sum request on the channel.
+ * @param {client.Channel} channel The channel on which to make the request
+ * @param {function(?Error, value=)} The callback to for when the response is
+ * received
+ * @param {array=} Array of metadata key/value pairs to add to the call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+var sum = client.makeClientStreamRequestFunction('/Math/Sum',
+ serialize,
+ deserialize(math.Num));
+
+/**
+ * Sends a DivMany request on the channel.
+ * @param {client.Channel} channel The channel on which to make the request
+ * @param {array=} Array of metadata key/value pairs to add to the call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+var divMany = client.makeBidiStreamRequestFunction('/Math/DivMany',
+ serialize,
+ deserialize(math.DivReply));
+
+var channel = new client.Channel('localhost:7070');
diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js
new file mode 100644
index 0000000000..4dd972c3ff
--- /dev/null
+++ b/src/node/test/server_test.js
@@ -0,0 +1,88 @@
+var assert = require('assert');
+var grpc = require('bindings')('grpc.node');
+var Server = require('../server');
+var port_picker = require('../port_picker');
+
+/**
+ * This is used for testing functions with multiple asynchronous calls that
+ * can happen in different orders. This should be passed the number of async
+ * function invocations that can occur last, and each of those should call this
+ * function's return value
+ * @param {function()} done The function that should be called when a test is
+ * complete.
+ * @param {number} count The number of calls to the resulting function if the
+ * test passes.
+ * @return {function()} The function that should be called at the end of each
+ * sequence of asynchronous functions.
+ */
+function multiDone(done, count) {
+ return function() {
+ count -= 1;
+ if (count <= 0) {
+ done();
+ }
+ };
+}
+
+/**
+ * Responds to every request with the same data as a response
+ * @param {Stream} stream
+ */
+function echoHandler(stream) {
+ stream.pipe(stream);
+}
+
+describe('echo server', function() {
+ it('should echo inputs as responses', function(done) {
+ done = multiDone(done, 4);
+ port_picker.nextAvailablePort(function(port) {
+ var server = new Server();
+ server.bind(port);
+ server.register('echo', echoHandler);
+ server.start();
+
+ var req_text = 'echo test string';
+ var status_text = 'OK';
+
+ var channel = new grpc.Channel(port);
+ var deadline = new Date();
+ deadline.setSeconds(deadline.getSeconds() + 3);
+ var call = new grpc.Call(channel,
+ 'echo',
+ deadline);
+ call.startInvoke(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.INVOKE_ACCEPTED);
+ call.startWrite(
+ new Buffer(req_text),
+ function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.WRITE_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ done();
+ });
+ }, 0);
+ call.startRead(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.READ);
+ assert.strictEqual(event.data.toString(), req_text);
+ done();
+ });
+ },function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.CLIENT_METADATA_READ);
+ done();
+ },function(event) {
+ assert.strictEqual(event.type, grpc.completionType.FINISHED);
+ var status = event.data;
+ assert.strictEqual(status.code, grpc.status.OK);
+ assert.strictEqual(status.details, status_text);
+ server.shutdown();
+ done();
+ }, 0);
+ });
+ });
+});
diff --git a/src/node/test/server_test.js~ b/src/node/test/server_test.js~
new file mode 100644
index 0000000000..3613e08232
--- /dev/null
+++ b/src/node/test/server_test.js~
@@ -0,0 +1,22 @@
+var assert = require('assert');
+var grpc = require('./build/Debug/grpc');
+var Server = require('server');
+
+function echoHandler(arg_iter) {
+ return {
+ 'next' : function(write) {
+ arg_iter.next(function(err, value) {
+ write(err, value);
+ });
+ }
+ }
+}
+
+describe('echo server', function() {
+ it('should echo inputs as responses', function(done) {
+ var server = new Server('localhost:5000');
+ server.register('echo', echoHandler);
+ server.start();
+
+ });
+}); \ No newline at end of file
diff --git a/src/node/timeval.cc b/src/node/timeval.cc
new file mode 100644
index 0000000000..503ebe6f54
--- /dev/null
+++ b/src/node/timeval.cc
@@ -0,0 +1,33 @@
+#include <limits>
+
+#include "grpc/grpc.h"
+#include "grpc/support/time.h"
+#include "timeval.h"
+
+namespace grpc {
+namespace node {
+
+gpr_timespec MillisecondsToTimespec(double millis) {
+ if (millis == std::numeric_limits<double>::infinity()) {
+ return gpr_inf_future;
+ } else if (millis == -std::numeric_limits<double>::infinity()) {
+ return gpr_inf_past;
+ } else {
+ return gpr_time_from_micros(static_cast<int64_t>(millis*1000));
+ }
+}
+
+double TimespecToMilliseconds(gpr_timespec timespec) {
+ if (gpr_time_cmp(timespec, gpr_inf_future) == 0) {
+ return std::numeric_limits<double>::infinity();
+ } else if (gpr_time_cmp(timespec, gpr_inf_past) == 0) {
+ return -std::numeric_limits<double>::infinity();
+ } else {
+ struct timeval time = gpr_timeval_from_timespec(timespec);
+ return (static_cast<double>(time.tv_sec) * 1000 +
+ static_cast<double>(time.tv_usec) / 1000);
+ }
+}
+
+} // namespace node
+} // namespace grpc
diff --git a/src/node/timeval.h b/src/node/timeval.h
new file mode 100644
index 0000000000..a350a09b00
--- /dev/null
+++ b/src/node/timeval.h
@@ -0,0 +1,15 @@
+#ifndef NET_GRPC_NODE_TIMEVAL_H_
+#define NET_GRPC_NODE_TIMEVAL_H_
+
+#include "grpc/support/time.h"
+
+namespace grpc {
+namespace node {
+
+double TimespecToMilliseconds(gpr_timespec time);
+gpr_timespec MillisecondsToTimespec(double millis);
+
+} // namespace node
+} // namespace grpc
+
+#endif // NET_GRPC_NODE_TIMEVAL_H_