From e5061519185627e58495bede780f757339ba07c5 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 12 Jan 2015 18:14:35 -0800 Subject: Clean commit of Node.js library source --- src/node/README.md | 12 + src/node/binding.gyp | 46 ++++ src/node/byte_buffer.cc | 46 ++++ src/node/byte_buffer.h | 23 ++ src/node/call.cc | 384 ++++++++++++++++++++++++++++++ src/node/call.h | 49 ++++ src/node/channel.cc | 155 ++++++++++++ src/node/channel.h | 46 ++++ src/node/client.js | 176 ++++++++++++++ src/node/common.js | 29 +++ src/node/completion_queue_async_worker.cc | 62 +++++ src/node/completion_queue_async_worker.h | 46 ++++ src/node/credentials.cc | 180 ++++++++++++++ src/node/credentials.h | 48 ++++ src/node/event.cc | 132 ++++++++++ src/node/event.h | 15 ++ src/node/examples/math.proto | 25 ++ src/node/examples/math_server.js | 168 +++++++++++++ src/node/node_grpc.cc | 151 ++++++++++++ src/node/package.json | 18 ++ src/node/port_picker.js | 19 ++ src/node/server.cc | 212 +++++++++++++++++ src/node/server.h | 46 ++++ src/node/server.js | 228 ++++++++++++++++++ src/node/server_credentials.cc | 131 ++++++++++ src/node/server_credentials.h | 44 ++++ src/node/surface_client.js | 306 ++++++++++++++++++++++++ src/node/surface_server.js | 325 +++++++++++++++++++++++++ src/node/tag.cc | 71 ++++++ src/node/tag.h | 26 ++ src/node/test/byte_buffer_test.js~ | 35 +++ src/node/test/call_test.js | 169 +++++++++++++ src/node/test/call_test.js~ | 6 + src/node/test/channel_test.js | 55 +++++ src/node/test/client_server_test.js | 150 ++++++++++++ src/node/test/client_server_test.js~ | 59 +++++ src/node/test/completion_queue_test.js~ | 30 +++ src/node/test/constant_test.js | 97 ++++++++ src/node/test/constant_test.js~ | 25 ++ src/node/test/data/README | 1 + src/node/test/data/ca.pem | 15 ++ src/node/test/data/server1.key | 16 ++ src/node/test/data/server1.pem | 16 ++ src/node/test/end_to_end_test.js | 168 +++++++++++++ src/node/test/end_to_end_test.js~ | 72 ++++++ src/node/test/math_client_test.js | 176 ++++++++++++++ src/node/test/math_client_test.js~ | 87 +++++++ src/node/test/server_test.js | 88 +++++++ src/node/test/server_test.js~ | 22 ++ src/node/timeval.cc | 33 +++ src/node/timeval.h | 15 ++ 51 files changed, 4554 insertions(+) create mode 100644 src/node/README.md create mode 100644 src/node/binding.gyp create mode 100644 src/node/byte_buffer.cc create mode 100644 src/node/byte_buffer.h create mode 100644 src/node/call.cc create mode 100644 src/node/call.h create mode 100644 src/node/channel.cc create mode 100644 src/node/channel.h create mode 100644 src/node/client.js create mode 100644 src/node/common.js create mode 100644 src/node/completion_queue_async_worker.cc create mode 100644 src/node/completion_queue_async_worker.h create mode 100644 src/node/credentials.cc create mode 100644 src/node/credentials.h create mode 100644 src/node/event.cc create mode 100644 src/node/event.h create mode 100644 src/node/examples/math.proto create mode 100644 src/node/examples/math_server.js create mode 100644 src/node/node_grpc.cc create mode 100644 src/node/package.json create mode 100644 src/node/port_picker.js create mode 100644 src/node/server.cc create mode 100644 src/node/server.h create mode 100644 src/node/server.js create mode 100644 src/node/server_credentials.cc create mode 100644 src/node/server_credentials.h create mode 100644 src/node/surface_client.js create mode 100644 src/node/surface_server.js create mode 100644 src/node/tag.cc create mode 100644 src/node/tag.h create mode 100644 src/node/test/byte_buffer_test.js~ create mode 100644 src/node/test/call_test.js create mode 100644 src/node/test/call_test.js~ create mode 100644 src/node/test/channel_test.js create mode 100644 src/node/test/client_server_test.js create mode 100644 src/node/test/client_server_test.js~ create mode 100644 src/node/test/completion_queue_test.js~ create mode 100644 src/node/test/constant_test.js create mode 100644 src/node/test/constant_test.js~ create mode 100644 src/node/test/data/README create mode 100644 src/node/test/data/ca.pem create mode 100644 src/node/test/data/server1.key create mode 100644 src/node/test/data/server1.pem create mode 100644 src/node/test/end_to_end_test.js create mode 100644 src/node/test/end_to_end_test.js~ create mode 100644 src/node/test/math_client_test.js create mode 100644 src/node/test/math_client_test.js~ create mode 100644 src/node/test/server_test.js create mode 100644 src/node/test/server_test.js~ create mode 100644 src/node/timeval.cc create mode 100644 src/node/timeval.h (limited to 'src/node') diff --git a/src/node/README.md b/src/node/README.md new file mode 100644 index 0000000000..55329d8cb2 --- /dev/null +++ b/src/node/README.md @@ -0,0 +1,12 @@ +# Node.js GRPC extension + +The package is built with + + node-gyp configure + node-gyp build + +or, for brevity + + node-gyp configure build + +The tests can be run with `npm test` on a dev install. \ No newline at end of file diff --git a/src/node/binding.gyp b/src/node/binding.gyp new file mode 100644 index 0000000000..4a1fd7aaf0 --- /dev/null +++ b/src/node/binding.gyp @@ -0,0 +1,46 @@ +{ + "targets" : [ + { + 'include_dirs': [ + " +#include + +#include +#include +#include "grpc/grpc.h" +#include "grpc/support/slice.h" + +namespace grpc { +namespace node { + +#include "byte_buffer.h" + +using ::node::Buffer; +using v8::Handle; +using v8::Value; + +grpc_byte_buffer *BufferToByteBuffer(Handle buffer) { + NanScope(); + int length = Buffer::Length(buffer); + char *data = Buffer::Data(buffer); + gpr_slice slice = gpr_slice_malloc(length); + memcpy(GPR_SLICE_START_PTR(slice), data, length); + grpc_byte_buffer *byte_buffer(grpc_byte_buffer_create(&slice, 1)); + gpr_slice_unref(slice); + return byte_buffer; +} + +Handle ByteBufferToBuffer(grpc_byte_buffer *buffer) { + NanEscapableScope(); + if (buffer == NULL) { + NanReturnNull(); + } + size_t length = grpc_byte_buffer_length(buffer); + char *result = reinterpret_cast(calloc(length, sizeof(char))); + size_t offset = 0; + grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer); + gpr_slice next; + while (grpc_byte_buffer_reader_next(reader, &next) != 0) { + memcpy(result+offset, GPR_SLICE_START_PTR(next), GPR_SLICE_LENGTH(next)); + offset += GPR_SLICE_LENGTH(next); + } + return NanEscapeScope(NanNewBufferHandle(result, length)); +} +} // namespace node +} // namespace grpc diff --git a/src/node/byte_buffer.h b/src/node/byte_buffer.h new file mode 100644 index 0000000000..b439de15b1 --- /dev/null +++ b/src/node/byte_buffer.h @@ -0,0 +1,23 @@ +#ifndef NET_GRPC_NODE_BYTE_BUFFER_H_ +#define NET_GRPC_NODE_BYTE_BUFFER_H_ + +#include + +#include +#include +#include "grpc/grpc.h" + +namespace grpc { +namespace node { + +/* Convert a Node.js Buffer to grpc_byte_buffer. Requires that + ::node::Buffer::HasInstance(buffer) */ +grpc_byte_buffer *BufferToByteBuffer(v8::Handle buffer); + +/* Convert a grpc_byte_buffer to a Node.js Buffer */ +v8::Handle ByteBufferToBuffer(grpc_byte_buffer *buffer); + +} // namespace node +} // namespace grpc + +#endif // NET_GRPC_NODE_BYTE_BUFFER_H_ diff --git a/src/node/call.cc b/src/node/call.cc new file mode 100644 index 0000000000..c6685101ca --- /dev/null +++ b/src/node/call.cc @@ -0,0 +1,384 @@ +#include + +#include "grpc/grpc.h" +#include "grpc/support/time.h" +#include "byte_buffer.h" +#include "call.h" +#include "channel.h" +#include "completion_queue_async_worker.h" +#include "timeval.h" +#include "tag.h" + +namespace grpc { +namespace node { + +using ::node::Buffer; +using v8::Arguments; +using v8::Array; +using v8::Exception; +using v8::External; +using v8::Function; +using v8::FunctionTemplate; +using v8::Handle; +using v8::HandleScope; +using v8::Integer; +using v8::Local; +using v8::Number; +using v8::Object; +using v8::ObjectTemplate; +using v8::Persistent; +using v8::Uint32; +using v8::String; +using v8::Value; + +Persistent Call::constructor; +Persistent Call::fun_tpl; + +Call::Call(grpc_call *call) : wrapped_call(call) { +} + +Call::~Call() { + grpc_call_destroy(wrapped_call); +} + +void Call::Init(Handle exports) { + NanScope(); + Local tpl = FunctionTemplate::New(New); + tpl->SetClassName(NanNew("Call")); + tpl->InstanceTemplate()->SetInternalFieldCount(1); + NanSetPrototypeTemplate(tpl, "addMetadata", + FunctionTemplate::New(AddMetadata)->GetFunction()); + NanSetPrototypeTemplate(tpl, "startInvoke", + FunctionTemplate::New(StartInvoke)->GetFunction()); + NanSetPrototypeTemplate(tpl, "serverAccept", + FunctionTemplate::New(ServerAccept)->GetFunction()); + NanSetPrototypeTemplate( + tpl, + "serverEndInitialMetadata", + FunctionTemplate::New(ServerEndInitialMetadata)->GetFunction()); + NanSetPrototypeTemplate(tpl, "cancel", + FunctionTemplate::New(Cancel)->GetFunction()); + NanSetPrototypeTemplate(tpl, "startWrite", + FunctionTemplate::New(StartWrite)->GetFunction()); + NanSetPrototypeTemplate( + tpl, "startWriteStatus", + FunctionTemplate::New(StartWriteStatus)->GetFunction()); + NanSetPrototypeTemplate(tpl, "writesDone", + FunctionTemplate::New(WritesDone)->GetFunction()); + NanSetPrototypeTemplate(tpl, "startReadMetadata", + FunctionTemplate::New(WritesDone)->GetFunction()); + NanSetPrototypeTemplate(tpl, "startRead", + FunctionTemplate::New(StartRead)->GetFunction()); + NanAssignPersistent(fun_tpl, tpl); + NanAssignPersistent(constructor, tpl->GetFunction()); + constructor->Set(NanNew("WRITE_BUFFER_HINT"), + NanNew(GRPC_WRITE_BUFFER_HINT)); + constructor->Set(NanNew("WRITE_NO_COMPRESS"), + NanNew(GRPC_WRITE_NO_COMPRESS)); + exports->Set(String::NewSymbol("Call"), constructor); +} + +bool Call::HasInstance(Handle val) { + NanScope(); + return NanHasInstance(fun_tpl, val); +} + +Handle Call::WrapStruct(grpc_call *call) { + NanEscapableScope(); + if (call == NULL) { + return NanEscapeScope(NanNull()); + } + const int argc = 1; + Handle argv[argc] = { External::New(reinterpret_cast(call)) }; + return NanEscapeScope(constructor->NewInstance(argc, argv)); +} + +NAN_METHOD(Call::New) { + NanScope(); + + if (args.IsConstructCall()) { + Call *call; + if (args[0]->IsExternal()) { + // This option is used for wrapping an existing call + grpc_call *call_value = reinterpret_cast( + External::Unwrap(args[0])); + call = new Call(call_value); + } else { + if (!Channel::HasInstance(args[0])) { + return NanThrowTypeError("Call's first argument must be a Channel"); + } + if (!args[1]->IsString()) { + return NanThrowTypeError("Call's second argument must be a string"); + } + if (!(args[2]->IsNumber() || args[2]->IsDate())) { + return NanThrowTypeError( + "Call's third argument must be a date or a number"); + } + Handle channel_object = args[0]->ToObject(); + Channel *channel = ObjectWrap::Unwrap(channel_object); + if (channel->GetWrappedChannel() == NULL) { + return NanThrowError("Call cannot be created from a closed channel"); + } + NanUtf8String method(args[1]); + double deadline = args[2]->NumberValue(); + grpc_channel *wrapped_channel = channel->GetWrappedChannel(); + grpc_call *wrapped_call = grpc_channel_create_call( + wrapped_channel, + *method, + channel->GetHost(), + MillisecondsToTimespec(deadline)); + call = new Call(wrapped_call); + args.This()->SetHiddenValue(String::NewSymbol("channel_"), + channel_object); + } + call->Wrap(args.This()); + NanReturnValue(args.This()); + } else { + const int argc = 4; + Local argv[argc] = { args[0], args[1], args[2], args[3] }; + NanReturnValue(constructor->NewInstance(argc, argv)); + } +} + +NAN_METHOD(Call::AddMetadata) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError( + "addMetadata can only be called on Call objects"); + } + Call *call = ObjectWrap::Unwrap(args.This()); + for (int i=0; !args[i]->IsUndefined(); i++) { + if (!args[i]->IsObject()) { + return NanThrowTypeError( + "addMetadata arguments must be objects with key and value"); + } + Handle item = args[i]->ToObject(); + Handle key = item->Get(NanNew("key")); + if (!key->IsString()) { + return NanThrowTypeError( + "objects passed to addMetadata must have key->string"); + } + Handle value = item->Get(NanNew("value")); + if (!Buffer::HasInstance(value)) { + return NanThrowTypeError( + "objects passed to addMetadata must have value->Buffer"); + } + grpc_metadata metadata; + NanUtf8String utf8_key(key); + metadata.key = *utf8_key; + metadata.value = Buffer::Data(value); + metadata.value_length = Buffer::Length(value); + grpc_call_error error = grpc_call_add_metadata(call->wrapped_call, + &metadata, + 0); + if (error != GRPC_CALL_OK) { + return NanThrowError("addMetadata failed", error); + } + } + NanReturnUndefined(); +} + +NAN_METHOD(Call::StartInvoke) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("startInvoke can only be called on Call objects"); + } + if (!args[0]->IsFunction()) { + return NanThrowTypeError( + "StartInvoke's first argument must be a function"); + } + if (!args[1]->IsFunction()) { + return NanThrowTypeError( + "StartInvoke's second argument must be a function"); + } + if (!args[2]->IsFunction()) { + return NanThrowTypeError( + "StartInvoke's third argument must be a function"); + } + if (!args[3]->IsUint32()) { + return NanThrowTypeError( + "StartInvoke's fourth argument must be integer flags"); + } + Call *call = ObjectWrap::Unwrap(args.This()); + unsigned int flags = args[3]->Uint32Value(); + grpc_call_error error = grpc_call_start_invoke( + call->wrapped_call, + CompletionQueueAsyncWorker::GetQueue(), + CreateTag(args[0], args.This()), + CreateTag(args[1], args.This()), + CreateTag(args[2], args.This()), + flags); + if (error == GRPC_CALL_OK) { + CompletionQueueAsyncWorker::Next(); + CompletionQueueAsyncWorker::Next(); + CompletionQueueAsyncWorker::Next(); + } else { + return NanThrowError("startInvoke failed", error); + } + NanReturnUndefined(); +} + +NAN_METHOD(Call::ServerAccept) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("accept can only be called on Call objects"); + } + if (!args[0]->IsFunction()) { + return NanThrowTypeError( + "accept's first argument must be a function"); + } + Call *call = ObjectWrap::Unwrap(args.This()); + grpc_call_error error = grpc_call_server_accept( + call->wrapped_call, + CompletionQueueAsyncWorker::GetQueue(), + CreateTag(args[0], args.This())); + if (error == GRPC_CALL_OK) { + CompletionQueueAsyncWorker::Next(); + } else { + return NanThrowError("serverAccept failed", error); + } + NanReturnUndefined(); +} + +NAN_METHOD(Call::ServerEndInitialMetadata) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError( + "serverEndInitialMetadata can only be called on Call objects"); + } + if (!args[0]->IsUint32()) { + return NanThrowTypeError( + "serverEndInitialMetadata's second argument must be integer flags"); + } + Call *call = ObjectWrap::Unwrap(args.This()); + unsigned int flags = args[1]->Uint32Value(); + grpc_call_error error = grpc_call_server_end_initial_metadata( + call->wrapped_call, + flags); + if (error != GRPC_CALL_OK) { + return NanThrowError("serverEndInitialMetadata failed", error); + } + NanReturnUndefined(); +} + +NAN_METHOD(Call::Cancel) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("startInvoke can only be called on Call objects"); + } + Call *call = ObjectWrap::Unwrap(args.This()); + grpc_call_error error = grpc_call_cancel(call->wrapped_call); + if (error != GRPC_CALL_OK) { + return NanThrowError("cancel failed", error); + } + NanReturnUndefined(); +} + +NAN_METHOD(Call::StartWrite) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("startWrite can only be called on Call objects"); + } + if (!Buffer::HasInstance(args[0])) { + return NanThrowTypeError( + "startWrite's first argument must be a Buffer"); + } + if (!args[1]->IsFunction()) { + return NanThrowTypeError( + "startWrite's second argument must be a function"); + } + if (!args[2]->IsUint32()) { + return NanThrowTypeError( + "startWrite's third argument must be integer flags"); + } + Call *call = ObjectWrap::Unwrap(args.This()); + grpc_byte_buffer *buffer = BufferToByteBuffer(args[0]); + unsigned int flags = args[2]->Uint32Value(); + grpc_call_error error = grpc_call_start_write(call->wrapped_call, + buffer, + CreateTag(args[1], args.This()), + flags); + if (error == GRPC_CALL_OK) { + CompletionQueueAsyncWorker::Next(); + } else { + return NanThrowError("startWrite failed", error); + } + NanReturnUndefined(); +} + +NAN_METHOD(Call::StartWriteStatus) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError( + "startWriteStatus can only be called on Call objects"); + } + if (!args[0]->IsUint32()) { + return NanThrowTypeError( + "startWriteStatus's first argument must be a status code"); + } + if (!args[1]->IsString()) { + return NanThrowTypeError( + "startWriteStatus's second argument must be a string"); + } + if (!args[2]->IsFunction()) { + return NanThrowTypeError( + "startWriteStatus's third argument must be a function"); + } + Call *call = ObjectWrap::Unwrap(args.This()); + NanUtf8String details(args[1]); + grpc_call_error error = grpc_call_start_write_status( + call->wrapped_call, + (grpc_status_code)args[0]->Uint32Value(), + *details, + CreateTag(args[2], args.This())); + if (error == GRPC_CALL_OK) { + CompletionQueueAsyncWorker::Next(); + } else { + return NanThrowError("startWriteStatus failed", error); + } + NanReturnUndefined(); +} + +NAN_METHOD(Call::WritesDone) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("writesDone can only be called on Call objects"); + } + if (!args[0]->IsFunction()) { + return NanThrowTypeError( + "writesDone's first argument must be a function"); + } + Call *call = ObjectWrap::Unwrap(args.This()); + grpc_call_error error = grpc_call_writes_done( + call->wrapped_call, + CreateTag(args[0], args.This())); + if (error == GRPC_CALL_OK) { + CompletionQueueAsyncWorker::Next(); + } else { + return NanThrowError("writesDone failed", error); + } + NanReturnUndefined(); +} + +NAN_METHOD(Call::StartRead) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("startRead can only be called on Call objects"); + } + if (!args[0]->IsFunction()) { + return NanThrowTypeError( + "startRead's first argument must be a function"); + } + Call *call = ObjectWrap::Unwrap(args.This()); + grpc_call_error error = grpc_call_start_read(call->wrapped_call, + CreateTag(args[0], args.This())); + if (error == GRPC_CALL_OK) { + CompletionQueueAsyncWorker::Next(); + } else { + return NanThrowError("startRead failed", error); + } + NanReturnUndefined(); +} + +} // namespace node +} // namespace grpc diff --git a/src/node/call.h b/src/node/call.h new file mode 100644 index 0000000000..1bf8e32aab --- /dev/null +++ b/src/node/call.h @@ -0,0 +1,49 @@ +#ifndef NET_GRPC_NODE_CALL_H_ +#define NET_GRPC_NODE_CALL_H_ + +#include +#include +#include "grpc/grpc.h" + +#include "channel.h" + +namespace grpc { +namespace node { + +/* Wrapper class for grpc_call structs. */ +class Call : public ::node::ObjectWrap { + public: + static void Init(v8::Handle exports); + static bool HasInstance(v8::Handle val); + /* Wrap a grpc_call struct in a javascript object */ + static v8::Handle WrapStruct(grpc_call *call); + + private: + explicit Call(grpc_call *call); + ~Call(); + + // Prevent copying + Call(const Call&); + Call& operator=(const Call&); + + static NAN_METHOD(New); + static NAN_METHOD(AddMetadata); + static NAN_METHOD(StartInvoke); + static NAN_METHOD(ServerAccept); + static NAN_METHOD(ServerEndInitialMetadata); + static NAN_METHOD(Cancel); + static NAN_METHOD(StartWrite); + static NAN_METHOD(StartWriteStatus); + static NAN_METHOD(WritesDone); + static NAN_METHOD(StartRead); + static v8::Persistent constructor; + // Used for typechecking instances of this javascript class + static v8::Persistent fun_tpl; + + grpc_call *wrapped_call; +}; + +} // namespace node +} // namespace grpc + +#endif // NET_GRPC_NODE_CALL_H_ diff --git a/src/node/channel.cc b/src/node/channel.cc new file mode 100644 index 0000000000..222fb3b4e0 --- /dev/null +++ b/src/node/channel.cc @@ -0,0 +1,155 @@ +#include + +#include + +#include +#include +#include "grpc/grpc.h" +#include "grpc/grpc_security.h" +#include "channel.h" +#include "credentials.h" + +namespace grpc { +namespace node { + +using v8::Arguments; +using v8::Array; +using v8::Exception; +using v8::Function; +using v8::FunctionTemplate; +using v8::Handle; +using v8::HandleScope; +using v8::Integer; +using v8::Local; +using v8::Object; +using v8::Persistent; +using v8::String; +using v8::Value; + +Persistent Channel::constructor; +Persistent Channel::fun_tpl; + +Channel::Channel(grpc_channel *channel, NanUtf8String *host) + : wrapped_channel(channel), host(host) { +} + +Channel::~Channel() { + if (wrapped_channel != NULL) { + grpc_channel_destroy(wrapped_channel); + } + delete host; +} + +void Channel::Init(Handle exports) { + NanScope(); + Local tpl = FunctionTemplate::New(New); + tpl->SetClassName(NanNew("Channel")); + tpl->InstanceTemplate()->SetInternalFieldCount(1); + NanSetPrototypeTemplate(tpl, "close", + FunctionTemplate::New(Close)->GetFunction()); + NanAssignPersistent(fun_tpl, tpl); + NanAssignPersistent(constructor, tpl->GetFunction()); + exports->Set(NanNew("Channel"), constructor); +} + +bool Channel::HasInstance(Handle val) { + NanScope(); + return NanHasInstance(fun_tpl, val); +} + +grpc_channel *Channel::GetWrappedChannel() { + return this->wrapped_channel; +} + +char *Channel::GetHost() { + return **this->host; +} + +NAN_METHOD(Channel::New) { + NanScope(); + + if (args.IsConstructCall()) { + if (!args[0]->IsString()) { + return NanThrowTypeError("Channel expects a string and an object"); + } + grpc_channel *wrapped_channel; + // Owned by the Channel object + NanUtf8String *host = new NanUtf8String(args[0]); + if (args[1]->IsUndefined()) { + wrapped_channel = grpc_channel_create(**host, NULL); + } else if (args[1]->IsObject()) { + grpc_credentials *creds = NULL; + Handle args_hash(args[1]->ToObject()->Clone()); + if (args_hash->HasOwnProperty(NanNew("credentials"))) { + Handle creds_value = args_hash->Get(NanNew("credentials")); + if (!Credentials::HasInstance(creds_value)) { + return NanThrowTypeError( + "credentials arg must be a Credentials object"); + } + Credentials *creds_object = ObjectWrap::Unwrap( + creds_value->ToObject()); + creds = creds_object->GetWrappedCredentials(); + args_hash->Delete(NanNew("credentials")); + } + Handle keys(args_hash->GetOwnPropertyNames()); + grpc_channel_args channel_args; + channel_args.num_args = keys->Length(); + channel_args.args = reinterpret_cast( + calloc(channel_args.num_args, sizeof(grpc_arg))); + /* These are used to keep all strings until then end of the block, then + destroy them */ + std::vector key_strings(keys->Length()); + std::vector value_strings(keys->Length()); + for (unsigned int i = 0; i < channel_args.num_args; i++) { + Handle current_key(keys->Get(i)->ToString()); + Handle current_value(args_hash->Get(current_key)); + key_strings[i] = new NanUtf8String(current_key); + channel_args.args[i].key = **key_strings[i]; + if (current_value->IsInt32()) { + channel_args.args[i].type = GRPC_ARG_INTEGER; + channel_args.args[i].value.integer = current_value->Int32Value(); + } else if (current_value->IsString()) { + channel_args.args[i].type = GRPC_ARG_STRING; + value_strings[i] = new NanUtf8String(current_value); + channel_args.args[i].value.string = **value_strings[i]; + } else { + free(channel_args.args); + return NanThrowTypeError("Arg values must be strings"); + } + } + if (creds == NULL) { + wrapped_channel = grpc_channel_create(**host, &channel_args); + } else { + wrapped_channel = grpc_secure_channel_create(creds, + **host, + &channel_args); + } + free(channel_args.args); + } else { + return NanThrowTypeError("Channel expects a string and an object"); + } + Channel *channel = new Channel(wrapped_channel, host); + channel->Wrap(args.This()); + NanReturnValue(args.This()); + } else { + const int argc = 2; + Local argv[argc] = { args[0], args[1] }; + NanReturnValue(constructor->NewInstance(argc, argv)); + } +} + +NAN_METHOD(Channel::Close) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("close can only be called on Channel objects"); + } + Channel *channel = ObjectWrap::Unwrap(args.This()); + if (channel->wrapped_channel != NULL) { + grpc_channel_destroy(channel->wrapped_channel); + channel->wrapped_channel = NULL; + } + NanReturnUndefined(); +} + +} // namespace node +} // namespace grpc diff --git a/src/node/channel.h b/src/node/channel.h new file mode 100644 index 0000000000..fbfb83874c --- /dev/null +++ b/src/node/channel.h @@ -0,0 +1,46 @@ +#ifndef NET_GRPC_NODE_CHANNEL_H_ +#define NET_GRPC_NODE_CHANNEL_H_ + +#include +#include +#include "grpc/grpc.h" + +namespace grpc { +namespace node { + +/* Wrapper class for grpc_channel structs */ +class Channel : public ::node::ObjectWrap { + public: + static void Init(v8::Handle exports); + static bool HasInstance(v8::Handle val); + /* This is used to typecheck javascript objects before converting them to + this type */ + static v8::Persistent prototype; + + /* Returns the grpc_channel struct that this object wraps */ + grpc_channel *GetWrappedChannel(); + + /* Return the hostname that this channel connects to */ + char *GetHost(); + + private: + explicit Channel(grpc_channel *channel, NanUtf8String *host); + ~Channel(); + + // Prevent copying + Channel(const Channel&); + Channel& operator=(const Channel&); + + static NAN_METHOD(New); + static NAN_METHOD(Close); + static v8::Persistent constructor; + static v8::Persistent fun_tpl; + + grpc_channel *wrapped_channel; + NanUtf8String *host; +}; + +} // namespace node +} // namespace grpc + +#endif // NET_GRPC_NODE_CHANNEL_H_ diff --git a/src/node/client.js b/src/node/client.js new file mode 100644 index 0000000000..e42e36aad4 --- /dev/null +++ b/src/node/client.js @@ -0,0 +1,176 @@ +var grpc = require('bindings')('grpc.node'); + +var common = require('./common'); + +var Duplex = require('stream').Duplex; +var util = require('util'); + +util.inherits(GrpcClientStream, Duplex); + +/** + * Class for representing a gRPC client side stream as a Node stream. Extends + * from stream.Duplex. + * @constructor + * @param {grpc.Call} call Call object to proxy + * @param {object} options Stream options + */ +function GrpcClientStream(call, options) { + Duplex.call(this, options); + var self = this; + // Indicates that we can start reading and have not received a null read + var can_read = false; + // Indicates that a read is currently pending + var reading = false; + // Indicates that we can call startWrite + var can_write = false; + // Indicates that a write is currently pending + var writing = false; + this._call = call; + /** + * Callback to handle receiving a READ event. Pushes the data from that event + * onto the read queue and starts reading again if applicable. + * @param {grpc.Event} event The READ event object + */ + function readCallback(event) { + var data = event.data; + if (self.push(data)) { + if (data == null) { + // Disable starting to read after null read was received + can_read = false; + reading = false; + } else { + call.startRead(readCallback); + } + } else { + // Indicate that reading can be resumed by calling startReading + reading = false; + } + }; + /** + * Initiate a read, which continues until self.push returns false (indicating + * that reading should be paused) or data is null (indicating that there is no + * more data to read). + */ + function startReading() { + call.startRead(readCallback); + } + // TODO(mlumish): possibly change queue implementation due to shift slowness + var write_queue = []; + /** + * Write the next chunk of data in the write queue if there is one. Otherwise + * indicate that there is no pending write. When the write succeeds, this + * function is called again. + */ + function writeNext() { + if (write_queue.length > 0) { + writing = true; + var next = write_queue.shift(); + var writeCallback = function(event) { + next.callback(); + writeNext(); + }; + call.startWrite(next.chunk, writeCallback, 0); + } else { + writing = false; + } + } + call.startInvoke(function(event) { + can_read = true; + can_write = true; + startReading(); + writeNext(); + }, function(event) { + self.emit('metadata', event.data); + }, function(event) { + self.emit('status', event.data); + }, 0); + this.on('finish', function() { + call.writesDone(function() {}); + }); + /** + * Indicate that reads should start, and start them if the INVOKE_ACCEPTED + * event has been received. + */ + this._enableRead = function() { + if (!reading) { + reading = true; + if (can_read) { + startReading(); + } + } + }; + /** + * Push the chunk onto the write queue, and write from the write queue if + * there is not a pending write + * @param {Buffer} chunk The chunk of data to write + * @param {function(Error=)} callback The callback to call when the write + * completes + */ + this._tryWrite = function(chunk, callback) { + write_queue.push({chunk: chunk, callback: callback}); + if (can_write && !writing) { + writeNext(); + } + }; +} + +/** + * Start reading. This is an implementation of a method needed for implementing + * stream.Readable. + * @param {number} size Ignored + */ +GrpcClientStream.prototype._read = function(size) { + this._enableRead(); +}; + +/** + * Attempt to write the given chunk. Calls the callback when done. This is an + * implementation of a method needed for implementing stream.Writable. + * @param {Buffer} chunk The chunk to write + * @param {string} encoding Ignored + * @param {function(Error=)} callback Ignored + */ +GrpcClientStream.prototype._write = function(chunk, encoding, callback) { + this._tryWrite(chunk, callback); +}; + +/** + * Make a request on the channel to the given method with the given arguments + * @param {grpc.Channel} channel The channel on which to make the request + * @param {string} method The method to request + * @param {array=} metadata Array of metadata key/value pairs to add to the call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future. + * @return {stream=} The stream of responses + */ +function makeRequest(channel, + method, + metadata, + deadline) { + if (deadline === undefined) { + deadline = Infinity; + } + var call = new grpc.Call(channel, method, deadline); + if (metadata) { + call.addMetadata(metadata); + } + return new GrpcClientStream(call); +} + +/** + * See documentation for makeRequest above + */ +exports.makeRequest = makeRequest; + +/** + * Represents a client side gRPC channel associated with a single host. + */ +exports.Channel = grpc.Channel; +/** + * Status name to code number mapping + */ +exports.status = grpc.status; +/** + * Call error name to code number mapping + */ +exports.callError = grpc.callError; diff --git a/src/node/common.js b/src/node/common.js new file mode 100644 index 0000000000..b856bf6387 --- /dev/null +++ b/src/node/common.js @@ -0,0 +1,29 @@ +var _ = require('highland'); + +/** + * When the given stream finishes without error, call the callback once. This + * will not be called until something begins to consume the stream. + * @param {function} callback The callback to call at stream end + * @param {stream} source The stream to watch + * @return {stream} The stream with the callback attached + */ +function onSuccessfulStreamEnd(callback, source) { + var error = false; + return source.consume(function(err, x, push, next) { + if (x === _.nil) { + if (!error) { + callback(); + } + push(null, x); + } else if (err) { + error = true; + push(err); + next(); + } else { + push(err, x); + next(); + } + }); +} + +exports.onSuccessfulStreamEnd = onSuccessfulStreamEnd; diff --git a/src/node/completion_queue_async_worker.cc b/src/node/completion_queue_async_worker.cc new file mode 100644 index 0000000000..d8156654cd --- /dev/null +++ b/src/node/completion_queue_async_worker.cc @@ -0,0 +1,62 @@ +#include +#include + +#include "grpc/grpc.h" +#include "grpc/support/time.h" +#include "completion_queue_async_worker.h" +#include "event.h" +#include "tag.h" + +namespace grpc { +namespace node { + +using v8::Function; +using v8::Handle; +using v8::Object; +using v8::Persistent; +using v8::Value; + +grpc_completion_queue *CompletionQueueAsyncWorker::queue; + +CompletionQueueAsyncWorker::CompletionQueueAsyncWorker() : + NanAsyncWorker(NULL) { +} + +CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() { +} + +void CompletionQueueAsyncWorker::Execute() { + result = grpc_completion_queue_next(queue, gpr_inf_future); +} + +grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { + return queue; +} + +void CompletionQueueAsyncWorker::Next() { + NanScope(); + CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); + NanAsyncQueueWorker(worker); +} + +void CompletionQueueAsyncWorker::Init(Handle exports) { + NanScope(); + queue = grpc_completion_queue_create(); +} + +void CompletionQueueAsyncWorker::HandleOKCallback() { + NanScope(); + NanCallback event_callback(GetTagHandle(result->tag).As()); + Handle argv[] = { + CreateEventObject(result) + }; + + DestroyTag(result->tag); + grpc_event_finish(result); + result = NULL; + + event_callback.Call(1, argv); +} + +} // namespace node +} // namespace grpc diff --git a/src/node/completion_queue_async_worker.h b/src/node/completion_queue_async_worker.h new file mode 100644 index 0000000000..9e6d03b552 --- /dev/null +++ b/src/node/completion_queue_async_worker.h @@ -0,0 +1,46 @@ +#ifndef NET_GRPC_NODE_COMPLETION_QUEUE_ASYNC_WORKER_H_ +#define NET_GRPC_NODE_COMPLETION_QUEUE_ASYNC_WORKER_H_ +#include + +#include "grpc/grpc.h" + +namespace grpc { +namespace node { + +/* A worker that asynchronously calls completion_queue_next, and queues onto the + node event loop a call to the function stored in the event's tag. */ +class CompletionQueueAsyncWorker : public NanAsyncWorker { + public: + CompletionQueueAsyncWorker(); + + ~CompletionQueueAsyncWorker(); + /* Calls completion_queue_next with the provided deadline, and stores the + event if there was one or sets an error message if there was not */ + void Execute(); + + /* Returns the completion queue attached to this class */ + static grpc_completion_queue *GetQueue(); + + /* Convenience function to create a worker with the given arguments and queue + it to run asynchronously */ + static void Next(); + + /* Initialize the CompletionQueueAsyncWorker class */ + static void Init(v8::Handle exports); + + protected: + /* Called when Execute has succeeded (completed without setting an error + message). Calls the saved callback with the event that came from + completion_queue_next */ + void HandleOKCallback(); + + private: + grpc_event *result; + + static grpc_completion_queue *queue; +}; + +} // namespace node +} // namespace grpc + +#endif // NET_GRPC_NODE_COMPLETION_QUEUE_ASYNC_WORKER_H_ diff --git a/src/node/credentials.cc b/src/node/credentials.cc new file mode 100644 index 0000000000..a5743d7a09 --- /dev/null +++ b/src/node/credentials.cc @@ -0,0 +1,180 @@ +#include + +#include "grpc/grpc.h" +#include "grpc/grpc_security.h" +#include "grpc/support/log.h" +#include "credentials.h" + +namespace grpc { +namespace node { + +using ::node::Buffer; +using v8::Arguments; +using v8::Exception; +using v8::External; +using v8::Function; +using v8::FunctionTemplate; +using v8::Handle; +using v8::HandleScope; +using v8::Integer; +using v8::Local; +using v8::Object; +using v8::ObjectTemplate; +using v8::Persistent; +using v8::Value; + +Persistent Credentials::constructor; +Persistent Credentials::fun_tpl; + +Credentials::Credentials(grpc_credentials *credentials) + : wrapped_credentials(credentials) { +} + +Credentials::~Credentials() { + gpr_log(GPR_DEBUG, "Destroying credentials object"); + grpc_credentials_release(wrapped_credentials); +} + +void Credentials::Init(Handle exports) { + NanScope(); + Local tpl = FunctionTemplate::New(New); + tpl->SetClassName(NanNew("Credentials")); + tpl->InstanceTemplate()->SetInternalFieldCount(1); + NanAssignPersistent(fun_tpl, tpl); + NanAssignPersistent(constructor, tpl->GetFunction()); + constructor->Set(NanNew("createDefault"), + FunctionTemplate::New(CreateDefault)->GetFunction()); + constructor->Set(NanNew("createSsl"), + FunctionTemplate::New(CreateSsl)->GetFunction()); + constructor->Set(NanNew("createComposite"), + FunctionTemplate::New(CreateComposite)->GetFunction()); + constructor->Set(NanNew("createGce"), + FunctionTemplate::New(CreateGce)->GetFunction()); + constructor->Set(NanNew("createFake"), + FunctionTemplate::New(CreateFake)->GetFunction()); + constructor->Set(NanNew("createIam"), + FunctionTemplate::New(CreateIam)->GetFunction()); + exports->Set(NanNew("Credentials"), constructor); +} + +bool Credentials::HasInstance(Handle val) { + NanScope(); + return NanHasInstance(fun_tpl, val); +} + +Handle Credentials::WrapStruct(grpc_credentials *credentials) { + NanEscapableScope(); + if (credentials == NULL) { + return NanEscapeScope(NanNull()); + } + const int argc = 1; + Handle argv[argc] = { + External::New(reinterpret_cast(credentials)) }; + return NanEscapeScope(constructor->NewInstance(argc, argv)); +} + +grpc_credentials *Credentials::GetWrappedCredentials() { + return wrapped_credentials; +} + +NAN_METHOD(Credentials::New) { + NanScope(); + + if (args.IsConstructCall()) { + if (!args[0]->IsExternal()) { + return NanThrowTypeError( + "Credentials can only be created with the provided functions"); + } + grpc_credentials *creds_value = reinterpret_cast( + External::Unwrap(args[0])); + Credentials *credentials = new Credentials(creds_value); + credentials->Wrap(args.This()); + NanReturnValue(args.This()); + } else { + const int argc = 1; + Local argv[argc] = { args[0] }; + NanReturnValue(constructor->NewInstance(argc, argv)); + } +} + +NAN_METHOD(Credentials::CreateDefault) { + NanScope(); + NanReturnValue(WrapStruct(grpc_default_credentials_create())); +} + +NAN_METHOD(Credentials::CreateSsl) { + NanScope(); + char *root_certs; + char *private_key = NULL; + char *cert_chain = NULL; + int root_certs_length, private_key_length = 0, cert_chain_length = 0; + if (!Buffer::HasInstance(args[0])) { + return NanThrowTypeError( + "createSsl's first argument must be a Buffer"); + } + root_certs = Buffer::Data(args[0]); + root_certs_length = Buffer::Length(args[0]); + if (Buffer::HasInstance(args[1])) { + private_key = Buffer::Data(args[1]); + private_key_length = Buffer::Length(args[1]); + } else if (!(args[1]->IsNull() || args[1]->IsUndefined())) { + return NanThrowTypeError( + "createSSl's second argument must be a Buffer if provided"); + } + if (Buffer::HasInstance(args[2])) { + cert_chain = Buffer::Data(args[2]); + cert_chain_length = Buffer::Length(args[2]); + } else if (!(args[2]->IsNull() || args[2]->IsUndefined())) { + return NanThrowTypeError( + "createSSl's third argument must be a Buffer if provided"); + } + NanReturnValue(WrapStruct(grpc_ssl_credentials_create( + reinterpret_cast(root_certs), root_certs_length, + reinterpret_cast(private_key), private_key_length, + reinterpret_cast(cert_chain), cert_chain_length))); +} + +NAN_METHOD(Credentials::CreateComposite) { + NanScope(); + if (!HasInstance(args[0])) { + return NanThrowTypeError( + "createComposite's first argument must be a Credentials object"); + } + if (!HasInstance(args[1])) { + return NanThrowTypeError( + "createComposite's second argument must be a Credentials object"); + } + Credentials *creds1 = ObjectWrap::Unwrap(args[0]->ToObject()); + Credentials *creds2 = ObjectWrap::Unwrap(args[1]->ToObject()); + NanReturnValue(WrapStruct(grpc_composite_credentials_create( + creds1->wrapped_credentials, creds2->wrapped_credentials))); +} + +NAN_METHOD(Credentials::CreateGce) { + NanScope(); + NanReturnValue(WrapStruct(grpc_compute_engine_credentials_create())); +} + +NAN_METHOD(Credentials::CreateFake) { + NanScope(); + NanReturnValue(WrapStruct(grpc_fake_transport_security_credentials_create())); +} + +NAN_METHOD(Credentials::CreateIam) { + NanScope(); + if (!args[0]->IsString()) { + return NanThrowTypeError( + "createIam's first argument must be a string"); + } + if (!args[1]->IsString()) { + return NanThrowTypeError( + "createIam's second argument must be a string"); + } + NanUtf8String auth_token(args[0]); + NanUtf8String auth_selector(args[1]); + NanReturnValue(WrapStruct(grpc_iam_credentials_create(*auth_token, + *auth_selector))); +} + +} // namespace node +} // namespace grpc diff --git a/src/node/credentials.h b/src/node/credentials.h new file mode 100644 index 0000000000..f30cc7373e --- /dev/null +++ b/src/node/credentials.h @@ -0,0 +1,48 @@ +#ifndef NET_GRPC_NODE_CREDENTIALS_H_ +#define NET_GRPC_NODE_CREDENTIALS_H_ + +#include +#include +#include "grpc/grpc.h" +#include "grpc/grpc_security.h" + +namespace grpc { +namespace node { + +/* Wrapper class for grpc_credentials structs */ +class Credentials : public ::node::ObjectWrap { + public: + static void Init(v8::Handle exports); + static bool HasInstance(v8::Handle val); + /* Wrap a grpc_credentials struct in a javascript object */ + static v8::Handle WrapStruct(grpc_credentials *credentials); + + /* Returns the grpc_credentials struct that this object wraps */ + grpc_credentials *GetWrappedCredentials(); + + private: + explicit Credentials(grpc_credentials *credentials); + ~Credentials(); + + // Prevent copying + Credentials(const Credentials&); + Credentials& operator=(const Credentials&); + + static NAN_METHOD(New); + static NAN_METHOD(CreateDefault); + static NAN_METHOD(CreateSsl); + static NAN_METHOD(CreateComposite); + static NAN_METHOD(CreateGce); + static NAN_METHOD(CreateFake); + static NAN_METHOD(CreateIam); + static v8::Persistent constructor; + // Used for typechecking instances of this javascript class + static v8::Persistent fun_tpl; + + grpc_credentials *wrapped_credentials; +}; + +} // namespace node +} // namespace grpc + +#endif // NET_GRPC_NODE_CREDENTIALS_H_ diff --git a/src/node/event.cc b/src/node/event.cc new file mode 100644 index 0000000000..0eb4a54c54 --- /dev/null +++ b/src/node/event.cc @@ -0,0 +1,132 @@ +#include +#include +#include "grpc/grpc.h" +#include "byte_buffer.h" +#include "call.h" +#include "event.h" +#include "tag.h" +#include "timeval.h" + +namespace grpc { +namespace node { + +using v8::Array; +using v8::Date; +using v8::Handle; +using v8::HandleScope; +using v8::Number; +using v8::Object; +using v8::Persistent; +using v8::String; +using v8::Value; + +Handle GetEventData(grpc_event *event) { + NanEscapableScope(); + size_t count; + grpc_metadata *items; + Handle metadata; + Handle status; + Handle rpc_new; + switch (event->type) { + case GRPC_READ: + return NanEscapeScope(ByteBufferToBuffer(event->data.read)); + case GRPC_INVOKE_ACCEPTED: + return NanEscapeScope(NanNew(event->data.invoke_accepted)); + case GRPC_WRITE_ACCEPTED: + return NanEscapeScope(NanNew(event->data.write_accepted)); + case GRPC_FINISH_ACCEPTED: + return NanEscapeScope(NanNew(event->data.finish_accepted)); + case GRPC_CLIENT_METADATA_READ: + count = event->data.client_metadata_read.count; + items = event->data.client_metadata_read.elements; + metadata = NanNew(static_cast(count)); + for (unsigned int i = 0; i < count; i++) { + Handle item_obj = NanNew(); + item_obj->Set(NanNew("key"), + NanNew(items[i].key)); + item_obj->Set(NanNew("value"), + NanNew( + items[i].value, + static_cast(items[i].value_length))); + metadata->Set(i, item_obj); + } + return NanEscapeScope(metadata); + case GRPC_FINISHED: + status = NanNew(); + status->Set(NanNew("code"), NanNew( + event->data.finished.status)); + if (event->data.finished.details != NULL) { + status->Set(NanNew("details"), String::New( + event->data.finished.details)); + } + count = event->data.finished.metadata_count; + items = event->data.finished.metadata_elements; + metadata = NanNew(static_cast(count)); + for (unsigned int i = 0; i < count; i++) { + Handle item_obj = NanNew(); + item_obj->Set(NanNew("key"), + NanNew(items[i].key)); + item_obj->Set(NanNew("value"), + NanNew( + items[i].value, + static_cast(items[i].value_length))); + metadata->Set(i, item_obj); + } + status->Set(NanNew("metadata"), metadata); + return NanEscapeScope(status); + case GRPC_SERVER_RPC_NEW: + rpc_new = NanNew(); + if (event->data.server_rpc_new.method == NULL) { + return NanEscapeScope(NanNull()); + } + rpc_new->Set(NanNew("method"), + NanNew( + event->data.server_rpc_new.method)); + rpc_new->Set(NanNew("host"), + NanNew( + event->data.server_rpc_new.host)); + rpc_new->Set(NanNew("absolute_deadline"), + NanNew(TimespecToMilliseconds( + event->data.server_rpc_new.deadline))); + count = event->data.server_rpc_new.metadata_count; + items = event->data.server_rpc_new.metadata_elements; + metadata = NanNew(static_cast(count)); + for (unsigned int i = 0; i < count; i++) { + Handle item_obj = Object::New(); + item_obj->Set(NanNew("key"), + NanNew(items[i].key)); + item_obj->Set(NanNew("value"), + NanNew( + items[i].value, + static_cast(items[i].value_length))); + metadata->Set(i, item_obj); + } + rpc_new->Set(NanNew("metadata"), metadata); + return NanEscapeScope(rpc_new); + default: + return NanEscapeScope(NanNull()); + } +} + +Handle CreateEventObject(grpc_event *event) { + NanEscapableScope(); + if (event == NULL) { + return NanEscapeScope(NanNull()); + } + Handle event_obj = NanNew(); + Handle call; + if (TagHasCall(event->tag)) { + call = TagGetCall(event->tag); + } else { + call = Call::WrapStruct(event->call); + } + event_obj->Set(NanNew("call"), call); + event_obj->Set(NanNew("type"), + NanNew(event->type)); + event_obj->Set(NanNew("data"), GetEventData(event)); + + return NanEscapeScope(event_obj); +} + +} // namespace node +} // namespace grpc diff --git a/src/node/event.h b/src/node/event.h new file mode 100644 index 0000000000..cbc4b9c311 --- /dev/null +++ b/src/node/event.h @@ -0,0 +1,15 @@ +#ifndef NET_GRPC_NODE_EVENT_H_ +#define NET_GRPC_NODE_EVENT_H_ + +#include +#include "grpc/grpc.h" + +namespace grpc { +namespace node { + +v8::Handle CreateEventObject(grpc_event *event); + +} // namespace node +} // namespace grpc + +#endif // NET_GRPC_NODE_EVENT_H_ diff --git a/src/node/examples/math.proto b/src/node/examples/math.proto new file mode 100644 index 0000000000..14eff5daaf --- /dev/null +++ b/src/node/examples/math.proto @@ -0,0 +1,25 @@ +syntax = "proto2"; + +package math; + +message DivArgs { + required int64 dividend = 1; + required int64 divisor = 2; +} + +message DivReply { + required int64 quotient = 1; + required int64 remainder = 2; +} + +message FibArgs { + optional int64 limit = 1; +} + +message Num { + required int64 num = 1; +} + +message FibReply { + required int64 count = 1; +} \ No newline at end of file diff --git a/src/node/examples/math_server.js b/src/node/examples/math_server.js new file mode 100644 index 0000000000..ebbeeb8763 --- /dev/null +++ b/src/node/examples/math_server.js @@ -0,0 +1,168 @@ +var _ = require('underscore'); +var ProtoBuf = require('protobufjs'); +var fs = require('fs'); +var util = require('util'); + +var Transform = require('stream').Transform; + +var builder = ProtoBuf.loadProtoFile(__dirname + '/math.proto'); +var math = builder.build('math'); + +var makeConstructor = require('../surface_server.js').makeServerConstructor; + +/** + * Get a function that deserializes a specific type of protobuf. + * @param {function()} cls The constructor of the message type to deserialize + * @return {function(Buffer):cls} The deserialization function + */ +function deserializeCls(cls) { + /** + * Deserialize a buffer to a message object + * @param {Buffer} arg_buf The buffer to deserialize + * @return {cls} The resulting object + */ + return function deserialize(arg_buf) { + return cls.decode(arg_buf); + }; +} + +/** + * Get a function that serializes objects to a buffer by protobuf class. + * @param {function()} Cls The constructor of the message type to serialize + * @return {function(Cls):Buffer} The serialization function + */ +function serializeCls(Cls) { + /** + * Serialize an object to a Buffer + * @param {Object} arg The object to serialize + * @return {Buffer} The serialized object + */ + return function serialize(arg) { + return new Buffer(new Cls(arg).encode().toBuffer()); + }; +} + +/* This function call creates a server constructor for servers that that expose + * the four specified methods. This specifies how to serialize messages that the + * server sends and deserialize messages that the client sends, and whether the + * client or the server will send a stream of messages, for each method. This + * also specifies a prefix that will be added to method names when sending them + * on the wire. This function call and all of the preceding code in this file + * are intended to approximate what the generated code will look like for the + * math service */ +var Server = makeConstructor({ + Div: { + serialize: serializeCls(math.DivReply), + deserialize: deserializeCls(math.DivArgs), + client_stream: false, + server_stream: false + }, + Fib: { + serialize: serializeCls(math.Num), + deserialize: deserializeCls(math.FibArgs), + client_stream: false, + server_stream: true + }, + Sum: { + serialize: serializeCls(math.Num), + deserialize: deserializeCls(math.Num), + client_stream: true, + server_stream: false + }, + DivMany: { + serialize: serializeCls(math.DivReply), + deserialize: deserializeCls(math.DivArgs), + client_stream: true, + server_stream: true + } +}, '/Math/'); + +/** + * Server function for division. Provides the /Math/DivMany and /Math/Div + * functions (Div is just DivMany with only one stream element). For each + * DivArgs parameter, responds with a DivReply with the results of the division + * @param {Object} call The object containing request and cancellation info + * @param {function(Error, *)} cb Response callback + */ +function mathDiv(call, cb) { + var req = call.request; + if (req.divisor == 0) { + cb(new Error('cannot divide by zero')); + } + cb(null, { + quotient: req.dividend / req.divisor, + remainder: req.dividend % req.divisor + }); +} + +/** + * Server function for Fibonacci numbers. Provides the /Math/Fib function. Reads + * a single parameter that indicates the number of responses, and then responds + * with a stream of that many Fibonacci numbers. + * @param {stream} stream The stream for sending responses. + */ +function mathFib(stream) { + // Here, call is a standard writable Node object Stream + var previous = 0, current = 1; + for (var i = 0; i < stream.request.limit; i++) { + stream.write({num: current}); + var temp = current; + current += previous; + previous = temp; + } + stream.end(); +} + +/** + * Server function for summation. Provides the /Math/Sum function. Reads a + * stream of number parameters, then responds with their sum. + * @param {stream} call The stream of arguments. + * @param {function(Error, *)} cb Response callback + */ +function mathSum(call, cb) { + // Here, call is a standard readable Node object Stream + var sum = 0; + call.on('data', function(data) { + sum += data.num | 0; + }); + call.on('end', function() { + cb(null, {num: sum}); + }); +} + +function mathDivMany(stream) { + // Here, call is a standard duplex Node object Stream + util.inherits(DivTransform, Transform); + function DivTransform() { + var options = {objectMode: true}; + Transform.call(this, options); + } + DivTransform.prototype._transform = function(div_args, encoding, callback) { + if (div_args.divisor == 0) { + callback(new Error('cannot divide by zero')); + } + callback(null, { + quotient: div_args.dividend / div_args.divisor, + remainder: div_args.dividend % div_args.divisor + }); + }; + var transform = new DivTransform(); + stream.pipe(transform); + transform.pipe(stream); +} + +var server = new Server({ + Div: mathDiv, + Fib: mathFib, + Sum: mathSum, + DivMany: mathDivMany +}); + +if (require.main === module) { + server.bind('localhost:7070').listen(); +} + +/** + * See docs for server + */ +module.exports = server; diff --git a/src/node/node_grpc.cc b/src/node/node_grpc.cc new file mode 100644 index 0000000000..fb9e060693 --- /dev/null +++ b/src/node/node_grpc.cc @@ -0,0 +1,151 @@ +#include +#include +#include +#include "grpc/grpc.h" + +#include "call.h" +#include "channel.h" +#include "event.h" +#include "server.h" +#include "completion_queue_async_worker.h" +#include "credentials.h" +#include "server_credentials.h" + +using v8::Handle; +using v8::Value; +using v8::Object; +using v8::Uint32; +using v8::String; + +void InitStatusConstants(Handle exports) { + NanScope(); + Handle status = Object::New(); + exports->Set(NanNew("status"), status); + Handle OK(NanNew(GRPC_STATUS_OK)); + status->Set(NanNew("OK"), OK); + Handle CANCELLED(NanNew(GRPC_STATUS_CANCELLED)); + status->Set(NanNew("CANCELLED"), CANCELLED); + Handle UNKNOWN(NanNew(GRPC_STATUS_UNKNOWN)); + status->Set(NanNew("UNKNOWN"), UNKNOWN); + Handle INVALID_ARGUMENT( + NanNew(GRPC_STATUS_INVALID_ARGUMENT)); + status->Set(NanNew("INVALID_ARGUMENT"), INVALID_ARGUMENT); + Handle DEADLINE_EXCEEDED( + NanNew(GRPC_STATUS_DEADLINE_EXCEEDED)); + status->Set(NanNew("DEADLINE_EXCEEDED"), DEADLINE_EXCEEDED); + Handle NOT_FOUND(NanNew(GRPC_STATUS_NOT_FOUND)); + status->Set(NanNew("NOT_FOUND"), NOT_FOUND); + Handle ALREADY_EXISTS( + NanNew(GRPC_STATUS_ALREADY_EXISTS)); + status->Set(NanNew("ALREADY_EXISTS"), ALREADY_EXISTS); + Handle PERMISSION_DENIED( + NanNew(GRPC_STATUS_PERMISSION_DENIED)); + status->Set(NanNew("PERMISSION_DENIED"), PERMISSION_DENIED); + Handle UNAUTHENTICATED( + NanNew(GRPC_STATUS_UNAUTHENTICATED)); + status->Set(NanNew("UNAUTHENTICATED"), UNAUTHENTICATED); + Handle RESOURCE_EXHAUSTED( + NanNew(GRPC_STATUS_RESOURCE_EXHAUSTED)); + status->Set(NanNew("RESOURCE_EXHAUSTED"), RESOURCE_EXHAUSTED); + Handle FAILED_PRECONDITION( + NanNew(GRPC_STATUS_FAILED_PRECONDITION)); + status->Set(NanNew("FAILED_PRECONDITION"), FAILED_PRECONDITION); + Handle ABORTED(NanNew(GRPC_STATUS_ABORTED)); + status->Set(NanNew("ABORTED"), ABORTED); + Handle OUT_OF_RANGE( + NanNew(GRPC_STATUS_OUT_OF_RANGE)); + status->Set(NanNew("OUT_OF_RANGE"), OUT_OF_RANGE); + Handle UNIMPLEMENTED( + NanNew(GRPC_STATUS_UNIMPLEMENTED)); + status->Set(NanNew("UNIMPLEMENTED"), UNIMPLEMENTED); + Handle INTERNAL(NanNew(GRPC_STATUS_INTERNAL)); + status->Set(NanNew("INTERNAL"), INTERNAL); + Handle UNAVAILABLE(NanNew(GRPC_STATUS_UNAVAILABLE)); + status->Set(NanNew("UNAVAILABLE"), UNAVAILABLE); + Handle DATA_LOSS(NanNew(GRPC_STATUS_DATA_LOSS)); + status->Set(NanNew("DATA_LOSS"), DATA_LOSS); +} + +void InitCallErrorConstants(Handle exports) { + NanScope(); + Handle call_error = Object::New(); + exports->Set(NanNew("callError"), call_error); + Handle OK(NanNew(GRPC_CALL_OK)); + call_error->Set(NanNew("OK"), OK); + Handle ERROR(NanNew(GRPC_CALL_ERROR)); + call_error->Set(NanNew("ERROR"), ERROR); + Handle NOT_ON_SERVER( + NanNew(GRPC_CALL_ERROR_NOT_ON_SERVER)); + call_error->Set(NanNew("NOT_ON_SERVER"), NOT_ON_SERVER); + Handle NOT_ON_CLIENT( + NanNew(GRPC_CALL_ERROR_NOT_ON_CLIENT)); + call_error->Set(NanNew("NOT_ON_CLIENT"), NOT_ON_CLIENT); + Handle ALREADY_INVOKED( + NanNew(GRPC_CALL_ERROR_ALREADY_INVOKED)); + call_error->Set(NanNew("ALREADY_INVOKED"), ALREADY_INVOKED); + Handle NOT_INVOKED( + NanNew(GRPC_CALL_ERROR_NOT_INVOKED)); + call_error->Set(NanNew("NOT_INVOKED"), NOT_INVOKED); + Handle ALREADY_FINISHED( + NanNew(GRPC_CALL_ERROR_ALREADY_FINISHED)); + call_error->Set(NanNew("ALREADY_FINISHED"), ALREADY_FINISHED); + Handle TOO_MANY_OPERATIONS( + NanNew(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS)); + call_error->Set(NanNew("TOO_MANY_OPERATIONS"), + TOO_MANY_OPERATIONS); + Handle INVALID_FLAGS( + NanNew(GRPC_CALL_ERROR_INVALID_FLAGS)); + call_error->Set(NanNew("INVALID_FLAGS"), INVALID_FLAGS); +} + +void InitOpErrorConstants(Handle exports) { + NanScope(); + Handle op_error = Object::New(); + exports->Set(NanNew("opError"), op_error); + Handle OK(NanNew(GRPC_OP_OK)); + op_error->Set(NanNew("OK"), OK); + Handle ERROR(NanNew(GRPC_OP_ERROR)); + op_error->Set(NanNew("ERROR"), ERROR); +} + +void InitCompletionTypeConstants(Handle exports) { + NanScope(); + Handle completion_type = Object::New(); + exports->Set(NanNew("completionType"), completion_type); + Handle QUEUE_SHUTDOWN(NanNew(GRPC_QUEUE_SHUTDOWN)); + completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN); + Handle READ(NanNew(GRPC_READ)); + completion_type->Set(NanNew("READ"), READ); + Handle INVOKE_ACCEPTED(NanNew(GRPC_INVOKE_ACCEPTED)); + completion_type->Set(NanNew("INVOKE_ACCEPTED"), INVOKE_ACCEPTED); + Handle WRITE_ACCEPTED(NanNew(GRPC_WRITE_ACCEPTED)); + completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED); + Handle FINISH_ACCEPTED(NanNew(GRPC_FINISH_ACCEPTED)); + completion_type->Set(NanNew("FINISH_ACCEPTED"), FINISH_ACCEPTED); + Handle CLIENT_METADATA_READ( + NanNew(GRPC_CLIENT_METADATA_READ)); + completion_type->Set(NanNew("CLIENT_METADATA_READ"), + CLIENT_METADATA_READ); + Handle FINISHED(NanNew(GRPC_FINISHED)); + completion_type->Set(NanNew("FINISHED"), FINISHED); + Handle SERVER_RPC_NEW(NanNew(GRPC_SERVER_RPC_NEW)); + completion_type->Set(NanNew("SERVER_RPC_NEW"), SERVER_RPC_NEW); +} + +void init(Handle exports) { + NanScope(); + grpc_init(); + InitStatusConstants(exports); + InitCallErrorConstants(exports); + InitOpErrorConstants(exports); + InitCompletionTypeConstants(exports); + + grpc::node::Call::Init(exports); + grpc::node::Channel::Init(exports); + grpc::node::Server::Init(exports); + grpc::node::CompletionQueueAsyncWorker::Init(exports); + grpc::node::Credentials::Init(exports); + grpc::node::ServerCredentials::Init(exports); +} + +NODE_MODULE(grpc, init) diff --git a/src/node/package.json b/src/node/package.json new file mode 100644 index 0000000000..a2940b29bb --- /dev/null +++ b/src/node/package.json @@ -0,0 +1,18 @@ +{ + "name": "grpc", + "version": "0.1.0", + "description": "gRPC Library for Node", + "scripts": { + "test": "./node_modules/mocha/bin/mocha" + }, + "dependencies": { + "bindings": "^1.2.1", + "nan": "~1.3.0", + "underscore": "^1.7.0" + }, + "devDependencies": { + "mocha": "~1.21.0", + "highland": "~2.0.0", + "protobufjs": "~3.8.0" + } +} diff --git a/src/node/port_picker.js b/src/node/port_picker.js new file mode 100644 index 0000000000..7b3cfd74ff --- /dev/null +++ b/src/node/port_picker.js @@ -0,0 +1,19 @@ +var net = require('net'); + +/** + * Finds a free port that a server can bind to, in the format + * "address:port" + * @param {function(string)} cb The callback that should execute when the port + * is available + */ +function nextAvailablePort(cb) { + var server = net.createServer(); + server.listen(function() { + var address = server.address(); + server.close(function() { + cb(address.address + ':' + address.port.toString()); + }); + }); +} + +exports.nextAvailablePort = nextAvailablePort; diff --git a/src/node/server.cc b/src/node/server.cc new file mode 100644 index 0000000000..0e6c59b47c --- /dev/null +++ b/src/node/server.cc @@ -0,0 +1,212 @@ +#include "server.h" + +#include +#include + +#include + +#include +#include "grpc/grpc.h" +#include "grpc/grpc_security.h" +#include "call.h" +#include "completion_queue_async_worker.h" +#include "tag.h" +#include "server_credentials.h" + +namespace grpc { +namespace node { + +using v8::Arguments; +using v8::Array; +using v8::Boolean; +using v8::Exception; +using v8::Function; +using v8::FunctionTemplate; +using v8::Handle; +using v8::HandleScope; +using v8::Local; +using v8::Number; +using v8::Object; +using v8::Persistent; +using v8::String; +using v8::Value; + +Persistent Server::constructor; +Persistent Server::fun_tpl; + +Server::Server(grpc_server *server) : wrapped_server(server) { +} + +Server::~Server() { + grpc_server_destroy(wrapped_server); +} + +void Server::Init(Handle exports) { + NanScope(); + Local tpl = FunctionTemplate::New(New); + tpl->SetClassName(String::NewSymbol("Server")); + tpl->InstanceTemplate()->SetInternalFieldCount(1); + NanSetPrototypeTemplate(tpl, "requestCall", + FunctionTemplate::New(RequestCall)->GetFunction()); + + NanSetPrototypeTemplate(tpl, "addHttp2Port", + FunctionTemplate::New(AddHttp2Port)->GetFunction()); + + NanSetPrototypeTemplate(tpl, "addSecureHttp2Port", + FunctionTemplate::New( + AddSecureHttp2Port)->GetFunction()); + + NanSetPrototypeTemplate(tpl, "start", + FunctionTemplate::New(Start)->GetFunction()); + + NanSetPrototypeTemplate(tpl, "shutdown", + FunctionTemplate::New(Shutdown)->GetFunction()); + + NanAssignPersistent(fun_tpl, tpl); + NanAssignPersistent(constructor, tpl->GetFunction()); + exports->Set(String::NewSymbol("Server"), constructor); +} + +bool Server::HasInstance(Handle val) { + return NanHasInstance(fun_tpl, val); +} + +NAN_METHOD(Server::New) { + NanScope(); + + /* If this is not a constructor call, make a constructor call and return + the result */ + if (!args.IsConstructCall()) { + const int argc = 1; + Local argv[argc] = { args[0] }; + NanReturnValue(constructor->NewInstance(argc, argv)); + } + grpc_server *wrapped_server; + grpc_completion_queue *queue = CompletionQueueAsyncWorker::GetQueue(); + if (args[0]->IsUndefined()) { + wrapped_server = grpc_server_create(queue, NULL); + } else if (args[0]->IsObject()) { + grpc_server_credentials *creds = NULL; + Handle args_hash(args[0]->ToObject()->Clone()); + if (args_hash->HasOwnProperty(NanNew("credentials"))) { + Handle creds_value = args_hash->Get(NanNew("credentials")); + if (!ServerCredentials::HasInstance(creds_value)) { + return NanThrowTypeError( + "credentials arg must be a ServerCredentials object"); + } + ServerCredentials *creds_object = ObjectWrap::Unwrap( + creds_value->ToObject()); + creds = creds_object->GetWrappedServerCredentials(); + args_hash->Delete(NanNew("credentials")); + } + Handle keys(args_hash->GetOwnPropertyNames()); + grpc_channel_args channel_args; + channel_args.num_args = keys->Length(); + channel_args.args = reinterpret_cast( + calloc(channel_args.num_args, sizeof(grpc_arg))); + /* These are used to keep all strings until then end of the block, then + destroy them */ + std::vector key_strings(keys->Length()); + std::vector value_strings(keys->Length()); + for (unsigned int i = 0; i < channel_args.num_args; i++) { + Handle current_key(keys->Get(i)->ToString()); + Handle current_value(args_hash->Get(current_key)); + key_strings[i] = new NanUtf8String(current_key); + channel_args.args[i].key = **key_strings[i]; + if (current_value->IsInt32()) { + channel_args.args[i].type = GRPC_ARG_INTEGER; + channel_args.args[i].value.integer = current_value->Int32Value(); + } else if (current_value->IsString()) { + channel_args.args[i].type = GRPC_ARG_STRING; + value_strings[i] = new NanUtf8String(current_value); + channel_args.args[i].value.string = **value_strings[i]; + } else { + free(channel_args.args); + return NanThrowTypeError("Arg values must be strings"); + } + } + if (creds == NULL) { + wrapped_server = grpc_server_create(queue, + &channel_args); + } else { + wrapped_server = grpc_secure_server_create(creds, + queue, + &channel_args); + } + free(channel_args.args); + } else { + return NanThrowTypeError("Server expects an object"); + } + Server *server = new Server(wrapped_server); + server->Wrap(args.This()); + NanReturnValue(args.This()); +} + +NAN_METHOD(Server::RequestCall) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("requestCall can only be called on a Server"); + } + Server *server = ObjectWrap::Unwrap(args.This()); + grpc_call_error error = grpc_server_request_call( + server->wrapped_server, + CreateTag(args[0], NanNull())); + if (error == GRPC_CALL_OK) { + CompletionQueueAsyncWorker::Next(); + } else { + return NanThrowError("requestCall failed", error); + } + NanReturnUndefined(); +} + +NAN_METHOD(Server::AddHttp2Port) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("addHttp2Port can only be called on a Server"); + } + if (!args[0]->IsString()) { + return NanThrowTypeError("addHttp2Port's argument must be a String"); + } + Server *server = ObjectWrap::Unwrap(args.This()); + NanReturnValue(NanNew(grpc_server_add_http2_port( + server->wrapped_server, + *NanUtf8String(args[0])))); +} + +NAN_METHOD(Server::AddSecureHttp2Port) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError( + "addSecureHttp2Port can only be called on a Server"); + } + if (!args[0]->IsString()) { + return NanThrowTypeError("addSecureHttp2Port's argument must be a String"); + } + Server *server = ObjectWrap::Unwrap(args.This()); + NanReturnValue(NanNew(grpc_server_add_secure_http2_port( + server->wrapped_server, + *NanUtf8String(args[0])))); +} + +NAN_METHOD(Server::Start) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("start can only be called on a Server"); + } + Server *server = ObjectWrap::Unwrap(args.This()); + grpc_server_start(server->wrapped_server); + NanReturnUndefined(); +} + +NAN_METHOD(Server::Shutdown) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("shutdown can only be called on a Server"); + } + Server *server = ObjectWrap::Unwrap(args.This()); + grpc_server_shutdown(server->wrapped_server); + NanReturnUndefined(); +} + +} // namespace node +} // namespace grpc diff --git a/src/node/server.h b/src/node/server.h new file mode 100644 index 0000000000..5edf9557f5 --- /dev/null +++ b/src/node/server.h @@ -0,0 +1,46 @@ +#ifndef NET_GRPC_NODE_SERVER_H_ +#define NET_GRPC_NODE_SERVER_H_ + +#include +#include +#include "grpc/grpc.h" + +namespace grpc { +namespace node { + +/* Wraps grpc_server as a JavaScript object. Provides a constructor + and wrapper methods for grpc_server_create, grpc_server_request_call, + grpc_server_add_http2_port, and grpc_server_start. */ +class Server : public ::node::ObjectWrap { + public: + /* Initializes the Server class and exposes the constructor and + wrapper methods to JavaScript */ + static void Init(v8::Handle exports); + /* Tests whether the given value was constructed by this class's + JavaScript constructor */ + static bool HasInstance(v8::Handle val); + + private: + explicit Server(grpc_server *server); + ~Server(); + + // Prevent copying + Server(const Server&); + Server& operator=(const Server&); + + static NAN_METHOD(New); + static NAN_METHOD(RequestCall); + static NAN_METHOD(AddHttp2Port); + static NAN_METHOD(AddSecureHttp2Port); + static NAN_METHOD(Start); + static NAN_METHOD(Shutdown); + static v8::Persistent constructor; + static v8::Persistent fun_tpl; + + grpc_server *wrapped_server; +}; + +} // namespace node +} // namespace grpc + +#endif // NET_GRPC_NODE_SERVER_H_ diff --git a/src/node/server.js b/src/node/server.js new file mode 100644 index 0000000000..4079b2914e --- /dev/null +++ b/src/node/server.js @@ -0,0 +1,228 @@ +var grpc = require('bindings')('grpc.node'); + +var common = require('./common'); + +var Duplex = require('stream').Duplex; +var util = require('util'); + +util.inherits(GrpcServerStream, Duplex); + +/** + * Class for representing a gRPC server side stream as a Node stream. Extends + * from stream.Duplex. + * @constructor + * @param {grpc.Call} call Call object to proxy + * @param {object} options Stream options + */ +function GrpcServerStream(call, options) { + Duplex.call(this, options); + this._call = call; + // Indicate that a status has been sent + var finished = false; + var self = this; + var status = { + 'code' : grpc.status.OK, + 'details' : 'OK' + }; + /** + * Send the pending status + */ + function sendStatus() { + call.startWriteStatus(status.code, status.details, function() { + }); + finished = true; + } + this.on('finish', sendStatus); + /** + * Set the pending status to a given error status. If the error does not have + * code or details properties, the code will be set to grpc.status.INTERNAL + * and the details will be set to 'Unknown Error'. + * @param {Error} err The error object + */ + function setStatus(err) { + var code = grpc.status.INTERNAL; + var details = 'Unknown Error'; + + if (err.hasOwnProperty('code')) { + code = err.code; + if (err.hasOwnProperty('details')) { + details = err.details; + } + } + status = {'code': code, 'details': details}; + } + /** + * Terminate the call. This includes indicating that reads are done, draining + * all pending writes, and sending the given error as a status + * @param {Error} err The error object + * @this GrpcServerStream + */ + function terminateCall(err) { + // Drain readable data + this.on('data', function() {}); + setStatus(err); + this.end(); + } + this.on('error', terminateCall); + // Indicates that a read is pending + var reading = false; + /** + * Callback to be called when a READ event is received. Pushes the data onto + * the read queue and starts reading again if applicable + * @param {grpc.Event} event READ event object + */ + function readCallback(event) { + if (finished) { + self.push(null); + return; + } + var data = event.data; + if (self.push(data) && data != null) { + self._call.startRead(readCallback); + } else { + reading = false; + } + } + /** + * Start reading if there is not already a pending read. Reading will + * continue until self.push returns false (indicating reads should slow + * down) or the read data is null (indicating that there is no more data). + */ + this.startReading = function() { + if (finished) { + self.push(null); + } else { + if (!reading) { + reading = true; + self._call.startRead(readCallback); + } + } + }; +} + +/** + * Start reading from the gRPC data source. This is an implementation of a + * method required for implementing stream.Readable + * @param {number} size Ignored + */ +GrpcServerStream.prototype._read = function(size) { + this.startReading(); +}; + +/** + * Start writing a chunk of data. This is an implementation of a method required + * for implementing stream.Writable. + * @param {Buffer} chunk The chunk of data to write + * @param {string} encoding Ignored + * @param {function(Error=)} callback Callback to indicate that the write is + * complete + */ +GrpcServerStream.prototype._write = function(chunk, encoding, callback) { + var self = this; + self._call.startWrite(chunk, function(event) { + callback(); + }, 0); +}; + +/** + * Constructs a server object that stores request handlers and delegates + * incoming requests to those handlers + * @constructor + * @param {Array} options Options that should be passed to the internal server + * implementation + */ +function Server(options) { + this.handlers = {}; + var handlers = this.handlers; + var server = new grpc.Server(options); + this._server = server; + var started = false; + /** + * Start the server and begin handling requests + * @this Server + */ + this.start = function() { + if (this.started) { + throw 'Server is already running'; + } + server.start(); + /** + * Handles the SERVER_RPC_NEW event. If there is a handler associated with + * the requested method, use that handler to respond to the request. Then + * wait for the next request + * @param {grpc.Event} event The event to handle with tag SERVER_RPC_NEW + */ + function handleNewCall(event) { + var call = event.call; + var data = event.data; + if (data == null) { + return; + } + server.requestCall(handleNewCall); + var handler = undefined; + var deadline = data.absolute_deadline; + var cancelled = false; + if (handlers.hasOwnProperty(data.method)) { + handler = handlers[data.method]; + } + call.serverAccept(function(event) { + if (event.data.code === grpc.status.CANCELLED) { + cancelled = true; + } + }, 0); + call.serverEndInitialMetadata(0); + var stream = new GrpcServerStream(call); + Object.defineProperty(stream, 'cancelled', { + get: function() { return cancelled;} + }); + try { + handler(stream, data.metadata); + } catch (e) { + stream.emit('error', e); + } + } + server.requestCall(handleNewCall); + }; + /** Shuts down the server. + */ + this.shutdown = function() { + server.shutdown(); + }; +} + +/** + * Registers a handler to handle the named method. Fails if there already is + * a handler for the given method. Returns true on success + * @param {string} name The name of the method that the provided function should + * handle/respond to. + * @param {function} handler Function that takes a stream of request values and + * returns a stream of response values + * @return {boolean} True if the handler was set. False if a handler was already + * set for that name. + */ +Server.prototype.register = function(name, handler) { + if (this.handlers.hasOwnProperty(name)) { + return false; + } + this.handlers[name] = handler; + return true; +}; + +/** + * Binds the server to the given port, with SSL enabled if secure is specified + * @param {string} port The port that the server should bind on, in the format + * "address:port" + * @param {boolean=} secure Whether the server should open a secure port + */ +Server.prototype.bind = function(port, secure) { + if (secure) { + this._server.addSecureHttp2Port(port); + } else { + this._server.addHttp2Port(port); + } +}; + +/** + * See documentation for Server + */ +module.exports = Server; diff --git a/src/node/server_credentials.cc b/src/node/server_credentials.cc new file mode 100644 index 0000000000..3d32bdafa9 --- /dev/null +++ b/src/node/server_credentials.cc @@ -0,0 +1,131 @@ +#include + +#include "grpc/grpc.h" +#include "grpc/grpc_security.h" +#include "grpc/support/log.h" +#include "server_credentials.h" + +namespace grpc { +namespace node { + +using ::node::Buffer; +using v8::Arguments; +using v8::Exception; +using v8::External; +using v8::Function; +using v8::FunctionTemplate; +using v8::Handle; +using v8::HandleScope; +using v8::Integer; +using v8::Local; +using v8::Object; +using v8::ObjectTemplate; +using v8::Persistent; +using v8::Value; + +Persistent ServerCredentials::constructor; +Persistent ServerCredentials::fun_tpl; + +ServerCredentials::ServerCredentials(grpc_server_credentials *credentials) + : wrapped_credentials(credentials) { +} + +ServerCredentials::~ServerCredentials() { + gpr_log(GPR_DEBUG, "Destroying server credentials object"); + grpc_server_credentials_release(wrapped_credentials); +} + +void ServerCredentials::Init(Handle exports) { + NanScope(); + Local tpl = FunctionTemplate::New(New); + tpl->SetClassName(NanNew("ServerCredentials")); + tpl->InstanceTemplate()->SetInternalFieldCount(1); + NanAssignPersistent(fun_tpl, tpl); + NanAssignPersistent(constructor, tpl->GetFunction()); + constructor->Set(NanNew("createSsl"), + FunctionTemplate::New(CreateSsl)->GetFunction()); + constructor->Set(NanNew("createFake"), + FunctionTemplate::New(CreateFake)->GetFunction()); + exports->Set(NanNew("ServerCredentials"), constructor); +} + +bool ServerCredentials::HasInstance(Handle val) { + NanScope(); + return NanHasInstance(fun_tpl, val); +} + +Handle ServerCredentials::WrapStruct( + grpc_server_credentials *credentials) { + NanEscapableScope(); + if (credentials == NULL) { + return NanEscapeScope(NanNull()); + } + const int argc = 1; + Handle argv[argc] = { + External::New(reinterpret_cast(credentials)) }; + return NanEscapeScope(constructor->NewInstance(argc, argv)); +} + +grpc_server_credentials *ServerCredentials::GetWrappedServerCredentials() { + return wrapped_credentials; +} + +NAN_METHOD(ServerCredentials::New) { + NanScope(); + + if (args.IsConstructCall()) { + if (!args[0]->IsExternal()) { + return NanThrowTypeError( + "ServerCredentials can only be created with the provide functions"); + } + grpc_server_credentials *creds_value = + reinterpret_cast(External::Unwrap(args[0])); + ServerCredentials *credentials = new ServerCredentials(creds_value); + credentials->Wrap(args.This()); + NanReturnValue(args.This()); + } else { + const int argc = 1; + Local argv[argc] = { args[0] }; + NanReturnValue(constructor->NewInstance(argc, argv)); + } +} + +NAN_METHOD(ServerCredentials::CreateSsl) { + NanScope(); + char *root_certs = NULL; + char *private_key; + char *cert_chain; + int root_certs_length = 0, private_key_length, cert_chain_length; + if (Buffer::HasInstance(args[0])) { + root_certs = Buffer::Data(args[0]); + root_certs_length = Buffer::Length(args[0]); + } else if (!(args[0]->IsNull() || args[0]->IsUndefined())) { + return NanThrowTypeError( + "createSSl's first argument must be a Buffer if provided"); + } + if (!Buffer::HasInstance(args[1])) { + return NanThrowTypeError( + "createSsl's second argument must be a Buffer"); + } + private_key = Buffer::Data(args[1]); + private_key_length = Buffer::Length(args[1]); + if (!Buffer::HasInstance(args[2])) { + return NanThrowTypeError( + "createSsl's third argument must be a Buffer"); + } + cert_chain = Buffer::Data(args[2]); + cert_chain_length = Buffer::Length(args[2]); + NanReturnValue(WrapStruct(grpc_ssl_server_credentials_create( + reinterpret_cast(root_certs), root_certs_length, + reinterpret_cast(private_key), private_key_length, + reinterpret_cast(cert_chain), cert_chain_length))); +} + +NAN_METHOD(ServerCredentials::CreateFake) { + NanScope(); + NanReturnValue(WrapStruct( + grpc_fake_transport_security_server_credentials_create())); +} + +} // namespace node +} // namespace grpc diff --git a/src/node/server_credentials.h b/src/node/server_credentials.h new file mode 100644 index 0000000000..63220f5eee --- /dev/null +++ b/src/node/server_credentials.h @@ -0,0 +1,44 @@ +#ifndef NET_GRPC_NODE_SERVER_CREDENTIALS_H_ +#define NET_GRPC_NODE_SERVER_CREDENTIALS_H_ + +#include +#include +#include "grpc/grpc.h" +#include "grpc/grpc_security.h" + +namespace grpc { +namespace node { + +/* Wrapper class for grpc_server_credentials structs */ +class ServerCredentials : public ::node::ObjectWrap { + public: + static void Init(v8::Handle exports); + static bool HasInstance(v8::Handle val); + /* Wrap a grpc_server_credentials struct in a javascript object */ + static v8::Handle WrapStruct(grpc_server_credentials *credentials); + + /* Returns the grpc_server_credentials struct that this object wraps */ + grpc_server_credentials *GetWrappedServerCredentials(); + + private: + explicit ServerCredentials(grpc_server_credentials *credentials); + ~ServerCredentials(); + + // Prevent copying + ServerCredentials(const ServerCredentials&); + ServerCredentials& operator=(const ServerCredentials&); + + static NAN_METHOD(New); + static NAN_METHOD(CreateSsl); + static NAN_METHOD(CreateFake); + static v8::Persistent constructor; + // Used for typechecking instances of this javascript class + static v8::Persistent fun_tpl; + + grpc_server_credentials *wrapped_credentials; +}; + +} // namespace node +} // namespace grpc + +#endif // NET_GRPC_NODE_SERVER_CREDENTIALS_H_ diff --git a/src/node/surface_client.js b/src/node/surface_client.js new file mode 100644 index 0000000000..24f634ec65 --- /dev/null +++ b/src/node/surface_client.js @@ -0,0 +1,306 @@ +var _ = require('underscore'); + +var client = require('./client.js'); + +var EventEmitter = require('events').EventEmitter; + +var stream = require('stream'); + +var Readable = stream.Readable; +var Writable = stream.Writable; +var Duplex = stream.Duplex; +var util = require('util'); + +function forwardEvent(fromEmitter, toEmitter, event) { + fromEmitter.on(event, function forward() { + _.partial(toEmitter.emit, event).apply(toEmitter, arguments); + }); +} + +util.inherits(ClientReadableObjectStream, Readable); + +/** + * Class for representing a gRPC server streaming call as a Node stream on the + * client side. Extends from stream.Readable. + * @constructor + * @param {stream} stream Underlying binary Duplex stream for the call + * @param {function(Buffer)} deserialize Function for deserializing binary data + * @param {object} options Stream options + */ +function ClientReadableObjectStream(stream, deserialize, options) { + options = _.extend(options, {objectMode: true}); + Readable.call(this, options); + this._stream = stream; + var self = this; + forwardEvent(stream, this, 'status'); + forwardEvent(stream, this, 'metadata'); + this._stream.on('data', function forwardData(chunk) { + if (!self.push(deserialize(chunk))) { + self._stream.pause(); + } + }); + this._stream.pause(); +} + +util.inherits(ClientWritableObjectStream, Writable); + +/** + * Class for representing a gRPC client streaming call as a Node stream on the + * client side. Extends from stream.Writable. + * @constructor + * @param {stream} stream Underlying binary Duplex stream for the call + * @param {function(*):Buffer} serialize Function for serializing objects + * @param {object} options Stream options + */ +function ClientWritableObjectStream(stream, serialize, options) { + options = _.extend(options, {objectMode: true}); + Writable.call(this, options); + this._stream = stream; + this._serialize = serialize; + forwardEvent(stream, this, 'status'); + forwardEvent(stream, this, 'metadata'); + this.on('finish', function() { + this._stream.end(); + }); +} + + +util.inherits(ClientBidiObjectStream, Duplex); + +/** + * Class for representing a gRPC bidi streaming call as a Node stream on the + * client side. Extends from stream.Duplex. + * @constructor + * @param {stream} stream Underlying binary Duplex stream for the call + * @param {function(*):Buffer} serialize Function for serializing objects + * @param {function(Buffer)} deserialize Function for deserializing binary data + * @param {object} options Stream options + */ +function ClientBidiObjectStream(stream, serialize, deserialize, options) { + options = _.extend(options, {objectMode: true}); + Duplex.call(this, options); + this._stream = stream; + this._serialize = serialize; + var self = this; + forwardEvent(stream, this, 'status'); + forwardEvent(stream, this, 'metadata'); + this._stream.on('data', function forwardData(chunk) { + if (!self.push(deserialize(chunk))) { + self._stream.pause(); + } + }); + this._stream.pause(); + this.on('finish', function() { + this._stream.end(); + }); +} + +/** + * _read implementation for both types of streams that allow reading. + * @this {ClientReadableObjectStream|ClientBidiObjectStream} + * @param {number} size Ignored + */ +function _read(size) { + this._stream.resume(); +} + +/** + * See docs for _read + */ +ClientReadableObjectStream.prototype._read = _read; +/** + * See docs for _read + */ +ClientBidiObjectStream.prototype._read = _read; + +/** + * _write implementation for both types of streams that allow writing + * @this {ClientWritableObjectStream|ClientBidiObjectStream} + * @param {*} chunk The value to write to the stream + * @param {string} encoding Ignored + * @param {function(Error)} callback Callback to call when finished writing + */ +function _write(chunk, encoding, callback) { + this._stream.write(this._serialize(chunk), encoding, callback); +} + +/** + * See docs for _write + */ +ClientWritableObjectStream.prototype._write = _write; +/** + * See docs for _write + */ +ClientBidiObjectStream.prototype._write = _write; + +/** + * Get a function that can make unary requests to the specified method. + * @param {string} method The name of the method to request + * @param {function(*):Buffer} serialize The serialization function for inputs + * @param {function(Buffer)} deserialize The deserialization function for + * outputs + * @return {Function} makeUnaryRequest + */ +function makeUnaryRequestFunction(method, serialize, deserialize) { + /** + * Make a unary request with this method on the given channel with the given + * argument, callback, etc. + * @param {client.Channel} channel The channel on which to make the request + * @param {*} argument The argument to the call. Should be serializable with + * serialize + * @param {function(?Error, value=)} callback The callback to for when the + * response is received + * @param {array=} metadata Array of metadata key/value pairs to add to the + * call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ + function makeUnaryRequest(channel, argument, callback, metadata, deadline) { + var stream = client.makeRequest(channel, method, metadata, deadline); + var emitter = new EventEmitter(); + forwardEvent(stream, emitter, 'status'); + forwardEvent(stream, emitter, 'metadata'); + stream.write(serialize(argument)); + stream.end(); + stream.on('data', function forwardData(chunk) { + try { + callback(null, deserialize(chunk)); + } catch (e) { + callback(e); + } + }); + return emitter; + } + return makeUnaryRequest; +} + +/** + * Get a function that can make client stream requests to the specified method. + * @param {string} method The name of the method to request + * @param {function(*):Buffer} serialize The serialization function for inputs + * @param {function(Buffer)} deserialize The deserialization function for + * outputs + * @return {Function} makeClientStreamRequest + */ +function makeClientStreamRequestFunction(method, serialize, deserialize) { + /** + * Make a client stream request with this method on the given channel with the + * given callback, etc. + * @param {client.Channel} channel The channel on which to make the request + * @param {function(?Error, value=)} callback The callback to for when the + * response is received + * @param {array=} metadata Array of metadata key/value pairs to add to the + * call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ + function makeClientStreamRequest(channel, callback, metadata, deadline) { + var stream = client.makeRequest(channel, method, metadata, deadline); + var obj_stream = new ClientWritableObjectStream(stream, serialize, {}); + stream.on('data', function forwardData(chunk) { + try { + callback(null, deserialize(chunk)); + } catch (e) { + callback(e); + } + }); + return obj_stream; + } + return makeClientStreamRequest; +} + +/** + * Get a function that can make server stream requests to the specified method. + * @param {string} method The name of the method to request + * @param {function(*):Buffer} serialize The serialization function for inputs + * @param {function(Buffer)} deserialize The deserialization function for + * outputs + * @return {Function} makeServerStreamRequest + */ +function makeServerStreamRequestFunction(method, serialize, deserialize) { + /** + * Make a server stream request with this method on the given channel with the + * given argument, etc. + * @param {client.Channel} channel The channel on which to make the request + * @param {*} argument The argument to the call. Should be serializable with + * serialize + * @param {array=} metadata Array of metadata key/value pairs to add to the + * call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ + function makeServerStreamRequest(channel, argument, metadata, deadline) { + var stream = client.makeRequest(channel, method, metadata, deadline); + var obj_stream = new ClientReadableObjectStream(stream, deserialize, {}); + stream.write(serialize(argument)); + stream.end(); + return obj_stream; + } + return makeServerStreamRequest; +} + +/** + * Get a function that can make bidirectional stream requests to the specified + * method. + * @param {string} method The name of the method to request + * @param {function(*):Buffer} serialize The serialization function for inputs + * @param {function(Buffer)} deserialize The deserialization function for + * outputs + * @return {Function} makeBidiStreamRequest + */ +function makeBidiStreamRequestFunction(method, serialize, deserialize) { + /** + * Make a bidirectional stream request with this method on the given channel. + * @param {client.Channel} channel The channel on which to make the request + * @param {array=} metadata Array of metadata key/value pairs to add to the + * call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ + function makeBidiStreamRequest(channel, metadata, deadline) { + var stream = client.makeRequest(channel, method, metadata, deadline); + var obj_stream = new ClientBidiObjectStream(stream, + serialize, + deserialize, + {}); + return obj_stream; + } + return makeBidiStreamRequest; +} + +/** + * See docs for makeUnaryRequestFunction + */ +exports.makeUnaryRequestFunction = makeUnaryRequestFunction; + +/** + * See docs for makeClientStreamRequestFunction + */ +exports.makeClientStreamRequestFunction = makeClientStreamRequestFunction; + +/** + * See docs for makeServerStreamRequestFunction + */ +exports.makeServerStreamRequestFunction = makeServerStreamRequestFunction; + +/** + * See docs for makeBidiStreamRequestFunction + */ +exports.makeBidiStreamRequestFunction = makeBidiStreamRequestFunction; + +/** + * See docs for client.Channel + */ +exports.Channel = client.Channel; +/** + * See docs for client.status + */ +exports.status = client.status; +/** + * See docs for client.callError + */ +exports.callError = client.callError; diff --git a/src/node/surface_server.js b/src/node/surface_server.js new file mode 100644 index 0000000000..38b4233d99 --- /dev/null +++ b/src/node/surface_server.js @@ -0,0 +1,325 @@ +var _ = require('underscore'); + +var Server = require('./server.js'); + +var stream = require('stream'); + +var Readable = stream.Readable; +var Writable = stream.Writable; +var Duplex = stream.Duplex; +var util = require('util'); + +util.inherits(ServerReadableObjectStream, Readable); + +/** + * Class for representing a gRPC client streaming call as a Node stream on the + * server side. Extends from stream.Readable. + * @constructor + * @param {stream} stream Underlying binary Duplex stream for the call + * @param {function(Buffer)} deserialize Function for deserializing binary data + * @param {object} options Stream options + */ +function ServerReadableObjectStream(stream, deserialize, options) { + options = _.extend(options, {objectMode: true}); + Readable.call(this, options); + this._stream = stream; + Object.defineProperty(this, 'cancelled', { + get: function() { return stream.cancelled; } + }); + var self = this; + this._stream.on('data', function forwardData(chunk) { + if (!self.push(deserialize(chunk))) { + self._stream.pause(); + } + }); + this._stream.on('end', function forwardEnd() { + self.push(null); + }); + this._stream.pause(); +} + +util.inherits(ServerWritableObjectStream, Writable); + +/** + * Class for representing a gRPC server streaming call as a Node stream on the + * server side. Extends from stream.Writable. + * @constructor + * @param {stream} stream Underlying binary Duplex stream for the call + * @param {function(*):Buffer} serialize Function for serializing objects + * @param {object} options Stream options + */ +function ServerWritableObjectStream(stream, serialize, options) { + options = _.extend(options, {objectMode: true}); + Writable.call(this, options); + this._stream = stream; + this._serialize = serialize; + this.on('finish', function() { + this._stream.end(); + }); +} + +util.inherits(ServerBidiObjectStream, Duplex); + +/** + * Class for representing a gRPC bidi streaming call as a Node stream on the + * server side. Extends from stream.Duplex. + * @constructor + * @param {stream} stream Underlying binary Duplex stream for the call + * @param {function(*):Buffer} serialize Function for serializing objects + * @param {function(Buffer)} deserialize Function for deserializing binary data + * @param {object} options Stream options + */ +function ServerBidiObjectStream(stream, serialize, deserialize, options) { + options = _.extend(options, {objectMode: true}); + Duplex.call(this, options); + this._stream = stream; + this._serialize = serialize; + var self = this; + this._stream.on('data', function forwardData(chunk) { + if (!self.push(deserialize(chunk))) { + self._stream.pause(); + } + }); + this._stream.on('end', function forwardEnd() { + self.push(null); + }); + this._stream.pause(); + this.on('finish', function() { + this._stream.end(); + }); +} + +/** + * _read implementation for both types of streams that allow reading. + * @this {ServerReadableObjectStream|ServerBidiObjectStream} + * @param {number} size Ignored + */ +function _read(size) { + this._stream.resume(); +} + +/** + * See docs for _read + */ +ServerReadableObjectStream.prototype._read = _read; +/** + * See docs for _read + */ +ServerBidiObjectStream.prototype._read = _read; + +/** + * _write implementation for both types of streams that allow writing + * @this {ServerWritableObjectStream|ServerBidiObjectStream} + * @param {*} chunk The value to write to the stream + * @param {string} encoding Ignored + * @param {function(Error)} callback Callback to call when finished writing + */ +function _write(chunk, encoding, callback) { + this._stream.write(this._serialize(chunk), encoding, callback); +} + +/** + * See docs for _write + */ +ServerWritableObjectStream.prototype._write = _write; +/** + * See docs for _write + */ +ServerBidiObjectStream.prototype._write = _write; + +/** + * Creates a binary stream handler function from a unary handler function + * @param {function(Object, function(Error, *))} handler Unary call handler + * @param {function(*):Buffer} serialize Serialization function + * @param {function(Buffer):*} deserialize Deserialization function + * @return {function(stream)} Binary stream handler + */ +function makeUnaryHandler(handler, serialize, deserialize) { + /** + * Handles a stream by reading a single data value, passing it to the handler, + * and writing the response back to the stream. + * @param {stream} stream Binary data stream + */ + return function handleUnaryCall(stream) { + stream.on('data', function handleUnaryData(value) { + var call = {request: deserialize(value)}; + Object.defineProperty(call, 'cancelled', { + get: function() { return stream.cancelled;} + }); + handler(call, function sendUnaryData(err, value) { + if (err) { + stream.emit('error', err); + } else { + stream.write(serialize(value)); + stream.end(); + } + }); + }); + }; +} + +/** + * Creates a binary stream handler function from a client stream handler + * function + * @param {function(Readable, function(Error, *))} handler Client stream call + * handler + * @param {function(*):Buffer} serialize Serialization function + * @param {function(Buffer):*} deserialize Deserialization function + * @return {function(stream)} Binary stream handler + */ +function makeClientStreamHandler(handler, serialize, deserialize) { + /** + * Handles a stream by passing a deserializing stream to the handler and + * writing the response back to the stream. + * @param {stream} stream Binary data stream + */ + return function handleClientStreamCall(stream) { + var object_stream = new ServerReadableObjectStream(stream, deserialize, {}); + handler(object_stream, function sendClientStreamData(err, value) { + if (err) { + stream.emit('error', err); + } else { + stream.write(serialize(value)); + stream.end(); + } + }); + }; +} + +/** + * Creates a binary stream handler function from a server stream handler + * function + * @param {function(Writable)} handler Server stream call handler + * @param {function(*):Buffer} serialize Serialization function + * @param {function(Buffer):*} deserialize Deserialization function + * @return {function(stream)} Binary stream handler + */ +function makeServerStreamHandler(handler, serialize, deserialize) { + /** + * Handles a stream by attaching it to a serializing stream, and passing it to + * the handler. + * @param {stream} stream Binary data stream + */ + return function handleServerStreamCall(stream) { + stream.on('data', function handleClientData(value) { + var object_stream = new ServerWritableObjectStream(stream, + serialize, + {}); + object_stream.request = deserialize(value); + handler(object_stream); + }); + }; +} + +/** + * Creates a binary stream handler function from a bidi stream handler function + * @param {function(Duplex)} handler Unary call handler + * @param {function(*):Buffer} serialize Serialization function + * @param {function(Buffer):*} deserialize Deserialization function + * @return {function(stream)} Binary stream handler + */ +function makeBidiStreamHandler(handler, serialize, deserialize) { + /** + * Handles a stream by wrapping it in a serializing and deserializing object + * stream, and passing it to the handler. + * @param {stream} stream Binary data stream + */ + return function handleBidiStreamCall(stream) { + var object_stream = new ServerBidiObjectStream(stream, + serialize, + deserialize, + {}); + handler(object_stream); + }; +} + +/** + * Map with short names for each of the handler maker functions. Used in + * makeServerConstructor + */ +var handler_makers = { + unary: makeUnaryHandler, + server_stream: makeServerStreamHandler, + client_stream: makeClientStreamHandler, + bidi: makeBidiStreamHandler +}; + +/** + * Creates a constructor for servers with a service defined by the methods + * object. The methods object has string keys and values of this form: + * {serialize: function, deserialize: function, client_stream: bool, + * server_stream: bool} + * @param {Object} methods Method descriptor for each method the server should + * expose + * @param {string} prefix The prefex to prepend to each method name + * @return {function(Object, Object)} New server constructor + */ +function makeServerConstructor(methods, prefix) { + /** + * Create a server with the given handlers for all of the methods. + * @constructor + * @param {Object} handlers Map from method names to method handlers. + * @param {Object} options Options to pass to the underlying server + */ + function SurfaceServer(handlers, options) { + var server = new Server(options); + this.inner_server = server; + _.each(handlers, function(handler, name) { + var method = methods[name]; + var method_type; + if (method.client_stream) { + if (method.server_stream) { + method_type = 'bidi'; + } else { + method_type = 'client_stream'; + } + } else { + if (method.server_stream) { + method_type = 'server_stream'; + } else { + method_type = 'unary'; + } + } + var binary_handler = handler_makers[method_type](handler, + method.serialize, + method.deserialize); + server.register('' + prefix + name, binary_handler); + }, this); + } + + /** + * Binds the server to the given port, with SSL enabled if secure is specified + * @param {string} port The port that the server should bind on, in the format + * "address:port" + * @param {boolean=} secure Whether the server should open a secure port + * @return {SurfaceServer} this + */ + SurfaceServer.prototype.bind = function(port, secure) { + this.inner_server.bind(port, secure); + return this; + }; + + /** + * Starts the server listening on any bound ports + * @return {SurfaceServer} this + */ + SurfaceServer.prototype.listen = function() { + this.inner_server.start(); + return this; + }; + + /** + * Shuts the server down; tells it to stop listening for new requests and to + * kill old requests. + */ + SurfaceServer.prototype.shutdown = function() { + this.inner_server.shutdown(); + }; + + return SurfaceServer; +} + +/** + * See documentation for makeServerConstructor + */ +exports.makeServerConstructor = makeServerConstructor; diff --git a/src/node/tag.cc b/src/node/tag.cc new file mode 100644 index 0000000000..0cebb83d1b --- /dev/null +++ b/src/node/tag.cc @@ -0,0 +1,71 @@ +#include +#include +#include +#include "tag.h" + +namespace grpc { +namespace node { + +using v8::Handle; +using v8::HandleScope; +using v8::Persistent; +using v8::Value; + +struct tag { + tag(Persistent *tag, Persistent *call) + : persist_tag(tag), persist_call(call) { + } + + ~tag() { + persist_tag->Dispose(); + if (persist_call != NULL) { + persist_call->Dispose(); + } + } + Persistent *persist_tag; + Persistent *persist_call; +}; + +void *CreateTag(Handle tag, Handle call) { + NanScope(); + Persistent *persist_tag = new Persistent(); + NanAssignPersistent(*persist_tag, tag); + Persistent *persist_call; + if (call->IsNull() || call->IsUndefined()) { + persist_call = NULL; + } else { + persist_call = new Persistent(); + NanAssignPersistent(*persist_call, call); + } + struct tag *tag_struct = new struct tag(persist_tag, persist_call); + return reinterpret_cast(tag_struct); +} + +Handle GetTagHandle(void *tag) { + NanEscapableScope(); + struct tag *tag_struct = reinterpret_cast(tag); + Handle tag_value = NanNew(*tag_struct->persist_tag); + return NanEscapeScope(tag_value); +} + +bool TagHasCall(void *tag) { + struct tag *tag_struct = reinterpret_cast(tag); + return tag_struct->persist_call != NULL; +} + +Handle TagGetCall(void *tag) { + NanEscapableScope(); + struct tag *tag_struct = reinterpret_cast(tag); + if (tag_struct->persist_call == NULL) { + return NanEscapeScope(NanNull()); + } + Handle call_value = NanNew(*tag_struct->persist_call); + return NanEscapeScope(call_value); +} + +void DestroyTag(void *tag) { + delete reinterpret_cast(tag); +} + +} // namespace node +} // namespace grpc diff --git a/src/node/tag.h b/src/node/tag.h new file mode 100644 index 0000000000..46c95de5b4 --- /dev/null +++ b/src/node/tag.h @@ -0,0 +1,26 @@ +#ifndef NET_GRPC_NODE_TAG_H_ +#define NET_GRPC_NODE_TAG_H_ + +#include + +namespace grpc { +namespace node { + +/* Create a void* tag that can be passed to various grpc_call functions from + a javascript value and the javascript wrapper for the call. The call can be + null. */ +void *CreateTag(v8::Handle tag, v8::Handle call); +/* Return the javascript value stored in the tag */ +v8::Handle GetTagHandle(void *tag); +/* Returns true if the call was set (non-null) when the tag was created */ +bool TagHasCall(void *tag); +/* Returns the javascript wrapper for the call associated with this tag */ +v8::Handle TagGetCall(void *call); +/* Destroy the tag and all resources it is holding. It is illegal to call any + of these other functions on a tag after it has been destroyed. */ +void DestroyTag(void *tag); + +} // namespace node +} // namespace grpc + +#endif // NET_GRPC_NODE_TAG_H_ diff --git a/src/node/test/byte_buffer_test.js~ b/src/node/test/byte_buffer_test.js~ new file mode 100644 index 0000000000..8f272e5bd6 --- /dev/null +++ b/src/node/test/byte_buffer_test.js~ @@ -0,0 +1,35 @@ +var assert = require('assert'); +var grpc = require('..build/Release/grpc'); + +describe('byte buffer', function() { + describe('constructor', function() { + it('should reject bad constructor calls', function() { + it('should require at least one argument', function() { + assert.throws(new grpc.ByteBuffer(), TypeError); + }); + it('should reject non-string arguments', function() { + assert.throws(new grpc.ByteBuffer(0), TypeError); + assert.throws(new grpc.ByteBuffer(1.5), TypeError); + assert.throws(new grpc.ByteBuffer(null), TypeError); + assert.throws(new grpc.ByteBuffer(Date.now()), TypeError); + }); + it('should accept string arguments', function() { + assert.doesNotThrow(new grpc.ByteBuffer('')); + assert.doesNotThrow(new grpc.ByteBuffer('test')); + assert.doesNotThrow(new grpc.ByteBuffer('\0')); + }); + }); + }); + describe('bytes', function() { + it('should return the passed string', function() { + it('should preserve simple strings', function() { + var buffer = new grpc.ByteBuffer('test'); + assert.strictEqual(buffer.bytes(), 'test'); + }); + it('should preserve null characters', function() { + var buffer = new grpc.ByteBuffer('test\0test'); + assert.strictEqual(buffer.bytes(), 'test\0test'); + }); + }); + }); +}); diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js new file mode 100644 index 0000000000..25860d040d --- /dev/null +++ b/src/node/test/call_test.js @@ -0,0 +1,169 @@ +var assert = require('assert'); +var grpc = require('bindings')('grpc.node'); + +var channel = new grpc.Channel('localhost:7070'); + +/** + * Helper function to return an absolute deadline given a relative timeout in + * seconds. + * @param {number} timeout_secs The number of seconds to wait before timing out + * @return {Date} A date timeout_secs in the future + */ +function getDeadline(timeout_secs) { + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + timeout_secs); + return deadline; +} + +describe('call', function() { + describe('constructor', function() { + it('should reject anything less than 3 arguments', function() { + assert.throws(function() { + new grpc.Call(); + }, TypeError); + assert.throws(function() { + new grpc.Call(channel); + }, TypeError); + assert.throws(function() { + new grpc.Call(channel, 'method'); + }, TypeError); + }); + it('should succeed with a Channel, a string, and a date or number', + function() { + assert.doesNotThrow(function() { + new grpc.Call(channel, 'method', new Date()); + }); + assert.doesNotThrow(function() { + new grpc.Call(channel, 'method', 0); + }); + }); + it('should fail with a closed channel', function() { + var local_channel = new grpc.Channel('hostname'); + local_channel.close(); + assert.throws(function() { + new grpc.Call(channel, 'method'); + }); + }); + it('should fail with other types', function() { + assert.throws(function() { + new grpc.Call({}, 'method', 0); + }, TypeError); + assert.throws(function() { + new grpc.Call(channel, null, 0); + }, TypeError); + assert.throws(function() { + new grpc.Call(channel, 'method', 'now'); + }, TypeError); + }); + }); + describe('addMetadata', function() { + it('should succeed with objects containing keys and values', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.doesNotThrow(function() { + call.addMetadata(); + }); + assert.doesNotThrow(function() { + call.addMetadata({'key' : 'key', + 'value' : new Buffer('value')}); + }); + assert.doesNotThrow(function() { + call.addMetadata({'key' : 'key1', + 'value' : new Buffer('value1')}, + {'key' : 'key2', + 'value' : new Buffer('value2')}); + }); + }); + it('should fail with other parameter types', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + call.addMetadata(null); + }, TypeError); + assert.throws(function() { + call.addMetadata('value'); + }, TypeError); + assert.throws(function() { + call.addMetadata(5); + }, TypeError); + }); + it('should fail if startInvoke was already called', function(done) { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + call.startInvoke(function() {}, + function() {}, + function() {done();}, + 0); + assert.throws(function() { + call.addMetadata({'key' : 'key', 'value' : new Buffer('value') }); + }, function(err) { + return err.code === grpc.callError.ALREADY_INVOKED; + }); + // Cancel to speed up the test + call.cancel(); + }); + }); + describe('startInvoke', function() { + it('should fail with fewer than 4 arguments', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + call.startInvoke(); + }, TypeError); + assert.throws(function() { + call.startInvoke(function() {}); + }, TypeError); + assert.throws(function() { + call.startInvoke(function() {}, + function() {}); + }, TypeError); + assert.throws(function() { + call.startInvoke(function() {}, + function() {}, + function() {}); + }, TypeError); + }); + it('should work with 3 args and an int', function(done) { + assert.doesNotThrow(function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + call.startInvoke(function() {}, + function() {}, + function() {done();}, + 0); + // Cancel to speed up the test + call.cancel(); + }); + }); + it('should reject incorrectly typed arguments', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + call.startInvoke(0, 0, 0, 0); + }, TypeError); + assert.throws(function() { + call.startInvoke(function() {}, + function() {}, + function() {}, 'test'); + }); + }); + }); + describe('serverAccept', function() { + it('should fail with fewer than 1 argument1', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + call.serverAccept(); + }, TypeError); + }); + it('should return an error when called on a client Call', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.throws(function() { + call.serverAccept(function() {}); + }, function(err) { + return err.code === grpc.callError.NOT_ON_CLIENT; + }); + }); + }); + describe('cancel', function() { + it('should succeed', function() { + var call = new grpc.Call(channel, 'method', getDeadline(1)); + assert.doesNotThrow(function() { + call.cancel(); + }); + }); + }); +}); diff --git a/src/node/test/call_test.js~ b/src/node/test/call_test.js~ new file mode 100644 index 0000000000..9506f209bd --- /dev/null +++ b/src/node/test/call_test.js~ @@ -0,0 +1,6 @@ +var assert = require('assert'); +var grpc = require('../build/Release/grpc'); + +describe('call', function() { + describe('constructor', function() { + it('should reject anything less than 4 arguments', function() { \ No newline at end of file diff --git a/src/node/test/channel_test.js b/src/node/test/channel_test.js new file mode 100644 index 0000000000..61833875b6 --- /dev/null +++ b/src/node/test/channel_test.js @@ -0,0 +1,55 @@ +var assert = require('assert'); +var grpc = require('bindings')('grpc.node'); + +describe('channel', function() { + describe('constructor', function() { + it('should require a string for the first argument', function() { + assert.doesNotThrow(function() { + new grpc.Channel('hostname'); + }); + assert.throws(function() { + new grpc.Channel(); + }, TypeError); + assert.throws(function() { + new grpc.Channel(5); + }); + }); + it('should accept an object for the second parameter', function() { + assert.doesNotThrow(function() { + new grpc.Channel('hostname', {}); + }); + assert.throws(function() { + new grpc.Channel('hostname', 5); + }); + }); + it('should only accept objects with string or int values', function() { + assert.doesNotThrow(function() { + new grpc.Channel('hostname', {'key' : 'value'}); + }); + assert.doesNotThrow(function() { + new grpc.Channel('hostname', {'key' : 5}); + }); + assert.throws(function() { + new grpc.Channel('hostname', {'key' : null}); + }); + assert.throws(function() { + new grpc.Channel('hostname', {'key' : new Date()}); + }); + }); + }); + describe('close', function() { + it('should succeed silently', function() { + var channel = new grpc.Channel('hostname', {}); + assert.doesNotThrow(function() { + channel.close(); + }); + }); + it('should be idempotent', function() { + var channel = new grpc.Channel('hostname', {}); + assert.doesNotThrow(function() { + channel.close(); + channel.close(); + }); + }); + }); +}); diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js new file mode 100644 index 0000000000..9f15a76520 --- /dev/null +++ b/src/node/test/client_server_test.js @@ -0,0 +1,150 @@ +var assert = require('assert'); +var fs = require('fs'); +var path = require('path'); +var grpc = require('bindings')('grpc.node'); +var Server = require('../server'); +var client = require('../client'); +var port_picker = require('../port_picker'); +var common = require('../common'); +var _ = require('highland'); + +var ca_path = path.join(__dirname, 'data/ca.pem'); + +var key_path = path.join(__dirname, 'data/server1.key'); + +var pem_path = path.join(__dirname, 'data/server1.pem'); + +/** + * Helper function to return an absolute deadline given a relative timeout in + * seconds. + * @param {number} timeout_secs The number of seconds to wait before timing out + * @return {Date} A date timeout_secs in the future + */ +function getDeadline(timeout_secs) { + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + timeout_secs); + return deadline; +} + +/** + * Responds to every request with the same data as a response + * @param {Stream} stream + */ +function echoHandler(stream) { + stream.pipe(stream); +} + +/** + * Responds to every request with an error status + * @param {Stream} stream + */ +function errorHandler(stream) { + throw { + 'code' : grpc.status.UNIMPLEMENTED, + 'details' : 'error details' + }; +} + +describe('echo client', function() { + it('should receive echo responses', function(done) { + port_picker.nextAvailablePort(function(port) { + var server = new Server(); + server.bind(port); + server.register('echo', echoHandler); + server.start(); + + var messages = ['echo1', 'echo2', 'echo3', 'echo4']; + var channel = new grpc.Channel(port); + var stream = client.makeRequest( + channel, + 'echo'); + _(messages).map(function(val) { + return new Buffer(val); + }).pipe(stream); + var index = 0; + stream.on('data', function(chunk) { + assert.equal(messages[index], chunk.toString()); + index += 1; + }); + stream.on('end', function() { + server.shutdown(); + done(); + }); + }); + }); + it('should get an error status that the server throws', function(done) { + port_picker.nextAvailablePort(function(port) { + var server = new Server(); + server.bind(port); + server.register('error', errorHandler); + server.start(); + + var channel = new grpc.Channel(port); + var stream = client.makeRequest( + channel, + 'error', + null, + getDeadline(1)); + + stream.on('data', function() {}); + stream.write(new Buffer('test')); + stream.end(); + stream.on('status', function(status) { + assert.equal(status.code, grpc.status.UNIMPLEMENTED); + assert.equal(status.details, 'error details'); + server.shutdown(); + done(); + }); + + }); + }); +}); +/* TODO(mlumish): explore options for reducing duplication between this test + * and the insecure echo client test */ +describe('secure echo client', function() { + it('should recieve echo responses', function(done) { + port_picker.nextAvailablePort(function(port) { + fs.readFile(ca_path, function(err, ca_data) { + assert.ifError(err); + fs.readFile(key_path, function(err, key_data) { + assert.ifError(err); + fs.readFile(pem_path, function(err, pem_data) { + assert.ifError(err); + var creds = grpc.Credentials.createSsl(ca_data); + var server_creds = grpc.ServerCredentials.createSsl(null, + key_data, + pem_data); + + var server = new Server({'credentials' : server_creds}); + server.bind(port, true); + server.register('echo', echoHandler); + server.start(); + + var messages = ['echo1', 'echo2', 'echo3', 'echo4']; + var channel = new grpc.Channel(port, { + 'grpc.ssl_target_name_override' : 'foo.test.google.com', + 'credentials' : creds + }); + var stream = client.makeRequest( + channel, + 'echo'); + + _(messages).map(function(val) { + return new Buffer(val); + }).pipe(stream); + var index = 0; + stream.on('data', function(chunk) { + assert.equal(messages[index], chunk.toString()); + index += 1; + }); + stream.on('end', function() { + server.shutdown(); + done(); + }); + }); + + }); + }); + }); + }); +}); diff --git a/src/node/test/client_server_test.js~ b/src/node/test/client_server_test.js~ new file mode 100644 index 0000000000..22a0523939 --- /dev/null +++ b/src/node/test/client_server_test.js~ @@ -0,0 +1,59 @@ +var assert = require('assert'); +var grpc = require('../build/Debug/grpc'); +var Server = require('../server'); +var client = require('../client'); +var port_picker = require('../port_picker'); +var iterators = require('async-iterators'); + +/** + * General function to process an event by checking that there was no error and + * calling the callback passed as a tag. + * @param {*} err Truthy values indicate an error (in this case, that there was + * no event available). + * @param {grpc.Event} event The event to process. + */ +function processEvent(err, event) { + assert.ifError(err); + assert.notEqual(event, null); + event.getTag()(event); +} + +/** + * Responds to every request with the same data as a response + * @param {{next:function(function(*, Buffer))}} arg_iter The async iterator of + * arguments. + * @return {{next:function(function(*, Buffer))}} The async iterator of results + */ +function echoHandler(arg_iter) { + return { + 'next' : function(write) { + arg_iter.next(function(err, value) { + if (value == undefined) { + write({ + 'code' : grpc.status.OK, + 'details' : 'OK' + }); + } else { + write(err, value); + } + }); + } + }; +} + +describe('echo client server', function() { + it('should recieve echo responses', function(done) { + port_picker.nextAvailablePort(function(port) { + var server = new Server(port); + server.register('echo', echoHandler); + server.start(); + + var messages = ['echo1', 'echo2', 'echo3']; + var channel = new grpc.Channel(port); + var responses = client.makeRequest(channel, + 'echo', + iterators.fromArray(messages)); + assert.equal(messages, iterators.toArray(responses)); + }); + }); +}); diff --git a/src/node/test/completion_queue_test.js~ b/src/node/test/completion_queue_test.js~ new file mode 100644 index 0000000000..5d2d509be6 --- /dev/null +++ b/src/node/test/completion_queue_test.js~ @@ -0,0 +1,30 @@ +var assert = require('assert'); +var grpc = require('../build/Release/grpc'); + +describe('completion queue', function() { + describe('constructor', function() { + it('should succeed with now arguments', function() { + assert.doesNotThrow(function() { + new grpc.CompletionQueue(); + }); + }); + }); + describe('next', function() { + it('should require a date parameter', function() { + var queue = new grpc.CompletionQueue(); + assert.throws(function() { + queue->next(); + }, TypeError); + assert.throws(function() { + queue->next('test'); + }, TypeError); + assert.doesNotThrow(function() { + queue->next(Date.now()); + }); + }); + it('should return null from a new queue', function() { + var queue = new grpc.CompletionQueue(); + assert.strictEqual(queue->next(Date.now()), null); + }); + }); +}); \ No newline at end of file diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js new file mode 100644 index 0000000000..b9f8958da9 --- /dev/null +++ b/src/node/test/constant_test.js @@ -0,0 +1,97 @@ +var assert = require('assert'); +var grpc = require('bindings')('grpc.node'); + +/** + * List of all status names + * @const + * @type {Array.} + */ +var statusNames = [ + 'OK', + 'CANCELLED', + 'UNKNOWN', + 'INVALID_ARGUMENT', + 'DEADLINE_EXCEEDED', + 'NOT_FOUND', + 'ALREADY_EXISTS', + 'PERMISSION_DENIED', + 'UNAUTHENTICATED', + 'RESOURCE_EXHAUSTED', + 'FAILED_PRECONDITION', + 'ABORTED', + 'OUT_OF_RANGE', + 'UNIMPLEMENTED', + 'INTERNAL', + 'UNAVAILABLE', + 'DATA_LOSS' +]; + +/** + * List of all call error names + * @const + * @type {Array.} + */ +var callErrorNames = [ + 'OK', + 'ERROR', + 'NOT_ON_SERVER', + 'NOT_ON_CLIENT', + 'ALREADY_INVOKED', + 'NOT_INVOKED', + 'ALREADY_FINISHED', + 'TOO_MANY_OPERATIONS', + 'INVALID_FLAGS' +]; + +/** + * List of all op error names + * @const + * @type {Array.} + */ +var opErrorNames = [ + 'OK', + 'ERROR' +]; + +/** + * List of all completion type names + * @const + * @type {Array.} + */ +var completionTypeNames = [ + 'QUEUE_SHUTDOWN', + 'READ', + 'INVOKE_ACCEPTED', + 'WRITE_ACCEPTED', + 'FINISH_ACCEPTED', + 'CLIENT_METADATA_READ', + 'FINISHED', + 'SERVER_RPC_NEW' +]; + +describe('constants', function() { + it('should have all of the status constants', function() { + for (var i = 0; i < statusNames.length; i++) { + assert(grpc.status.hasOwnProperty(statusNames[i]), + 'status missing: ' + statusNames[i]); + } + }); + it('should have all of the call errors', function() { + for (var i = 0; i < callErrorNames.length; i++) { + assert(grpc.callError.hasOwnProperty(callErrorNames[i]), + 'call error missing: ' + callErrorNames[i]); + } + }); + it('should have all of the op errors', function() { + for (var i = 0; i < opErrorNames.length; i++) { + assert(grpc.opError.hasOwnProperty(opErrorNames[i]), + 'op error missing: ' + opErrorNames[i]); + } + }); + it('should have all of the completion types', function() { + for (var i = 0; i < completionTypeNames.length; i++) { + assert(grpc.completionType.hasOwnProperty(completionTypeNames[i]), + 'completion type missing: ' + completionTypeNames[i]); + } + }); +}); diff --git a/src/node/test/constant_test.js~ b/src/node/test/constant_test.js~ new file mode 100644 index 0000000000..8ad9f81bbe --- /dev/null +++ b/src/node/test/constant_test.js~ @@ -0,0 +1,25 @@ +var assert = require("assert"); +var grpc = require("../build/Release"); + +var status_names = [ + "OK", + "CANCELLED", + "UNKNOWN", + "INVALID_ARGUMENT", + "DEADLINE_EXCEEDED", + "NOT_FOUND", + "ALREADY_EXISTS", + "PERMISSION_DENIED", + "UNAUTHENTICATED", + "RESOURCE_EXHAUSTED", + "FAILED_PRECONDITION", + "ABORTED", + "OUT_OF_RANGE", + "UNIMPLEMENTED", + "INTERNAL", + "UNAVAILABLE", + "DATA_LOSS" +]; + +describe("constants", function() { + it("should have all of the status constants", function() { diff --git a/src/node/test/data/README b/src/node/test/data/README new file mode 100644 index 0000000000..888d95b900 --- /dev/null +++ b/src/node/test/data/README @@ -0,0 +1 @@ +CONFIRMEDTESTKEY diff --git a/src/node/test/data/ca.pem b/src/node/test/data/ca.pem new file mode 100644 index 0000000000..6c8511a73c --- /dev/null +++ b/src/node/test/data/ca.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICSjCCAbOgAwIBAgIJAJHGGR4dGioHMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMTBnRlc3RjYTAeFw0xNDExMTEyMjMxMjla +Fw0yNDExMDgyMjMxMjlaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0 +YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMT +BnRlc3RjYTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwEDfBV5MYdlHVHJ7 ++L4nxrZy7mBfAVXpOc5vMYztssUI7mL2/iYujiIXM+weZYNTEpLdjyJdu7R5gGUu +g1jSVK/EPHfc74O7AyZU34PNIP4Sh33N+/A5YexrNgJlPY+E3GdVYi4ldWJjgkAd +Qah2PH5ACLrIIC6tRka9hcaBlIECAwEAAaMgMB4wDAYDVR0TBAUwAwEB/zAOBgNV +HQ8BAf8EBAMCAgQwDQYJKoZIhvcNAQELBQADgYEAHzC7jdYlzAVmddi/gdAeKPau +sPBG/C2HCWqHzpCUHcKuvMzDVkY/MP2o6JIW2DBbY64bO/FceExhjcykgaYtCH/m +oIU63+CFOTtR7otyQAWHqXa7q4SbCDlG7DyRFxqG0txPtGvy12lgldA2+RgcigQG +Dfcog5wrJytaQ6UA0wE= +-----END CERTIFICATE----- diff --git a/src/node/test/data/server1.key b/src/node/test/data/server1.key new file mode 100644 index 0000000000..143a5b8765 --- /dev/null +++ b/src/node/test/data/server1.key @@ -0,0 +1,16 @@ +-----BEGIN PRIVATE KEY----- +MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD +M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf +3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY +AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm +V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY +tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p +dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q +K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR +81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff +DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd +aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2 +ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3 +XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe +F98XJ7tIFfJq +-----END PRIVATE KEY----- diff --git a/src/node/test/data/server1.pem b/src/node/test/data/server1.pem new file mode 100644 index 0000000000..8e582e571f --- /dev/null +++ b/src/node/test/data/server1.pem @@ -0,0 +1,16 @@ +-----BEGIN CERTIFICATE----- +MIICmzCCAgSgAwIBAgIBAzANBgkqhkiG9w0BAQUFADBWMQswCQYDVQQGEwJBVTET +MBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQ +dHkgTHRkMQ8wDQYDVQQDDAZ0ZXN0Y2EwHhcNMTQwNzIyMDYwMDU3WhcNMjQwNzE5 +MDYwMDU3WjBkMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV +BAcTB0NoaWNhZ28xFDASBgNVBAoTC0dvb2dsZSBJbmMuMRowGAYDVQQDFBEqLnRl +c3QuZ29vZ2xlLmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA4cMVJygs +JUmlgMMzgdi0h1XoCR7+ww1pop04OMMyy7H/i0PJ2W6Y35+b4CM8QrkYeEafUGDO +RYX6yV/cHGGsD/x02ye6ey1UDtkGAD/mpDEx8YCrjAc1Vfvt8Fk6Cn1WVIxV/J30 +3xjBsFgByQ55RBp1OLZfVLo6AleBDSbcxaECAwEAAaNrMGkwCQYDVR0TBAIwADAL +BgNVHQ8EBAMCBeAwTwYDVR0RBEgwRoIQKi50ZXN0Lmdvb2dsZS5mcoIYd2F0ZXJ6 +b29pLnRlc3QuZ29vZ2xlLmJlghIqLnRlc3QueW91dHViZS5jb22HBMCoAQMwDQYJ +KoZIhvcNAQEFBQADgYEAM2Ii0LgTGbJ1j4oqX9bxVcxm+/R5Yf8oi0aZqTJlnLYS +wXcBykxTx181s7WyfJ49WwrYXo78zTDAnf1ma0fPq3e4mpspvyndLh1a+OarHa1e +aT0DIIYk7qeEa1YcVljx2KyLd0r1BBAfrwyGaEPVeJQVYWaOJRU2we/KD4ojf9s= +-----END CERTIFICATE----- diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js new file mode 100644 index 0000000000..64b207cb5c --- /dev/null +++ b/src/node/test/end_to_end_test.js @@ -0,0 +1,168 @@ +var assert = require('assert'); +var grpc = require('bindings')('grpc.node'); +var port_picker = require('../port_picker'); + +/** + * This is used for testing functions with multiple asynchronous calls that + * can happen in different orders. This should be passed the number of async + * function invocations that can occur last, and each of those should call this + * function's return value + * @param {function()} done The function that should be called when a test is + * complete. + * @param {number} count The number of calls to the resulting function if the + * test passes. + * @return {function()} The function that should be called at the end of each + * sequence of asynchronous functions. + */ +function multiDone(done, count) { + return function() { + count -= 1; + if (count <= 0) { + done(); + } + }; +} + +describe('end-to-end', function() { + it('should start and end a request without error', function(complete) { + port_picker.nextAvailablePort(function(port) { + var server = new grpc.Server(); + var done = multiDone(function() { + complete(); + server.shutdown(); + }, 2); + server.addHttp2Port(port); + var channel = new grpc.Channel(port); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 3); + var status_text = 'xyz'; + var call = new grpc.Call(channel, + 'dummy_method', + deadline); + call.startInvoke(function(event) { + assert.strictEqual(event.type, + grpc.completionType.INVOKE_ACCEPTED); + + call.writesDone(function(event) { + assert.strictEqual(event.type, + grpc.completionType.FINISH_ACCEPTED); + assert.strictEqual(event.data, grpc.opError.OK); + }); + },function(event) { + assert.strictEqual(event.type, + grpc.completionType.CLIENT_METADATA_READ); + },function(event) { + assert.strictEqual(event.type, grpc.completionType.FINISHED); + var status = event.data; + assert.strictEqual(status.code, grpc.status.OK); + assert.strictEqual(status.details, status_text); + done(); + }, 0); + + server.start(); + server.requestCall(function(event) { + assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW); + var server_call = event.call; + assert.notEqual(server_call, null); + server_call.serverAccept(function(event) { + assert.strictEqual(event.type, grpc.completionType.FINISHED); + }, 0); + server_call.serverEndInitialMetadata(0); + server_call.startWriteStatus( + grpc.status.OK, + status_text, + function(event) { + assert.strictEqual(event.type, + grpc.completionType.FINISH_ACCEPTED); + assert.strictEqual(event.data, grpc.opError.OK); + done(); + }); + }); + }); + }); + + it('should send and receive data without error', function(complete) { + port_picker.nextAvailablePort(function(port) { + var req_text = 'client_request'; + var reply_text = 'server_response'; + var server = new grpc.Server(); + var done = multiDone(function() { + complete(); + server.shutdown(); + }, 6); + server.addHttp2Port(port); + var channel = new grpc.Channel(port); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 3); + var status_text = 'success'; + var call = new grpc.Call(channel, + 'dummy_method', + deadline); + call.startInvoke(function(event) { + assert.strictEqual(event.type, + grpc.completionType.INVOKE_ACCEPTED); + call.startWrite( + new Buffer(req_text), + function(event) { + assert.strictEqual(event.type, + grpc.completionType.WRITE_ACCEPTED); + assert.strictEqual(event.data, grpc.opError.OK); + call.writesDone(function(event) { + assert.strictEqual(event.type, + grpc.completionType.FINISH_ACCEPTED); + assert.strictEqual(event.data, grpc.opError.OK); + done(); + }); + }, 0); + call.startRead(function(event) { + assert.strictEqual(event.type, grpc.completionType.READ); + assert.strictEqual(event.data.toString(), reply_text); + done(); + }); + },function(event) { + assert.strictEqual(event.type, + grpc.completionType.CLIENT_METADATA_READ); + done(); + },function(event) { + assert.strictEqual(event.type, grpc.completionType.FINISHED); + var status = event.data; + assert.strictEqual(status.code, grpc.status.OK); + assert.strictEqual(status.details, status_text); + done(); + }, 0); + + server.start(); + server.requestCall(function(event) { + assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW); + var server_call = event.call; + assert.notEqual(server_call, null); + server_call.serverAccept(function(event) { + assert.strictEqual(event.type, grpc.completionType.FINISHED); + done(); + }); + server_call.serverEndInitialMetadata(0); + server_call.startRead(function(event) { + assert.strictEqual(event.type, grpc.completionType.READ); + assert.strictEqual(event.data.toString(), req_text); + server_call.startWrite( + new Buffer(reply_text), + function(event) { + assert.strictEqual(event.type, + grpc.completionType.WRITE_ACCEPTED); + assert.strictEqual(event.data, + grpc.opError.OK); + server_call.startWriteStatus( + grpc.status.OK, + status_text, + function(event) { + assert.strictEqual(event.type, + grpc.completionType.FINISH_ACCEPTED); + assert.strictEqual(event.data, grpc.opError.OK); + done(); + }); + }, 0); + }); + }); + }); + }); +}); diff --git a/src/node/test/end_to_end_test.js~ b/src/node/test/end_to_end_test.js~ new file mode 100644 index 0000000000..74184a287f --- /dev/null +++ b/src/node/test/end_to_end_test.js~ @@ -0,0 +1,72 @@ +var assert = require('assert'); +var grpc = require('../build/Release/grpc'); + +describe('end-to-end', function() { + it('should start and end a request without error', function() { + var event; + var client_queue = new grpc.CompletionQueue(); + var server_queue = new grpc.CompletionQueue(); + var server = new grpc.Server(server_queue); + server.addHttp2Port('localhost:9000'); + var channel = new grpc.Channel('localhost:9000'); + var deadline = Infinity; + var status_text = 'xyz'; + var call = new grpc.Call(channel, 'dummy_method', deadline); + var tag = 1; + assert.strictEqual(call.startInvoke(client_queue, tag, tag, tag), + grpc.callError.OK); + var server_tag = 2; + + // the client invocation was accepted + event = client_queue.next(deadline); + assert.notEqual(event, null); + assert.strictEqual(event->getType(), grpc.completionType.INVOKE_ACCEPTED); + + assert.strictEqual(call.writesDone(tag), grpc.callError.CALL_OK); + event = client_queue.next(deadline); + assert.notEqual(event, null); + assert.strictEqual(event.getType(), grpc.completionType.FINISH_ACCEPTED); + assert.strictEqual(event.getData(), grpc.opError.OK); + + // check that a server rpc new was recieved + assert(server.start()); + assert.strictEqual(server.requestCall(server_tag, server_tag), + grpc.callError.OK); + event = server_queue.next(deadline); + assert.notEqual(event, null); + assert.strictEqual(event.getType(), grpc.completionType.SERVER_RPC_NEW); + var server_call = event.getCall(); + assert.notEqual(server_call, null); + assert.strictEqual(server_call.accept(server_queue, server_tag), + grpc.callError.OK); + + // the server sends the status + assert.strictEqual(server_call.start_write_status(grpc.status.OK, + status_text, + server_tag), + grpc.callError.OK); + event = server_queue.next(deadline); + assert.notEqual(event, null); + assert.strictEqual(event.getType(), grpc.completionType.FINISH_ACCEPTED); + assert.strictEqual(event.getData(), grpc.opError.OK); + + // the client gets CLIENT_METADATA_READ + event = client_queue.next(deadline); + assert.notEqual(event, null); + assert.strictEqual(event.getType(), + grpc.completionType.CLIENT_METADATA_READ); + + // the client gets FINISHED + event = client_queue.next(deadline); + assert.notEqual(event, null); + assert.strictEqual(event.getType(), grpc.completionType.FINISHED); + var status = event.getData(); + assert.strictEqual(status.code, grpc.status.OK); + assert.strictEqual(status.details, status_text); + + // the server gets FINISHED + event = client_queue.next(deadline); + assert.notEqual(event, null); + assert.strictEqual(event.getType(), grpc.completionType.FINISHED); + }); +}); diff --git a/src/node/test/math_client_test.js b/src/node/test/math_client_test.js new file mode 100644 index 0000000000..dd2e183831 --- /dev/null +++ b/src/node/test/math_client_test.js @@ -0,0 +1,176 @@ +var assert = require('assert'); +var client = require('../surface_client.js'); +var ProtoBuf = require('protobufjs'); +var port_picker = require('../port_picker'); + +var builder = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto'); +var math = builder.build('math'); + +/** + * Get a function that deserializes a specific type of protobuf. + * @param {function()} cls The constructor of the message type to deserialize + * @return {function(Buffer):cls} The deserialization function + */ +function deserializeCls(cls) { + /** + * Deserialize a buffer to a message object + * @param {Buffer} arg_buf The buffer to deserialize + * @return {cls} The resulting object + */ + return function deserialize(arg_buf) { + return cls.decode(arg_buf); + }; +} + +/** + * Serialize an object to a buffer + * @param {*} arg The object to serialize + * @return {Buffer} The serialized object + */ +function serialize(arg) { + return new Buffer(arg.encode().toBuffer()); +} + +/** + * Sends a Div request on the channel. + * @param {client.Channel} channel The channel on which to make the request + * @param {DivArg} argument The argument to the call. Should be serializable + * with serialize + * @param {function(?Error, value=)} The callback to for when the response is + * received + * @param {array=} Array of metadata key/value pairs to add to the call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ +var div = client.makeUnaryRequestFunction( + '/Math/Div', + serialize, + deserializeCls(math.DivReply)); + +/** + * Sends a Fib request on the channel. + * @param {client.Channel} channel The channel on which to make the request + * @param {*} argument The argument to the call. Should be serializable with + * serialize + * @param {array=} Array of metadata key/value pairs to add to the call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ +var fib = client.makeServerStreamRequestFunction( + '/Math/Fib', + serialize, + deserializeCls(math.Num)); + +/** + * Sends a Sum request on the channel. + * @param {client.Channel} channel The channel on which to make the request + * @param {function(?Error, value=)} The callback to for when the response is + * received + * @param {array=} Array of metadata key/value pairs to add to the call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ +var sum = client.makeClientStreamRequestFunction( + '/Math/Sum', + serialize, + deserializeCls(math.Num)); + +/** + * Sends a DivMany request on the channel. + * @param {client.Channel} channel The channel on which to make the request + * @param {array=} Array of metadata key/value pairs to add to the call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ +var divMany = client.makeBidiStreamRequestFunction( + '/Math/DivMany', + serialize, + deserializeCls(math.DivReply)); + +/** + * Channel to use to make requests to a running server. + */ +var channel; + +/** + * Server to test against + */ +var server = require('../examples/math_server.js'); + + +describe('Math client', function() { + before(function(done) { + port_picker.nextAvailablePort(function(port) { + server.bind(port).listen(); + channel = new client.Channel(port); + done(); + }); + }); + after(function() { + server.shutdown(); + }); + it('should handle a single request', function(done) { + var arg = new math.DivArgs({dividend: 7, divisor: 4}); + var call = div(channel, arg, function handleDivResult(err, value) { + assert.ifError(err); + assert.equal(value.get('quotient'), 1); + assert.equal(value.get('remainder'), 3); + }); + call.on('status', function checkStatus(status) { + assert.strictEqual(status.code, client.status.OK); + done(); + }); + }); + it('should handle a server streaming request', function(done) { + var arg = new math.FibArgs({limit: 7}); + var call = fib(channel, arg); + var expected_results = [1, 1, 2, 3, 5, 8, 13]; + var next_expected = 0; + call.on('data', function checkResponse(value) { + assert.equal(value.get('num'), expected_results[next_expected]); + next_expected += 1; + }); + call.on('status', function checkStatus(status) { + assert.strictEqual(status.code, client.status.OK); + done(); + }); + }); + it('should handle a client streaming request', function(done) { + var call = sum(channel, function handleSumResult(err, value) { + assert.ifError(err); + assert.equal(value.get('num'), 21); + }); + for (var i = 0; i < 7; i++) { + call.write(new math.Num({'num': i})); + } + call.end(); + call.on('status', function checkStatus(status) { + assert.strictEqual(status.code, client.status.OK); + done(); + }); + }); + it('should handle a bidirectional streaming request', function(done) { + function checkResponse(index, value) { + assert.equal(value.get('quotient'), index); + assert.equal(value.get('remainder'), 1); + } + var call = divMany(channel); + var response_index = 0; + call.on('data', function(value) { + checkResponse(response_index, value); + response_index += 1; + }); + for (var i = 0; i < 7; i++) { + call.write(new math.DivArgs({dividend: 2 * i + 1, divisor: 2})); + } + call.end(); + call.on('status', function checkStatus(status) { + assert.strictEqual(status.code, client.status.OK); + done(); + }); + }); +}); diff --git a/src/node/test/math_client_test.js~ b/src/node/test/math_client_test.js~ new file mode 100644 index 0000000000..eaa424ad7f --- /dev/null +++ b/src/node/test/math_client_test.js~ @@ -0,0 +1,87 @@ +var client = require('../surface_client.js'); + +var builder = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto'); +var math = builder.build('math'); + +/** + * Get a function that deserializes a specific type of protobuf. + * @param {function()} cls The constructor of the message type to deserialize + * @return {function(Buffer):cls} The deserialization function + */ +function deserializeCls(cls) { + /** + * Deserialize a buffer to a message object + * @param {Buffer} arg_buf The buffer to deserialize + * @return {cls} The resulting object + */ + return function deserialize(arg_buf) { + return cls.decode(arg_buf); + }; +} + +/** + * Serialize an object to a buffer + * @param {*} arg The object to serialize + * @return {Buffer} The serialized object + */ +function serialize(arg) { + return new Buffer(arg.encode.toBuffer()); +} + +/** + * Sends a Div request on the channel. + * @param {client.Channel} channel The channel on which to make the request + * @param {*} argument The argument to the call. Should be serializable with + * serialize + * @param {function(?Error, value=)} The callback to for when the response is + * received + * @param {array=} Array of metadata key/value pairs to add to the call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ +var div = client.makeUnaryRequestFunction('/Math/Div', + serialize, + deserialize(math.DivReply)); + +/** + * Sends a Fib request on the channel. + * @param {client.Channel} channel The channel on which to make the request + * @param {*} argument The argument to the call. Should be serializable with + * serialize + * @param {array=} Array of metadata key/value pairs to add to the call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ +var fib = client.makeServerStreamRequestFunction('/Math/Fib', + serialize, + deserialize(math.Num)); + +/** + * Sends a Sum request on the channel. + * @param {client.Channel} channel The channel on which to make the request + * @param {function(?Error, value=)} The callback to for when the response is + * received + * @param {array=} Array of metadata key/value pairs to add to the call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ +var sum = client.makeClientStreamRequestFunction('/Math/Sum', + serialize, + deserialize(math.Num)); + +/** + * Sends a DivMany request on the channel. + * @param {client.Channel} channel The channel on which to make the request + * @param {array=} Array of metadata key/value pairs to add to the call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ +var divMany = client.makeBidiStreamRequestFunction('/Math/DivMany', + serialize, + deserialize(math.DivReply)); + +var channel = new client.Channel('localhost:7070'); diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js new file mode 100644 index 0000000000..4dd972c3ff --- /dev/null +++ b/src/node/test/server_test.js @@ -0,0 +1,88 @@ +var assert = require('assert'); +var grpc = require('bindings')('grpc.node'); +var Server = require('../server'); +var port_picker = require('../port_picker'); + +/** + * This is used for testing functions with multiple asynchronous calls that + * can happen in different orders. This should be passed the number of async + * function invocations that can occur last, and each of those should call this + * function's return value + * @param {function()} done The function that should be called when a test is + * complete. + * @param {number} count The number of calls to the resulting function if the + * test passes. + * @return {function()} The function that should be called at the end of each + * sequence of asynchronous functions. + */ +function multiDone(done, count) { + return function() { + count -= 1; + if (count <= 0) { + done(); + } + }; +} + +/** + * Responds to every request with the same data as a response + * @param {Stream} stream + */ +function echoHandler(stream) { + stream.pipe(stream); +} + +describe('echo server', function() { + it('should echo inputs as responses', function(done) { + done = multiDone(done, 4); + port_picker.nextAvailablePort(function(port) { + var server = new Server(); + server.bind(port); + server.register('echo', echoHandler); + server.start(); + + var req_text = 'echo test string'; + var status_text = 'OK'; + + var channel = new grpc.Channel(port); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 3); + var call = new grpc.Call(channel, + 'echo', + deadline); + call.startInvoke(function(event) { + assert.strictEqual(event.type, + grpc.completionType.INVOKE_ACCEPTED); + call.startWrite( + new Buffer(req_text), + function(event) { + assert.strictEqual(event.type, + grpc.completionType.WRITE_ACCEPTED); + assert.strictEqual(event.data, grpc.opError.OK); + call.writesDone(function(event) { + assert.strictEqual(event.type, + grpc.completionType.FINISH_ACCEPTED); + assert.strictEqual(event.data, grpc.opError.OK); + done(); + }); + }, 0); + call.startRead(function(event) { + assert.strictEqual(event.type, grpc.completionType.READ); + assert.strictEqual(event.data.toString(), req_text); + done(); + }); + },function(event) { + assert.strictEqual(event.type, + grpc.completionType.CLIENT_METADATA_READ); + done(); + },function(event) { + assert.strictEqual(event.type, grpc.completionType.FINISHED); + var status = event.data; + assert.strictEqual(status.code, grpc.status.OK); + assert.strictEqual(status.details, status_text); + server.shutdown(); + done(); + }, 0); + }); + }); +}); diff --git a/src/node/test/server_test.js~ b/src/node/test/server_test.js~ new file mode 100644 index 0000000000..3613e08232 --- /dev/null +++ b/src/node/test/server_test.js~ @@ -0,0 +1,22 @@ +var assert = require('assert'); +var grpc = require('./build/Debug/grpc'); +var Server = require('server'); + +function echoHandler(arg_iter) { + return { + 'next' : function(write) { + arg_iter.next(function(err, value) { + write(err, value); + }); + } + } +} + +describe('echo server', function() { + it('should echo inputs as responses', function(done) { + var server = new Server('localhost:5000'); + server.register('echo', echoHandler); + server.start(); + + }); +}); \ No newline at end of file diff --git a/src/node/timeval.cc b/src/node/timeval.cc new file mode 100644 index 0000000000..503ebe6f54 --- /dev/null +++ b/src/node/timeval.cc @@ -0,0 +1,33 @@ +#include + +#include "grpc/grpc.h" +#include "grpc/support/time.h" +#include "timeval.h" + +namespace grpc { +namespace node { + +gpr_timespec MillisecondsToTimespec(double millis) { + if (millis == std::numeric_limits::infinity()) { + return gpr_inf_future; + } else if (millis == -std::numeric_limits::infinity()) { + return gpr_inf_past; + } else { + return gpr_time_from_micros(static_cast(millis*1000)); + } +} + +double TimespecToMilliseconds(gpr_timespec timespec) { + if (gpr_time_cmp(timespec, gpr_inf_future) == 0) { + return std::numeric_limits::infinity(); + } else if (gpr_time_cmp(timespec, gpr_inf_past) == 0) { + return -std::numeric_limits::infinity(); + } else { + struct timeval time = gpr_timeval_from_timespec(timespec); + return (static_cast(time.tv_sec) * 1000 + + static_cast(time.tv_usec) / 1000); + } +} + +} // namespace node +} // namespace grpc diff --git a/src/node/timeval.h b/src/node/timeval.h new file mode 100644 index 0000000000..a350a09b00 --- /dev/null +++ b/src/node/timeval.h @@ -0,0 +1,15 @@ +#ifndef NET_GRPC_NODE_TIMEVAL_H_ +#define NET_GRPC_NODE_TIMEVAL_H_ + +#include "grpc/support/time.h" + +namespace grpc { +namespace node { + +double TimespecToMilliseconds(gpr_timespec time); +gpr_timespec MillisecondsToTimespec(double millis); + +} // namespace node +} // namespace grpc + +#endif // NET_GRPC_NODE_TIMEVAL_H_ -- cgit v1.2.3