aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node
diff options
context:
space:
mode:
Diffstat (limited to 'src/node')
-rw-r--r--src/node/binding.gyp9
-rw-r--r--src/node/ext/call.cc654
-rw-r--r--src/node/ext/call.h65
-rw-r--r--src/node/ext/completion_queue_async_worker.cc25
-rw-r--r--src/node/ext/completion_queue_async_worker.h2
-rw-r--r--src/node/ext/credentials.cc1
-rw-r--r--src/node/ext/event.cc173
-rw-r--r--src/node/ext/event.h48
-rw-r--r--src/node/ext/node_grpc.cc58
-rw-r--r--src/node/ext/server.cc65
-rw-r--r--src/node/ext/server_credentials.cc1
-rw-r--r--src/node/ext/tag.cc101
-rw-r--r--src/node/ext/tag.h59
-rw-r--r--src/node/index.js10
-rw-r--r--src/node/interop/interop_client.js2
-rw-r--r--src/node/interop/test.proto2
-rw-r--r--src/node/package.json2
-rw-r--r--src/node/src/client.js525
-rw-r--r--src/node/src/common.js25
-rw-r--r--src/node/src/server.js583
-rw-r--r--src/node/src/surface_client.js357
-rw-r--r--src/node/src/surface_server.js340
-rw-r--r--src/node/test/call_test.js126
-rw-r--r--src/node/test/client_server_test.js255
-rw-r--r--src/node/test/constant_test.js37
-rw-r--r--src/node/test/end_to_end_test.js250
-rw-r--r--src/node/test/interop_sanity_test.js2
-rw-r--r--src/node/test/math_client_test.js3
-rw-r--r--src/node/test/server_test.js122
-rw-r--r--src/node/test/surface_test.js4
30 files changed, 1656 insertions, 2250 deletions
diff --git a/src/node/binding.gyp b/src/node/binding.gyp
index cf2a6acb04..fb4c779f8e 100644
--- a/src/node/binding.gyp
+++ b/src/node/binding.gyp
@@ -9,14 +9,15 @@
'include_dirs': [
"<!(nodejs -e \"require('nan')\")"
],
- 'cxxflags': [
+ 'cflags': [
+ '-std=c++11',
'-Wall',
'-pthread',
'-pedantic',
'-g',
'-zdefs'
- '-Werror',
- ],
+ '-Werror'
+ ],
'ldflags': [
'-g'
],
@@ -33,11 +34,9 @@
"ext/channel.cc",
"ext/completion_queue_async_worker.cc",
"ext/credentials.cc",
- "ext/event.cc",
"ext/node_grpc.cc",
"ext/server.cc",
"ext/server_credentials.cc",
- "ext/tag.cc",
"ext/timeval.cc"
],
'conditions' : [
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index 23aead07b2..9ed389f3bc 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2014, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,17 +31,25 @@
*
*/
+#include <memory>
+#include <vector>
+#include <map>
+
#include <node.h>
#include "grpc/support/log.h"
#include "grpc/grpc.h"
+#include "grpc/support/alloc.h"
#include "grpc/support/time.h"
#include "byte_buffer.h"
#include "call.h"
#include "channel.h"
#include "completion_queue_async_worker.h"
#include "timeval.h"
-#include "tag.h"
+
+using std::unique_ptr;
+using std::shared_ptr;
+using std::vector;
namespace grpc {
namespace node {
@@ -49,6 +57,7 @@ namespace node {
using ::node::Buffer;
using v8::Arguments;
using v8::Array;
+using v8::Boolean;
using v8::Exception;
using v8::External;
using v8::Function;
@@ -68,37 +77,372 @@ using v8::Value;
Persistent<Function> Call::constructor;
Persistent<FunctionTemplate> Call::fun_tpl;
-Call::Call(grpc_call *call) : wrapped_call(call) {}
-Call::~Call() { grpc_call_destroy(wrapped_call); }
+bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array,
+ shared_ptr<Resources> resources) {
+ NanScope();
+ grpc_metadata_array_init(array);
+ Handle<Array> keys(metadata->GetOwnPropertyNames());
+ for (unsigned int i = 0; i < keys->Length(); i++) {
+ Handle<String> current_key(keys->Get(i)->ToString());
+ if (!metadata->Get(current_key)->IsArray()) {
+ return false;
+ }
+ array->capacity += Local<Array>::Cast(metadata->Get(current_key))->Length();
+ }
+ array->metadata = reinterpret_cast<grpc_metadata*>(
+ gpr_malloc(array->capacity * sizeof(grpc_metadata)));
+ for (unsigned int i = 0; i < keys->Length(); i++) {
+ Handle<String> current_key(keys->Get(i)->ToString());
+ NanUtf8String *utf8_key = new NanUtf8String(current_key);
+ resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_key));
+ Handle<Array> values = Local<Array>::Cast(metadata->Get(current_key));
+ for (unsigned int j = 0; j < values->Length(); j++) {
+ Handle<Value> value = values->Get(j);
+ grpc_metadata *current = &array->metadata[array->count];
+ current->key = **utf8_key;
+ if (Buffer::HasInstance(value)) {
+ current->value = Buffer::Data(value);
+ current->value_length = Buffer::Length(value);
+ Persistent<Value> handle;
+ NanAssignPersistent(handle, value);
+ resources->handles.push_back(unique_ptr<PersistentHolder>(
+ new PersistentHolder(handle)));
+ } else if (value->IsString()) {
+ Handle<String> string_value = value->ToString();
+ NanUtf8String *utf8_value = new NanUtf8String(string_value);
+ resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_value));
+ current->value = **utf8_value;
+ current->value_length = string_value->Length();
+ } else {
+ return false;
+ }
+ array->count += 1;
+ }
+ }
+ return true;
+}
+
+Handle<Value> ParseMetadata(const grpc_metadata_array *metadata_array) {
+ NanEscapableScope();
+ grpc_metadata *metadata_elements = metadata_array->metadata;
+ size_t length = metadata_array->count;
+ std::map<const char*, size_t> size_map;
+ std::map<const char*, size_t> index_map;
+
+ for (unsigned int i = 0; i < length; i++) {
+ const char *key = metadata_elements[i].key;
+ if (size_map.count(key)) {
+ size_map[key] += 1;
+ }
+ index_map[key] = 0;
+ }
+ Handle<Object> metadata_object = NanNew<Object>();
+ for (unsigned int i = 0; i < length; i++) {
+ grpc_metadata* elem = &metadata_elements[i];
+ Handle<String> key_string = String::New(elem->key);
+ Handle<Array> array;
+ if (metadata_object->Has(key_string)) {
+ array = Handle<Array>::Cast(metadata_object->Get(key_string));
+ } else {
+ array = NanNew<Array>(size_map[elem->key]);
+ metadata_object->Set(key_string, array);
+ }
+ array->Set(index_map[elem->key],
+ MakeFastBuffer(
+ NanNewBufferHandle(elem->value, elem->value_length)));
+ index_map[elem->key] += 1;
+ }
+ return NanEscapeScope(metadata_object);
+}
+
+Handle<Value> Op::GetOpType() const {
+ NanEscapableScope();
+ return NanEscapeScope(NanNew<String>(GetTypeString()));
+}
+
+class SendMetadataOp : public Op {
+ public:
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(NanTrue());
+ }
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) {
+ if (!value->IsObject()) {
+ return false;
+ }
+ grpc_metadata_array array;
+ if (!CreateMetadataArray(value->ToObject(), &array, resources)) {
+ return false;
+ }
+ out->data.send_initial_metadata.count = array.count;
+ out->data.send_initial_metadata.metadata = array.metadata;
+ return true;
+ }
+ protected:
+ std::string GetTypeString() const {
+ return "send metadata";
+ }
+};
+
+class SendMessageOp : public Op {
+ public:
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(NanTrue());
+ }
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) {
+ if (!Buffer::HasInstance(value)) {
+ return false;
+ }
+ out->data.send_message = BufferToByteBuffer(value);
+ Persistent<Value> handle;
+ NanAssignPersistent(handle, value);
+ resources->handles.push_back(unique_ptr<PersistentHolder>(
+ new PersistentHolder(handle)));
+ return true;
+ }
+ protected:
+ std::string GetTypeString() const {
+ return "send message";
+ }
+};
+
+class SendClientCloseOp : public Op {
+ public:
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(NanTrue());
+ }
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) {
+ return true;
+ }
+ protected:
+ std::string GetTypeString() const {
+ return "client close";
+ }
+};
+
+class SendServerStatusOp : public Op {
+ public:
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(NanTrue());
+ }
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) {
+ if (!value->IsObject()) {
+ return false;
+ }
+ Handle<Object> server_status = value->ToObject();
+ if (!server_status->Get(NanNew("metadata"))->IsObject()) {
+ return false;
+ }
+ if (!server_status->Get(NanNew("code"))->IsUint32()) {
+ return false;
+ }
+ if (!server_status->Get(NanNew("details"))->IsString()) {
+ return false;
+ }
+ grpc_metadata_array array;
+ if (!CreateMetadataArray(server_status->Get(NanNew("metadata"))->
+ ToObject(),
+ &array, resources)) {
+ return false;
+ }
+ out->data.send_status_from_server.trailing_metadata_count = array.count;
+ out->data.send_status_from_server.trailing_metadata = array.metadata;
+ out->data.send_status_from_server.status =
+ static_cast<grpc_status_code>(
+ server_status->Get(NanNew("code"))->Uint32Value());
+ NanUtf8String *str = new NanUtf8String(
+ server_status->Get(NanNew("details")));
+ resources->strings.push_back(unique_ptr<NanUtf8String>(str));
+ out->data.send_status_from_server.status_details = **str;
+ return true;
+ }
+ protected:
+ std::string GetTypeString() const {
+ return "send status";
+ }
+};
+
+class GetMetadataOp : public Op {
+ public:
+ GetMetadataOp() {
+ grpc_metadata_array_init(&recv_metadata);
+ }
+
+ ~GetMetadataOp() {
+ grpc_metadata_array_destroy(&recv_metadata);
+ }
+
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(ParseMetadata(&recv_metadata));
+ }
+
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) {
+ out->data.recv_initial_metadata = &recv_metadata;
+ return true;
+ }
+
+ protected:
+ std::string GetTypeString() const {
+ return "metadata";
+ }
+
+ private:
+ grpc_metadata_array recv_metadata;
+};
+
+class ReadMessageOp : public Op {
+ public:
+ ReadMessageOp() {
+ recv_message = NULL;
+ }
+ ~ReadMessageOp() {
+ if (recv_message != NULL) {
+ gpr_free(recv_message);
+ }
+ }
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(ByteBufferToBuffer(recv_message));
+ }
+
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) {
+ out->data.recv_message = &recv_message;
+ return true;
+ }
+
+ protected:
+ std::string GetTypeString() const {
+ return "read";
+ }
+
+ private:
+ grpc_byte_buffer *recv_message;
+};
+
+class ClientStatusOp : public Op {
+ public:
+ ClientStatusOp() {
+ grpc_metadata_array_init(&metadata_array);
+ status_details = NULL;
+ details_capacity = 0;
+ }
+
+ ~ClientStatusOp() {
+ grpc_metadata_array_destroy(&metadata_array);
+ gpr_free(status_details);
+ }
+
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) {
+ out->data.recv_status_on_client.trailing_metadata = &metadata_array;
+ out->data.recv_status_on_client.status = &status;
+ out->data.recv_status_on_client.status_details = &status_details;
+ out->data.recv_status_on_client.status_details_capacity = &details_capacity;
+ return true;
+ }
+
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ Handle<Object> status_obj = NanNew<Object>();
+ status_obj->Set(NanNew("code"), NanNew<Number>(status));
+ if (status_details != NULL) {
+ status_obj->Set(NanNew("details"), String::New(status_details));
+ }
+ status_obj->Set(NanNew("metadata"), ParseMetadata(&metadata_array));
+ return NanEscapeScope(status_obj);
+ }
+ protected:
+ std::string GetTypeString() const {
+ return "status";
+ }
+ private:
+ grpc_metadata_array metadata_array;
+ grpc_status_code status;
+ char *status_details;
+ size_t details_capacity;
+};
+
+class ServerCloseResponseOp : public Op {
+ public:
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(NanNew<Boolean>(cancelled));
+ }
+
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) {
+ out->data.recv_close_on_server.cancelled = &cancelled;
+ return true;
+ }
+
+ protected:
+ std::string GetTypeString() const {
+ return "cancelled";
+ }
+
+ private:
+ int cancelled;
+};
+
+tag::tag(NanCallback *callback, OpVec *ops,
+ shared_ptr<Resources> resources) :
+ callback(callback), ops(ops), resources(resources){
+}
+
+tag::~tag() {
+ delete callback;
+ delete ops;
+}
+
+Handle<Value> GetTagNodeValue(void *tag) {
+ NanEscapableScope();
+ struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
+ Handle<Object> tag_obj = NanNew<Object>();
+ for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
+ it != tag_struct->ops->end(); ++it) {
+ Op *op_ptr = it->get();
+ tag_obj->Set(op_ptr->GetOpType(), op_ptr->GetNodeValue());
+ }
+ return NanEscapeScope(tag_obj);
+}
+
+NanCallback *GetTagCallback(void *tag) {
+ struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
+ return tag_struct->callback;
+}
+
+void DestroyTag(void *tag) {
+ struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
+ delete tag_struct;
+}
+
+Call::Call(grpc_call *call) : wrapped_call(call) {
+}
+
+Call::~Call() {
+ grpc_call_destroy(wrapped_call);
+}
void Call::Init(Handle<Object> exports) {
NanScope();
Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
tpl->SetClassName(NanNew("Call"));
tpl->InstanceTemplate()->SetInternalFieldCount(1);
- NanSetPrototypeTemplate(tpl, "addMetadata",
- FunctionTemplate::New(AddMetadata)->GetFunction());
- NanSetPrototypeTemplate(tpl, "invoke",
- FunctionTemplate::New(Invoke)->GetFunction());
- NanSetPrototypeTemplate(tpl, "serverAccept",
- FunctionTemplate::New(ServerAccept)->GetFunction());
- NanSetPrototypeTemplate(
- tpl, "serverEndInitialMetadata",
- FunctionTemplate::New(ServerEndInitialMetadata)->GetFunction());
+ NanSetPrototypeTemplate(tpl, "startBatch",
+ FunctionTemplate::New(StartBatch)->GetFunction());
NanSetPrototypeTemplate(tpl, "cancel",
FunctionTemplate::New(Cancel)->GetFunction());
- NanSetPrototypeTemplate(tpl, "startWrite",
- FunctionTemplate::New(StartWrite)->GetFunction());
- NanSetPrototypeTemplate(
- tpl, "startWriteStatus",
- FunctionTemplate::New(StartWriteStatus)->GetFunction());
- NanSetPrototypeTemplate(tpl, "writesDone",
- FunctionTemplate::New(WritesDone)->GetFunction());
- NanSetPrototypeTemplate(tpl, "startReadMetadata",
- FunctionTemplate::New(WritesDone)->GetFunction());
- NanSetPrototypeTemplate(tpl, "startRead",
- FunctionTemplate::New(StartRead)->GetFunction());
NanAssignPersistent(fun_tpl, tpl);
NanAssignPersistent(constructor, tpl->GetFunction());
constructor->Set(NanNew("WRITE_BUFFER_HINT"),
@@ -152,9 +496,9 @@ NAN_METHOD(Call::New) {
NanUtf8String method(args[1]);
double deadline = args[2]->NumberValue();
grpc_channel *wrapped_channel = channel->GetWrappedChannel();
- grpc_call *wrapped_call = grpc_channel_create_call_old(
- wrapped_channel, *method, channel->GetHost(),
- MillisecondsToTimespec(deadline));
+ grpc_call *wrapped_call = grpc_channel_create_call(
+ wrapped_channel, CompletionQueueAsyncWorker::GetQueue(), *method,
+ channel->GetHost(), MillisecondsToTimespec(deadline));
call = new Call(wrapped_call);
args.This()->SetHiddenValue(String::NewSymbol("channel_"),
channel_object);
@@ -168,119 +512,74 @@ NAN_METHOD(Call::New) {
}
}
-NAN_METHOD(Call::AddMetadata) {
+NAN_METHOD(Call::StartBatch) {
NanScope();
if (!HasInstance(args.This())) {
- return NanThrowTypeError("addMetadata can only be called on Call objects");
+ return NanThrowTypeError("startBatch can only be called on Call objects");
}
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
if (!args[0]->IsObject()) {
- return NanThrowTypeError("addMetadata's first argument must be an object");
- }
- Handle<Object> metadata = args[0]->ToObject();
- Handle<Array> keys(metadata->GetOwnPropertyNames());
- for (unsigned int i = 0; i < keys->Length(); i++) {
- Handle<String> current_key(keys->Get(i)->ToString());
- if (!metadata->Get(current_key)->IsArray()) {
- return NanThrowTypeError(
- "addMetadata's first argument's values must be arrays");
- }
- NanUtf8String utf8_key(current_key);
- Handle<Array> values = Local<Array>::Cast(metadata->Get(current_key));
- for (unsigned int j = 0; j < values->Length(); j++) {
- Handle<Value> value = values->Get(j);
- grpc_metadata metadata;
- grpc_call_error error;
- metadata.key = *utf8_key;
- if (Buffer::HasInstance(value)) {
- metadata.value = Buffer::Data(value);
- metadata.value_length = Buffer::Length(value);
- error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0);
- } else if (value->IsString()) {
- Handle<String> string_value = value->ToString();
- NanUtf8String utf8_value(string_value);
- metadata.value = *utf8_value;
- metadata.value_length = string_value->Length();
- gpr_log(GPR_DEBUG, "adding metadata: %s, %s, %d", metadata.key,
- metadata.value, metadata.value_length);
- error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0);
- } else {
- return NanThrowTypeError(
- "addMetadata values must be strings or buffers");
- }
- if (error != GRPC_CALL_OK) {
- return NanThrowError("addMetadata failed", error);
- }
- }
- }
- NanReturnUndefined();
-}
-
-NAN_METHOD(Call::Invoke) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError("invoke can only be called on Call objects");
- }
- if (!args[0]->IsFunction()) {
- return NanThrowTypeError("invoke's first argument must be a function");
+ return NanThrowError("startBatch's first argument must be an object");
}
if (!args[1]->IsFunction()) {
- return NanThrowTypeError("invoke's second argument must be a function");
- }
- if (!args[2]->IsUint32()) {
- return NanThrowTypeError("invoke's third argument must be integer flags");
- }
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
- unsigned int flags = args[3]->Uint32Value();
- grpc_call_error error = grpc_call_invoke_old(
- call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(),
- CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags);
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- CompletionQueueAsyncWorker::Next();
- } else {
- return NanThrowError("invoke failed", error);
- }
- NanReturnUndefined();
-}
-
-NAN_METHOD(Call::ServerAccept) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError("accept can only be called on Call objects");
- }
- if (!args[0]->IsFunction()) {
- return NanThrowTypeError("accept's first argument must be a function");
+ return NanThrowError("startBatch's second argument must be a callback");
}
+ Handle<Function> callback_func = args[1].As<Function>();
Call *call = ObjectWrap::Unwrap<Call>(args.This());
- grpc_call_error error = grpc_call_server_accept_old(
- call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(),
- CreateTag(args[0], args.This()));
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- } else {
- return NanThrowError("serverAccept failed", error);
- }
- NanReturnUndefined();
-}
-
-NAN_METHOD(Call::ServerEndInitialMetadata) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError(
- "serverEndInitialMetadata can only be called on Call objects");
- }
- if (!args[0]->IsUint32()) {
- return NanThrowTypeError(
- "serverEndInitialMetadata's second argument must be integer flags");
+ shared_ptr<Resources> resources(new Resources);
+ Handle<Object> obj = args[0]->ToObject();
+ Handle<Array> keys = obj->GetOwnPropertyNames();
+ size_t nops = keys->Length();
+ vector<grpc_op> ops(nops);
+ unique_ptr<OpVec> op_vector(new OpVec());
+ for (unsigned int i = 0; i < nops; i++) {
+ unique_ptr<Op> op;
+ if (!keys->Get(i)->IsUint32()) {
+ return NanThrowError(
+ "startBatch's first argument's keys must be integers");
+ }
+ uint32_t type = keys->Get(i)->Uint32Value();
+ ops[i].op = static_cast<grpc_op_type>(type);
+ switch (type) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ op.reset(new SendMetadataOp());
+ break;
+ case GRPC_OP_SEND_MESSAGE:
+ op.reset(new SendMessageOp());
+ break;
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ op.reset(new SendClientCloseOp());
+ break;
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ op.reset(new SendServerStatusOp());
+ break;
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ op.reset(new GetMetadataOp());
+ break;
+ case GRPC_OP_RECV_MESSAGE:
+ op.reset(new ReadMessageOp());
+ break;
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ op.reset(new ClientStatusOp());
+ break;
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ op.reset(new ServerCloseResponseOp());
+ break;
+ default:
+ return NanThrowError("Argument object had an unrecognized key");
+ }
+ if (!op->ParseOp(obj->Get(type), &ops[i], resources)) {
+ return NanThrowTypeError("Incorrectly typed arguments to startBatch");
+ }
+ op_vector->push_back(std::move(op));
}
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
- unsigned int flags = args[1]->Uint32Value();
- grpc_call_error error =
- grpc_call_server_end_initial_metadata_old(call->wrapped_call, flags);
+ NanCallback *callback = new NanCallback(callback_func);
+ grpc_call_error error = grpc_call_start_batch(
+ call->wrapped_call, &ops[0], nops, new struct tag(
+ callback, op_vector.release(), resources));
if (error != GRPC_CALL_OK) {
- return NanThrowError("serverEndInitialMetadata failed", error);
+ return NanThrowError("startBatch failed", error);
}
+ CompletionQueueAsyncWorker::Next();
NanReturnUndefined();
}
@@ -297,102 +596,5 @@ NAN_METHOD(Call::Cancel) {
NanReturnUndefined();
}
-NAN_METHOD(Call::StartWrite) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError("startWrite can only be called on Call objects");
- }
- if (!Buffer::HasInstance(args[0])) {
- return NanThrowTypeError("startWrite's first argument must be a Buffer");
- }
- if (!args[1]->IsFunction()) {
- return NanThrowTypeError("startWrite's second argument must be a function");
- }
- if (!args[2]->IsUint32()) {
- return NanThrowTypeError(
- "startWrite's third argument must be integer flags");
- }
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
- grpc_byte_buffer *buffer = BufferToByteBuffer(args[0]);
- unsigned int flags = args[2]->Uint32Value();
- grpc_call_error error = grpc_call_start_write_old(
- call->wrapped_call, buffer, CreateTag(args[1], args.This()), flags);
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- } else {
- return NanThrowError("startWrite failed", error);
- }
- NanReturnUndefined();
-}
-
-NAN_METHOD(Call::StartWriteStatus) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError(
- "startWriteStatus can only be called on Call objects");
- }
- if (!args[0]->IsUint32()) {
- return NanThrowTypeError(
- "startWriteStatus's first argument must be a status code");
- }
- if (!args[1]->IsString()) {
- return NanThrowTypeError(
- "startWriteStatus's second argument must be a string");
- }
- if (!args[2]->IsFunction()) {
- return NanThrowTypeError(
- "startWriteStatus's third argument must be a function");
- }
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
- NanUtf8String details(args[1]);
- grpc_call_error error = grpc_call_start_write_status_old(
- call->wrapped_call, (grpc_status_code)args[0]->Uint32Value(), *details,
- CreateTag(args[2], args.This()));
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- } else {
- return NanThrowError("startWriteStatus failed", error);
- }
- NanReturnUndefined();
-}
-
-NAN_METHOD(Call::WritesDone) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError("writesDone can only be called on Call objects");
- }
- if (!args[0]->IsFunction()) {
- return NanThrowTypeError("writesDone's first argument must be a function");
- }
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
- grpc_call_error error = grpc_call_writes_done_old(
- call->wrapped_call, CreateTag(args[0], args.This()));
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- } else {
- return NanThrowError("writesDone failed", error);
- }
- NanReturnUndefined();
-}
-
-NAN_METHOD(Call::StartRead) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError("startRead can only be called on Call objects");
- }
- if (!args[0]->IsFunction()) {
- return NanThrowTypeError("startRead's first argument must be a function");
- }
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
- grpc_call_error error = grpc_call_start_read_old(
- call->wrapped_call, CreateTag(args[0], args.This()));
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- } else {
- return NanThrowError("startRead failed", error);
- }
- NanReturnUndefined();
-}
-
} // namespace node
} // namespace grpc
diff --git a/src/node/ext/call.h b/src/node/ext/call.h
index 1924a1bf42..dbdb8e2ff6 100644
--- a/src/node/ext/call.h
+++ b/src/node/ext/call.h
@@ -34,15 +34,71 @@
#ifndef NET_GRPC_NODE_CALL_H_
#define NET_GRPC_NODE_CALL_H_
+#include <memory>
+#include <vector>
+
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
#include "channel.h"
+
namespace grpc {
namespace node {
+using std::unique_ptr;
+using std::shared_ptr;
+
+v8::Handle<v8::Value> ParseMetadata(const grpc_metadata_array *metadata_array);
+
+class PersistentHolder {
+ public:
+ explicit PersistentHolder(v8::Persistent<v8::Value> persist) :
+ persist(persist) {
+ }
+
+ ~PersistentHolder() {
+ NanDisposePersistent(persist);
+ }
+
+ private:
+ v8::Persistent<v8::Value> persist;
+};
+
+struct Resources {
+ std::vector<unique_ptr<NanUtf8String> > strings;
+ std::vector<unique_ptr<PersistentHolder> > handles;
+};
+
+class Op {
+ public:
+ virtual v8::Handle<v8::Value> GetNodeValue() const = 0;
+ virtual bool ParseOp(v8::Handle<v8::Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) = 0;
+ v8::Handle<v8::Value> GetOpType() const;
+
+ protected:
+ virtual std::string GetTypeString() const = 0;
+};
+
+typedef std::vector<unique_ptr<Op>> OpVec;
+
+struct tag {
+ tag(NanCallback *callback, OpVec *ops,
+ shared_ptr<Resources> resources);
+ ~tag();
+ NanCallback *callback;
+ OpVec *ops;
+ shared_ptr<Resources> resources;
+};
+
+v8::Handle<v8::Value> GetTagNodeValue(void *tag);
+
+NanCallback *GetTagCallback(void *tag);
+
+void DestroyTag(void *tag);
+
/* Wrapper class for grpc_call structs. */
class Call : public ::node::ObjectWrap {
public:
@@ -60,15 +116,8 @@ class Call : public ::node::ObjectWrap {
Call &operator=(const Call &);
static NAN_METHOD(New);
- static NAN_METHOD(AddMetadata);
- static NAN_METHOD(Invoke);
- static NAN_METHOD(ServerAccept);
- static NAN_METHOD(ServerEndInitialMetadata);
+ static NAN_METHOD(StartBatch);
static NAN_METHOD(Cancel);
- static NAN_METHOD(StartWrite);
- static NAN_METHOD(StartWriteStatus);
- static NAN_METHOD(WritesDone);
- static NAN_METHOD(StartRead);
static v8::Persistent<v8::Function> constructor;
// Used for typechecking instances of this javascript class
static v8::Persistent<v8::FunctionTemplate> fun_tpl;
diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc
index 8de7db66d5..a1f390f64b 100644
--- a/src/node/ext/completion_queue_async_worker.cc
+++ b/src/node/ext/completion_queue_async_worker.cc
@@ -35,10 +35,10 @@
#include <nan.h>
#include "grpc/grpc.h"
+#include "grpc/support/log.h"
#include "grpc/support/time.h"
#include "completion_queue_async_worker.h"
-#include "event.h"
-#include "tag.h"
+#include "call.h"
namespace grpc {
namespace node {
@@ -58,6 +58,9 @@ CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {}
void CompletionQueueAsyncWorker::Execute() {
result = grpc_completion_queue_next(queue, gpr_inf_future);
+ if (result->data.op_complete != GRPC_OP_OK) {
+ SetErrorMessage("The batch encountered an error");
+ }
}
grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; }
@@ -75,14 +78,26 @@ void CompletionQueueAsyncWorker::Init(Handle<Object> exports) {
void CompletionQueueAsyncWorker::HandleOKCallback() {
NanScope();
- NanCallback event_callback(GetTagHandle(result->tag).As<Function>());
- Handle<Value> argv[] = {CreateEventObject(result)};
+ NanCallback *callback = GetTagCallback(result->tag);
+ Handle<Value> argv[] = {NanNull(), GetTagNodeValue(result->tag)};
+
+ callback->Call(2, argv);
DestroyTag(result->tag);
grpc_event_finish(result);
result = NULL;
+}
+
+void CompletionQueueAsyncWorker::HandleErrorCallback() {
+ NanScope();
+ NanCallback *callback = GetTagCallback(result->tag);
+ Handle<Value> argv[] = {NanError(ErrorMessage())};
- event_callback.Call(1, argv);
+ callback->Call(1, argv);
+
+ DestroyTag(result->tag);
+ grpc_event_finish(result);
+ result = NULL;
}
} // namespace node
diff --git a/src/node/ext/completion_queue_async_worker.h b/src/node/ext/completion_queue_async_worker.h
index 2c928b7024..c04a303283 100644
--- a/src/node/ext/completion_queue_async_worker.h
+++ b/src/node/ext/completion_queue_async_worker.h
@@ -67,6 +67,8 @@ class CompletionQueueAsyncWorker : public NanAsyncWorker {
completion_queue_next */
void HandleOKCallback();
+ void HandleErrorCallback();
+
private:
grpc_event *result;
diff --git a/src/node/ext/credentials.cc b/src/node/ext/credentials.cc
index c8859ed941..b79c3e3019 100644
--- a/src/node/ext/credentials.cc
+++ b/src/node/ext/credentials.cc
@@ -63,7 +63,6 @@ Credentials::Credentials(grpc_credentials *credentials)
: wrapped_credentials(credentials) {}
Credentials::~Credentials() {
- gpr_log(GPR_DEBUG, "Destroying credentials object");
grpc_credentials_release(wrapped_credentials);
}
diff --git a/src/node/ext/event.cc b/src/node/ext/event.cc
deleted file mode 100644
index d59b68fb40..0000000000
--- a/src/node/ext/event.cc
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <map>
-
-#include <node.h>
-#include <nan.h>
-#include "grpc/grpc.h"
-#include "byte_buffer.h"
-#include "call.h"
-#include "event.h"
-#include "tag.h"
-#include "timeval.h"
-
-namespace grpc {
-namespace node {
-
-using ::node::Buffer;
-using v8::Array;
-using v8::Date;
-using v8::Handle;
-using v8::HandleScope;
-using v8::Number;
-using v8::Object;
-using v8::Persistent;
-using v8::String;
-using v8::Value;
-
-Handle<Value> ParseMetadata(grpc_metadata *metadata_elements, size_t length) {
- NanEscapableScope();
- std::map<const char*, size_t> size_map;
- std::map<const char*, size_t> index_map;
-
- for (unsigned int i = 0; i < length; i++) {
- const char *key = metadata_elements[i].key;
- if (size_map.count(key)) {
- size_map[key] += 1;
- }
- index_map[key] = 0;
- }
- Handle<Object> metadata_object = NanNew<Object>();
- for (unsigned int i = 0; i < length; i++) {
- grpc_metadata* elem = &metadata_elements[i];
- Handle<String> key_string = String::New(elem->key);
- Handle<Array> array;
- if (metadata_object->Has(key_string)) {
- array = Handle<Array>::Cast(metadata_object->Get(key_string));
- } else {
- array = NanNew<Array>(size_map[elem->key]);
- metadata_object->Set(key_string, array);
- }
- array->Set(index_map[elem->key],
- MakeFastBuffer(
- NanNewBufferHandle(elem->value, elem->value_length)));
- index_map[elem->key] += 1;
- }
- return NanEscapeScope(metadata_object);
-}
-
-Handle<Value> GetEventData(grpc_event *event) {
- NanEscapableScope();
- size_t count;
- grpc_metadata *items;
- Handle<Array> metadata;
- Handle<Object> status;
- Handle<Object> rpc_new;
- switch (event->type) {
- case GRPC_READ:
- return NanEscapeScope(ByteBufferToBuffer(event->data.read));
- case GRPC_WRITE_ACCEPTED:
- return NanEscapeScope(NanNew<Number>(event->data.write_accepted));
- case GRPC_FINISH_ACCEPTED:
- return NanEscapeScope(NanNew<Number>(event->data.finish_accepted));
- case GRPC_CLIENT_METADATA_READ:
- count = event->data.client_metadata_read.count;
- items = event->data.client_metadata_read.elements;
- return NanEscapeScope(ParseMetadata(items, count));
- case GRPC_FINISHED:
- status = NanNew<Object>();
- status->Set(NanNew("code"), NanNew<Number>(event->data.finished.status));
- if (event->data.finished.details != NULL) {
- status->Set(NanNew("details"),
- String::New(event->data.finished.details));
- }
- count = event->data.finished.metadata_count;
- items = event->data.finished.metadata_elements;
- status->Set(NanNew("metadata"), ParseMetadata(items, count));
- return NanEscapeScope(status);
- case GRPC_SERVER_RPC_NEW:
- rpc_new = NanNew<Object>();
- if (event->data.server_rpc_new.method == NULL) {
- return NanEscapeScope(NanNull());
- }
- rpc_new->Set(
- NanNew("method"),
- NanNew(event->data.server_rpc_new.method));
- rpc_new->Set(
- NanNew("host"),
- NanNew(event->data.server_rpc_new.host));
- rpc_new->Set(NanNew("absolute_deadline"),
- NanNew<Date>(TimespecToMilliseconds(
- event->data.server_rpc_new.deadline)));
- count = event->data.server_rpc_new.metadata_count;
- items = event->data.server_rpc_new.metadata_elements;
- metadata = NanNew<Array>(static_cast<int>(count));
- for (unsigned int i = 0; i < count; i++) {
- Handle<Object> item_obj = Object::New();
- item_obj->Set(NanNew("key"),
- NanNew(items[i].key));
- item_obj->Set(
- NanNew("value"),
- NanNew(items[i].value, static_cast<int>(items[i].value_length)));
- metadata->Set(i, item_obj);
- }
- rpc_new->Set(NanNew("metadata"), ParseMetadata(items, count));
- return NanEscapeScope(rpc_new);
- default:
- return NanEscapeScope(NanNull());
- }
-}
-
-Handle<Value> CreateEventObject(grpc_event *event) {
- NanEscapableScope();
- if (event == NULL) {
- return NanEscapeScope(NanNull());
- }
- Handle<Object> event_obj = NanNew<Object>();
- Handle<Value> call;
- if (TagHasCall(event->tag)) {
- call = TagGetCall(event->tag);
- } else {
- call = Call::WrapStruct(event->call);
- }
- event_obj->Set(NanNew<String, const char *>("call"), call);
- event_obj->Set(NanNew<String, const char *>("type"),
- NanNew<Number>(event->type));
- event_obj->Set(NanNew<String, const char *>("data"), GetEventData(event));
-
- return NanEscapeScope(event_obj);
-}
-
-} // namespace node
-} // namespace grpc
diff --git a/src/node/ext/event.h b/src/node/ext/event.h
deleted file mode 100644
index e06d8f0168..0000000000
--- a/src/node/ext/event.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef NET_GRPC_NODE_EVENT_H_
-#define NET_GRPC_NODE_EVENT_H_
-
-#include <node.h>
-#include "grpc/grpc.h"
-
-namespace grpc {
-namespace node {
-
-v8::Handle<v8::Value> CreateEventObject(grpc_event *event);
-
-} // namespace node
-} // namespace grpc
-
-#endif // NET_GRPC_NODE_EVENT_H_
diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc
index bc1dfaf899..9b0fe82976 100644
--- a/src/node/ext/node_grpc.cc
+++ b/src/node/ext/node_grpc.cc
@@ -130,35 +130,34 @@ void InitCallErrorConstants(Handle<Object> exports) {
call_error->Set(NanNew("INVALID_FLAGS"), INVALID_FLAGS);
}
-void InitOpErrorConstants(Handle<Object> exports) {
+void InitOpTypeConstants(Handle<Object> exports) {
NanScope();
- Handle<Object> op_error = Object::New();
- exports->Set(NanNew("opError"), op_error);
- Handle<Value> OK(NanNew<Uint32, uint32_t>(GRPC_OP_OK));
- op_error->Set(NanNew("OK"), OK);
- Handle<Value> ERROR(NanNew<Uint32, uint32_t>(GRPC_OP_ERROR));
- op_error->Set(NanNew("ERROR"), ERROR);
-}
-
-void InitCompletionTypeConstants(Handle<Object> exports) {
- NanScope();
- Handle<Object> completion_type = Object::New();
- exports->Set(NanNew("completionType"), completion_type);
- Handle<Value> QUEUE_SHUTDOWN(NanNew<Uint32, uint32_t>(GRPC_QUEUE_SHUTDOWN));
- completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN);
- Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ));
- completion_type->Set(NanNew("READ"), READ);
- Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED));
- completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED);
- Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED));
- completion_type->Set(NanNew("FINISH_ACCEPTED"), FINISH_ACCEPTED);
- Handle<Value> CLIENT_METADATA_READ(
- NanNew<Uint32, uint32_t>(GRPC_CLIENT_METADATA_READ));
- completion_type->Set(NanNew("CLIENT_METADATA_READ"), CLIENT_METADATA_READ);
- Handle<Value> FINISHED(NanNew<Uint32, uint32_t>(GRPC_FINISHED));
- completion_type->Set(NanNew("FINISHED"), FINISHED);
- Handle<Value> SERVER_RPC_NEW(NanNew<Uint32, uint32_t>(GRPC_SERVER_RPC_NEW));
- completion_type->Set(NanNew("SERVER_RPC_NEW"), SERVER_RPC_NEW);
+ Handle<Object> op_type = Object::New();
+ exports->Set(NanNew("opType"), op_type);
+ Handle<Value> SEND_INITIAL_METADATA(
+ NanNew<Uint32, uint32_t>(GRPC_OP_SEND_INITIAL_METADATA));
+ op_type->Set(NanNew("SEND_INITIAL_METADATA"), SEND_INITIAL_METADATA);
+ Handle<Value> SEND_MESSAGE(
+ NanNew<Uint32, uint32_t>(GRPC_OP_SEND_MESSAGE));
+ op_type->Set(NanNew("SEND_MESSAGE"), SEND_MESSAGE);
+ Handle<Value> SEND_CLOSE_FROM_CLIENT(
+ NanNew<Uint32, uint32_t>(GRPC_OP_SEND_CLOSE_FROM_CLIENT));
+ op_type->Set(NanNew("SEND_CLOSE_FROM_CLIENT"), SEND_CLOSE_FROM_CLIENT);
+ Handle<Value> SEND_STATUS_FROM_SERVER(
+ NanNew<Uint32, uint32_t>(GRPC_OP_SEND_STATUS_FROM_SERVER));
+ op_type->Set(NanNew("SEND_STATUS_FROM_SERVER"), SEND_STATUS_FROM_SERVER);
+ Handle<Value> RECV_INITIAL_METADATA(
+ NanNew<Uint32, uint32_t>(GRPC_OP_RECV_INITIAL_METADATA));
+ op_type->Set(NanNew("RECV_INITIAL_METADATA"), RECV_INITIAL_METADATA);
+ Handle<Value> RECV_MESSAGE(
+ NanNew<Uint32, uint32_t>(GRPC_OP_RECV_MESSAGE));
+ op_type->Set(NanNew("RECV_MESSAGE"), RECV_MESSAGE);
+ Handle<Value> RECV_STATUS_ON_CLIENT(
+ NanNew<Uint32, uint32_t>(GRPC_OP_RECV_STATUS_ON_CLIENT));
+ op_type->Set(NanNew("RECV_STATUS_ON_CLIENT"), RECV_STATUS_ON_CLIENT);
+ Handle<Value> RECV_CLOSE_ON_SERVER(
+ NanNew<Uint32, uint32_t>(GRPC_OP_RECV_CLOSE_ON_SERVER));
+ op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER);
}
void init(Handle<Object> exports) {
@@ -166,8 +165,7 @@ void init(Handle<Object> exports) {
grpc_init();
InitStatusConstants(exports);
InitCallErrorConstants(exports);
- InitOpErrorConstants(exports);
- InitCompletionTypeConstants(exports);
+ InitOpTypeConstants(exports);
grpc::node::Call::Init(exports);
grpc::node::Channel::Init(exports);
diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc
index 6b8ccef9b1..ee3e1087ce 100644
--- a/src/node/ext/server.cc
+++ b/src/node/ext/server.cc
@@ -31,6 +31,8 @@
*
*/
+#include <memory>
+
#include "server.h"
#include <node.h>
@@ -41,17 +43,20 @@
#include <vector>
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
+#include "grpc/support/log.h"
#include "call.h"
#include "completion_queue_async_worker.h"
-#include "tag.h"
#include "server_credentials.h"
+#include "timeval.h"
namespace grpc {
namespace node {
+using std::unique_ptr;
using v8::Arguments;
using v8::Array;
using v8::Boolean;
+using v8::Date;
using v8::Exception;
using v8::Function;
using v8::FunctionTemplate;
@@ -67,6 +72,49 @@ using v8::Value;
Persistent<Function> Server::constructor;
Persistent<FunctionTemplate> Server::fun_tpl;
+class NewCallOp : public Op {
+ public:
+ NewCallOp() {
+ call = NULL;
+ grpc_call_details_init(&details);
+ grpc_metadata_array_init(&request_metadata);
+ }
+
+ ~NewCallOp() {
+ grpc_call_details_destroy(&details);
+ grpc_metadata_array_destroy(&request_metadata);
+ }
+
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ if (call == NULL) {
+ return NanEscapeScope(NanNull());
+ }
+ Handle<Object> obj = NanNew<Object>();
+ obj->Set(NanNew("call"), Call::WrapStruct(call));
+ obj->Set(NanNew("method"), NanNew(details.method));
+ obj->Set(NanNew("host"), NanNew(details.host));
+ obj->Set(NanNew("deadline"),
+ NanNew<Date>(TimespecToMilliseconds(details.deadline)));
+ obj->Set(NanNew("metadata"), ParseMetadata(&request_metadata));
+ return NanEscapeScope(obj);
+ }
+
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) {
+ return true;
+ }
+
+ grpc_call *call;
+ grpc_call_details details;
+ grpc_metadata_array request_metadata;
+
+ protected:
+ std::string GetTypeString() const {
+ return "new call";
+ }
+};
+
Server::Server(grpc_server *server) : wrapped_server(server) {}
Server::~Server() { grpc_server_destroy(wrapped_server); }
@@ -175,13 +223,18 @@ NAN_METHOD(Server::RequestCall) {
return NanThrowTypeError("requestCall can only be called on a Server");
}
Server *server = ObjectWrap::Unwrap<Server>(args.This());
- grpc_call_error error = grpc_server_request_call_old(
- server->wrapped_server, CreateTag(args[0], NanNull()));
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- } else {
+ NewCallOp *op = new NewCallOp();
+ unique_ptr<OpVec> ops(new OpVec());
+ ops->push_back(unique_ptr<Op>(op));
+ grpc_call_error error = grpc_server_request_call(
+ server->wrapped_server, &op->call, &op->details, &op->request_metadata,
+ CompletionQueueAsyncWorker::GetQueue(),
+ new struct tag(new NanCallback(args[0].As<Function>()), ops.release(),
+ shared_ptr<Resources>(nullptr)));
+ if (error != GRPC_CALL_OK) {
return NanThrowError("requestCall failed", error);
}
+ CompletionQueueAsyncWorker::Next();
NanReturnUndefined();
}
diff --git a/src/node/ext/server_credentials.cc b/src/node/ext/server_credentials.cc
index 393f3a6305..3add43c48c 100644
--- a/src/node/ext/server_credentials.cc
+++ b/src/node/ext/server_credentials.cc
@@ -63,7 +63,6 @@ ServerCredentials::ServerCredentials(grpc_server_credentials *credentials)
: wrapped_credentials(credentials) {}
ServerCredentials::~ServerCredentials() {
- gpr_log(GPR_DEBUG, "Destroying server credentials object");
grpc_server_credentials_release(wrapped_credentials);
}
diff --git a/src/node/ext/tag.cc b/src/node/ext/tag.cc
deleted file mode 100644
index dc8e523e12..0000000000
--- a/src/node/ext/tag.cc
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <stdlib.h>
-#include <node.h>
-#include <nan.h>
-#include "tag.h"
-
-namespace grpc {
-namespace node {
-
-using v8::Handle;
-using v8::HandleScope;
-using v8::Persistent;
-using v8::Value;
-
-struct tag {
- tag(Persistent<Value> *tag, Persistent<Value> *call)
- : persist_tag(tag), persist_call(call) {}
-
- ~tag() {
- persist_tag->Dispose();
- if (persist_call != NULL) {
- persist_call->Dispose();
- }
- }
- Persistent<Value> *persist_tag;
- Persistent<Value> *persist_call;
-};
-
-void *CreateTag(Handle<Value> tag, Handle<Value> call) {
- NanScope();
- Persistent<Value> *persist_tag = new Persistent<Value>();
- NanAssignPersistent(*persist_tag, tag);
- Persistent<Value> *persist_call;
- if (call->IsNull() || call->IsUndefined()) {
- persist_call = NULL;
- } else {
- persist_call = new Persistent<Value>();
- NanAssignPersistent(*persist_call, call);
- }
- struct tag *tag_struct = new struct tag(persist_tag, persist_call);
- return reinterpret_cast<void *>(tag_struct);
-}
-
-Handle<Value> GetTagHandle(void *tag) {
- NanEscapableScope();
- struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
- Handle<Value> tag_value = NanNew<Value>(*tag_struct->persist_tag);
- return NanEscapeScope(tag_value);
-}
-
-bool TagHasCall(void *tag) {
- struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
- return tag_struct->persist_call != NULL;
-}
-
-Handle<Value> TagGetCall(void *tag) {
- NanEscapableScope();
- struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
- if (tag_struct->persist_call == NULL) {
- return NanEscapeScope(NanNull());
- }
- Handle<Value> call_value = NanNew<Value>(*tag_struct->persist_call);
- return NanEscapeScope(call_value);
-}
-
-void DestroyTag(void *tag) { delete reinterpret_cast<struct tag *>(tag); }
-
-} // namespace node
-} // namespace grpc
diff --git a/src/node/ext/tag.h b/src/node/ext/tag.h
deleted file mode 100644
index bdb09252d9..0000000000
--- a/src/node/ext/tag.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef NET_GRPC_NODE_TAG_H_
-#define NET_GRPC_NODE_TAG_H_
-
-#include <node.h>
-
-namespace grpc {
-namespace node {
-
-/* Create a void* tag that can be passed to various grpc_call functions from
- a javascript value and the javascript wrapper for the call. The call can be
- null. */
-void *CreateTag(v8::Handle<v8::Value> tag, v8::Handle<v8::Value> call);
-/* Return the javascript value stored in the tag */
-v8::Handle<v8::Value> GetTagHandle(void *tag);
-/* Returns true if the call was set (non-null) when the tag was created */
-bool TagHasCall(void *tag);
-/* Returns the javascript wrapper for the call associated with this tag */
-v8::Handle<v8::Value> TagGetCall(void *call);
-/* Destroy the tag and all resources it is holding. It is illegal to call any
- of these other functions on a tag after it has been destroyed. */
-void DestroyTag(void *tag);
-
-} // namespace node
-} // namespace grpc
-
-#endif // NET_GRPC_NODE_TAG_H_
diff --git a/src/node/index.js b/src/node/index.js
index 0627e7f557..baef4d03c6 100644
--- a/src/node/index.js
+++ b/src/node/index.js
@@ -35,9 +35,9 @@ var _ = require('underscore');
var ProtoBuf = require('protobufjs');
-var surface_client = require('./src/surface_client.js');
+var client = require('./src/client.js');
-var surface_server = require('./src/surface_server.js');
+var server = require('./src/server.js');
var grpc = require('bindings')('grpc');
@@ -54,7 +54,7 @@ function loadObject(value) {
});
return result;
} else if (value.className === 'Service') {
- return surface_client.makeClientConstructor(value);
+ return client.makeClientConstructor(value);
} else if (value.className === 'Message' || value.className === 'Enum') {
return value.build();
} else {
@@ -84,9 +84,9 @@ exports.loadObject = loadObject;
exports.load = load;
/**
- * See docs for surface_server.makeServerConstructor
+ * See docs for server.makeServerConstructor
*/
-exports.buildServer = surface_server.makeServerConstructor;
+exports.buildServer = server.makeServerConstructor;
/**
* Status name to code number mapping
diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js
index ce18f77fe7..8737af6cde 100644
--- a/src/node/interop/interop_client.js
+++ b/src/node/interop/interop_client.js
@@ -145,8 +145,8 @@ function serverStreaming(client, done) {
resp_index += 1;
});
call.on('status', function(status) {
- assert.strictEqual(resp_index, 4);
assert.strictEqual(status.code, grpc.status.OK);
+ assert.strictEqual(resp_index, 4);
if (done) {
done();
}
diff --git a/src/node/interop/test.proto b/src/node/interop/test.proto
index cce0889bba..c2437630b7 100644
--- a/src/node/interop/test.proto
+++ b/src/node/interop/test.proto
@@ -44,7 +44,7 @@ service TestService {
rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty);
// One request followed by one response.
- // The server returns the client payload as-is.
+ // TODO(Issue 527): Describe required server behavior.
rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
// One request followed by a sequence of responses (streamed download).
diff --git a/src/node/package.json b/src/node/package.json
index 028dc20555..8f81014c1e 100644
--- a/src/node/package.json
+++ b/src/node/package.json
@@ -1,6 +1,6 @@
{
"name": "grpc",
- "version": "0.1.0",
+ "version": "0.2.0",
"description": "gRPC Library for Node",
"scripts": {
"test": "./node_modules/mocha/bin/mocha"
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 3a1c9eef84..81fa65eb26 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2014, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,185 +31,452 @@
*
*/
+var _ = require('underscore');
+
+var capitalize = require('underscore.string/capitalize');
+var decapitalize = require('underscore.string/decapitalize');
+
var grpc = require('bindings')('grpc.node');
-var common = require('./common');
+var common = require('./common.js');
+
+var EventEmitter = require('events').EventEmitter;
-var Duplex = require('stream').Duplex;
+var stream = require('stream');
+
+var Readable = stream.Readable;
+var Writable = stream.Writable;
+var Duplex = stream.Duplex;
var util = require('util');
-util.inherits(GrpcClientStream, Duplex);
+util.inherits(ClientWritableStream, Writable);
/**
- * Class for representing a gRPC client side stream as a Node stream. Extends
- * from stream.Duplex.
+ * A stream that the client can write to. Used for calls that are streaming from
+ * the client side.
* @constructor
- * @param {grpc.Call} call Call object to proxy
- * @param {function(*):Buffer=} serialize Serialization function for requests
- * @param {function(Buffer):*=} deserialize Deserialization function for
- * responses
+ * @param {grpc.Call} call The call object to send data with
+ * @param {function(*):Buffer=} serialize Serialization function for writes.
*/
-function GrpcClientStream(call, serialize, deserialize) {
- Duplex.call(this, {objectMode: true});
- if (!serialize) {
- serialize = function(value) {
- return value;
- };
- }
- if (!deserialize) {
- deserialize = function(value) {
- return value;
- };
- }
- var self = this;
- var finished = false;
- // Indicates that a read is currently pending
- var reading = false;
- // Indicates that a write is currently pending
- var writing = false;
- this._call = call;
+function ClientWritableStream(call, serialize) {
+ Writable.call(this, {objectMode: true});
+ this.call = call;
+ this.serialize = common.wrapIgnoreNull(serialize);
+ this.on('finish', function() {
+ var batch = {};
+ batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
+ call.startBatch(batch, function() {});
+ });
+}
- /**
- * Serialize a request value to a buffer. Always maps null to null. Otherwise
- * uses the provided serialize function
- * @param {*} value The value to serialize
- * @return {Buffer} The serialized value
- */
- this.serialize = function(value) {
- if (value === null || value === undefined) {
- return null;
+/**
+ * Attempt to write the given chunk. Calls the callback when done. This is an
+ * implementation of a method needed for implementing stream.Writable.
+ * @param {Buffer} chunk The chunk to write
+ * @param {string} encoding Ignored
+ * @param {function(Error=)} callback Called when the write is complete
+ */
+function _write(chunk, encoding, callback) {
+ var batch = {};
+ batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
+ this.call.startBatch(batch, function(err, event) {
+ if (err) {
+ throw err;
}
- return serialize(value);
- };
+ callback();
+ });
+};
- /**
- * Deserialize a response buffer to a value. Always maps null to null.
- * Otherwise uses the provided deserialize function.
- * @param {Buffer} buffer The buffer to deserialize
- * @return {*} The deserialized value
- */
- this.deserialize = function(buffer) {
- if (buffer === null) {
- return null;
- }
- return deserialize(buffer);
- };
+ClientWritableStream.prototype._write = _write;
+
+util.inherits(ClientReadableStream, Readable);
+
+/**
+ * A stream that the client can read from. Used for calls that are streaming
+ * from the server side.
+ * @constructor
+ * @param {grpc.Call} call The call object to read data with
+ * @param {function(Buffer):*=} deserialize Deserialization function for reads
+ */
+function ClientReadableStream(call, deserialize) {
+ Readable.call(this, {objectMode: true});
+ this.call = call;
+ this.finished = false;
+ this.reading = false;
+ this.deserialize = common.wrapIgnoreNull(deserialize);
+}
+
+/**
+ * Read the next object from the stream.
+ * @param {*} size Ignored because we use objectMode=true
+ */
+function _read(size) {
+ var self = this;
/**
* Callback to be called when a READ event is received. Pushes the data onto
* the read queue and starts reading again if applicable
* @param {grpc.Event} event READ event object
*/
- function readCallback(event) {
- if (finished) {
+ function readCallback(err, event) {
+ if (err) {
+ throw err;
+ }
+ if (self.finished) {
self.push(null);
return;
}
- var data = event.data;
+ var data = event.read;
if (self.push(self.deserialize(data)) && data != null) {
- self._call.startRead(readCallback);
+ var read_batch = {};
+ read_batch[grpc.opType.RECV_MESSAGE] = true;
+ self.call.startBatch(read_batch, readCallback);
} else {
- reading = false;
+ self.reading = false;
+ }
+ }
+ if (self.finished) {
+ self.push(null);
+ } else {
+ if (!self.reading) {
+ self.reading = true;
+ var read_batch = {};
+ read_batch[grpc.opType.RECV_MESSAGE] = true;
+ self.call.startBatch(read_batch, readCallback);
}
}
- call.invoke(function(event) {
- self.emit('metadata', event.data);
- }, function(event) {
- finished = true;
- self.emit('status', event.data);
- }, 0);
+};
+
+ClientReadableStream.prototype._read = _read;
+
+util.inherits(ClientDuplexStream, Duplex);
+
+/**
+ * A stream that the client can read from or write to. Used for calls with
+ * duplex streaming.
+ * @constructor
+ * @param {grpc.Call} call Call object to proxy
+ * @param {function(*):Buffer=} serialize Serialization function for requests
+ * @param {function(Buffer):*=} deserialize Deserialization function for
+ * responses
+ */
+function ClientDuplexStream(call, serialize, deserialize) {
+ Duplex.call(this, {objectMode: true});
+ this.serialize = common.wrapIgnoreNull(serialize);
+ this.deserialize = common.wrapIgnoreNull(deserialize);
+ var self = this;
+ var finished = false;
+ // Indicates that a read is currently pending
+ var reading = false;
+ this.call = call;
this.on('finish', function() {
- call.writesDone(function() {});
+ var batch = {};
+ batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
+ call.startBatch(batch, function() {});
});
- /**
- * Start reading if there is not already a pending read. Reading will
- * continue until self.push returns false (indicating reads should slow
- * down) or the read data is null (indicating that there is no more data).
- */
- this.startReading = function() {
- if (finished) {
- self.push(null);
- } else {
- if (!reading) {
- reading = true;
- self._call.startRead(readCallback);
- }
- }
- };
}
+ClientDuplexStream.prototype._read = _read;
+ClientDuplexStream.prototype._write = _write;
+
/**
- * Start reading. This is an implementation of a method needed for implementing
- * stream.Readable.
- * @param {number} size Ignored
+ * Cancel the ongoing call
*/
-GrpcClientStream.prototype._read = function(size) {
- this.startReading();
-};
+function cancel() {
+ this.call.cancel();
+}
+
+ClientReadableStream.prototype.cancel = cancel;
+ClientWritableStream.prototype.cancel = cancel;
+ClientDuplexStream.prototype.cancel = cancel;
/**
- * Attempt to write the given chunk. Calls the callback when done. This is an
- * implementation of a method needed for implementing stream.Writable.
- * @param {Buffer} chunk The chunk to write
- * @param {string} encoding Ignored
- * @param {function(Error=)} callback Ignored
+ * Get a function that can make unary requests to the specified method.
+ * @param {string} method The name of the method to request
+ * @param {function(*):Buffer} serialize The serialization function for inputs
+ * @param {function(Buffer)} deserialize The deserialization function for
+ * outputs
+ * @return {Function} makeUnaryRequest
*/
-GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
- var self = this;
- self._call.startWrite(self.serialize(chunk), function(event) {
- callback();
- }, 0);
-};
+function makeUnaryRequestFunction(method, serialize, deserialize) {
+ /**
+ * Make a unary request with this method on the given channel with the given
+ * argument, callback, etc.
+ * @this {Client} Client object. Must have a channel member.
+ * @param {*} argument The argument to the call. Should be serializable with
+ * serialize
+ * @param {function(?Error, value=)} callback The callback to for when the
+ * response is received
+ * @param {array=} metadata Array of metadata key/value pairs to add to the
+ * call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+ function makeUnaryRequest(argument, callback, metadata, deadline) {
+ if (deadline === undefined) {
+ deadline = Infinity;
+ }
+ var emitter = new EventEmitter();
+ var call = new grpc.Call(this.channel, method, deadline);
+ if (metadata === null || metadata === undefined) {
+ metadata = {};
+ }
+ emitter.cancel = function cancel() {
+ call.cancel();
+ };
+ var client_batch = {};
+ client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
+ client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
+ client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ client_batch[grpc.opType.RECV_MESSAGE] = true;
+ client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(client_batch, function(err, response) {
+ if (err) {
+ callback(err);
+ return;
+ }
+ if (response.status.code != grpc.status.OK) {
+ callback(response.status);
+ return;
+ }
+ emitter.emit('status', response.status);
+ emitter.emit('metadata', response.metadata);
+ callback(null, deserialize(response.read));
+ });
+ return emitter;
+ }
+ return makeUnaryRequest;
+}
/**
- * Cancel the ongoing call. If the call has not already finished, it will finish
- * with status CANCELLED.
+ * Get a function that can make client stream requests to the specified method.
+ * @param {string} method The name of the method to request
+ * @param {function(*):Buffer} serialize The serialization function for inputs
+ * @param {function(Buffer)} deserialize The deserialization function for
+ * outputs
+ * @return {Function} makeClientStreamRequest
*/
-GrpcClientStream.prototype.cancel = function() {
- this._call.cancel();
-};
+function makeClientStreamRequestFunction(method, serialize, deserialize) {
+ /**
+ * Make a client stream request with this method on the given channel with the
+ * given callback, etc.
+ * @this {Client} Client object. Must have a channel member.
+ * @param {function(?Error, value=)} callback The callback to for when the
+ * response is received
+ * @param {array=} metadata Array of metadata key/value pairs to add to the
+ * call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+ function makeClientStreamRequest(callback, metadata, deadline) {
+ if (deadline === undefined) {
+ deadline = Infinity;
+ }
+ var call = new grpc.Call(this.channel, method, deadline);
+ if (metadata === null || metadata === undefined) {
+ metadata = {};
+ }
+ var stream = new ClientWritableStream(call, serialize);
+ var metadata_batch = {};
+ metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ call.startBatch(metadata_batch, function(err, response) {
+ if (err) {
+ callback(err);
+ return;
+ }
+ stream.emit('metadata', response.metadata);
+ });
+ var client_batch = {};
+ client_batch[grpc.opType.RECV_MESSAGE] = true;
+ client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(client_batch, function(err, response) {
+ if (err) {
+ callback(err);
+ return;
+ }
+ if (response.status.code != grpc.status.OK) {
+ callback(response.status);
+ return;
+ }
+ stream.emit('status', response.status);
+ callback(null, deserialize(response.read));
+ });
+ return stream;
+ }
+ return makeClientStreamRequest;
+}
/**
- * Make a request on the channel to the given method with the given arguments
- * @param {grpc.Channel} channel The channel on which to make the request
- * @param {string} method The method to request
- * @param {function(*):Buffer} serialize Serialization function for requests
- * @param {function(Buffer):*} deserialize Deserialization function for
- * responses
- * @param {array=} metadata Array of metadata key/value pairs to add to the call
- * @param {(number|Date)=} deadline The deadline for processing this request.
- * Defaults to infinite future.
- * @return {stream=} The stream of responses
+ * Get a function that can make server stream requests to the specified method.
+ * @param {string} method The name of the method to request
+ * @param {function(*):Buffer} serialize The serialization function for inputs
+ * @param {function(Buffer)} deserialize The deserialization function for
+ * outputs
+ * @return {Function} makeServerStreamRequest
*/
-function makeRequest(channel,
- method,
- serialize,
- deserialize,
- metadata,
- deadline) {
- if (deadline === undefined) {
- deadline = Infinity;
+function makeServerStreamRequestFunction(method, serialize, deserialize) {
+ /**
+ * Make a server stream request with this method on the given channel with the
+ * given argument, etc.
+ * @this {SurfaceClient} Client object. Must have a channel member.
+ * @param {*} argument The argument to the call. Should be serializable with
+ * serialize
+ * @param {array=} metadata Array of metadata key/value pairs to add to the
+ * call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+ function makeServerStreamRequest(argument, metadata, deadline) {
+ if (deadline === undefined) {
+ deadline = Infinity;
+ }
+ var call = new grpc.Call(this.channel, method, deadline);
+ if (metadata === null || metadata === undefined) {
+ metadata = {};
+ }
+ var stream = new ClientReadableStream(call, deserialize);
+ var start_batch = {};
+ start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
+ start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
+ call.startBatch(start_batch, function(err, response) {
+ if (err) {
+ throw err;
+ }
+ stream.emit('metadata', response.metadata);
+ });
+ var status_batch = {};
+ status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(status_batch, function(err, response) {
+ if (err) {
+ throw err;
+ }
+ stream.emit('status', response.status);
+ });
+ return stream;
}
- var call = new grpc.Call(channel, method, deadline);
- if (metadata) {
- call.addMetadata(metadata);
+ return makeServerStreamRequest;
+}
+
+/**
+ * Get a function that can make bidirectional stream requests to the specified
+ * method.
+ * @param {string} method The name of the method to request
+ * @param {function(*):Buffer} serialize The serialization function for inputs
+ * @param {function(Buffer)} deserialize The deserialization function for
+ * outputs
+ * @return {Function} makeBidiStreamRequest
+ */
+function makeBidiStreamRequestFunction(method, serialize, deserialize) {
+ /**
+ * Make a bidirectional stream request with this method on the given channel.
+ * @this {SurfaceClient} Client object. Must have a channel member.
+ * @param {array=} metadata Array of metadata key/value pairs to add to the
+ * call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+ function makeBidiStreamRequest(metadata, deadline) {
+ if (deadline === undefined) {
+ deadline = Infinity;
+ }
+ var call = new grpc.Call(this.channel, method, deadline);
+ if (metadata === null || metadata === undefined) {
+ metadata = {};
+ }
+ var stream = new ClientDuplexStream(call, serialize, deserialize);
+ var start_batch = {};
+ start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ call.startBatch(start_batch, function(err, response) {
+ if (err) {
+ throw err;
+ }
+ stream.emit('metadata', response.metadata);
+ });
+ var status_batch = {};
+ status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(status_batch, function(err, response) {
+ if (err) {
+ throw err;
+ }
+ stream.emit('status', response.status);
+ });
+ return stream;
}
- return new GrpcClientStream(call, serialize, deserialize);
+ return makeBidiStreamRequest;
}
+
/**
- * See documentation for makeRequest above
+ * Map with short names for each of the requester maker functions. Used in
+ * makeClientConstructor
*/
-exports.makeRequest = makeRequest;
+var requester_makers = {
+ unary: makeUnaryRequestFunction,
+ server_stream: makeServerStreamRequestFunction,
+ client_stream: makeClientStreamRequestFunction,
+ bidi: makeBidiStreamRequestFunction
+};
/**
- * Represents a client side gRPC channel associated with a single host.
+ * Creates a constructor for clients for the given service
+ * @param {ProtoBuf.Reflect.Service} service The service to generate a client
+ * for
+ * @return {function(string, Object)} New client constructor
*/
-exports.Channel = grpc.Channel;
+function makeClientConstructor(service) {
+ var prefix = '/' + common.fullyQualifiedName(service) + '/';
+ /**
+ * Create a client with the given methods
+ * @constructor
+ * @param {string} address The address of the server to connect to
+ * @param {Object} options Options to pass to the underlying channel
+ */
+ function Client(address, options) {
+ this.channel = new grpc.Channel(address, options);
+ }
+
+ _.each(service.children, function(method) {
+ var method_type;
+ if (method.requestStream) {
+ if (method.responseStream) {
+ method_type = 'bidi';
+ } else {
+ method_type = 'client_stream';
+ }
+ } else {
+ if (method.responseStream) {
+ method_type = 'server_stream';
+ } else {
+ method_type = 'unary';
+ }
+ }
+ Client.prototype[decapitalize(method.name)] =
+ requester_makers[method_type](
+ prefix + capitalize(method.name),
+ common.serializeCls(method.resolvedRequestType.build()),
+ common.deserializeCls(method.resolvedResponseType.build()));
+ });
+
+ Client.service = service;
+
+ return Client;
+}
+
+exports.makeClientConstructor = makeClientConstructor;
+
/**
- * Status name to code number mapping
+ * See docs for client.status
*/
exports.status = grpc.status;
/**
- * Call error name to code number mapping
+ * See docs for client.callError
*/
exports.callError = grpc.callError;
diff --git a/src/node/src/common.js b/src/node/src/common.js
index 54247e3fa1..7560cf1bdd 100644
--- a/src/node/src/common.js
+++ b/src/node/src/common.js
@@ -31,6 +31,8 @@
*
*/
+var _ = require('underscore');
+
var capitalize = require('underscore.string/capitalize');
/**
@@ -88,6 +90,24 @@ function fullyQualifiedName(value) {
}
/**
+ * Wrap a function to pass null-like values through without calling it. If no
+ * function is given, just uses the identity;
+ * @param {?function} func The function to wrap
+ * @return {function} The wrapped function
+ */
+function wrapIgnoreNull(func) {
+ if (!func) {
+ return _.identity;
+ }
+ return function(arg) {
+ if (arg === null || arg === undefined) {
+ return null;
+ }
+ return func(arg);
+ };
+}
+
+/**
* See docs for deserializeCls
*/
exports.deserializeCls = deserializeCls;
@@ -101,3 +121,8 @@ exports.serializeCls = serializeCls;
* See docs for fullyQualifiedName
*/
exports.fullyQualifiedName = fullyQualifiedName;
+
+/**
+ * See docs for wrapIgnoreNull
+ */
+exports.wrapIgnoreNull = wrapIgnoreNull;
diff --git a/src/node/src/server.js b/src/node/src/server.js
index e4f71ff05f..48c349ef99 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2014, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -33,80 +33,108 @@
var _ = require('underscore');
+var capitalize = require('underscore.string/capitalize');
+var decapitalize = require('underscore.string/decapitalize');
+
var grpc = require('bindings')('grpc.node');
var common = require('./common');
-var Duplex = require('stream').Duplex;
+var stream = require('stream');
+
+var Readable = stream.Readable;
+var Writable = stream.Writable;
+var Duplex = stream.Duplex;
var util = require('util');
-util.inherits(GrpcServerStream, Duplex);
+var EventEmitter = require('events').EventEmitter;
+
+var common = require('./common.js');
/**
- * Class for representing a gRPC server side stream as a Node stream. Extends
- * from stream.Duplex.
- * @constructor
- * @param {grpc.Call} call Call object to proxy
- * @param {function(*):Buffer=} serialize Serialization function for responses
- * @param {function(Buffer):*=} deserialize Deserialization function for
- * requests
+ * Handle an error on a call by sending it as a status
+ * @param {grpc.Call} call The call to send the error on
+ * @param {Object} error The error object
*/
-function GrpcServerStream(call, serialize, deserialize) {
- Duplex.call(this, {objectMode: true});
- if (!serialize) {
- serialize = function(value) {
- return value;
- };
- }
- if (!deserialize) {
- deserialize = function(value) {
- return value;
- };
- }
- this._call = call;
- // Indicate that a status has been sent
- var finished = false;
- var self = this;
+function handleError(call, error) {
var status = {
- 'code' : grpc.status.OK,
- 'details' : 'OK'
+ code: grpc.status.INTERNAL,
+ details: 'Unknown Error',
+ metadata: {}
};
-
- /**
- * Serialize a response value to a buffer. Always maps null to null. Otherwise
- * uses the provided serialize function
- * @param {*} value The value to serialize
- * @return {Buffer} The serialized value
- */
- this.serialize = function(value) {
- if (value === null || value === undefined) {
- return null;
+ if (error.hasOwnProperty('message')) {
+ status.details = error.message;
+ }
+ if (error.hasOwnProperty('code')) {
+ status.code = error.code;
+ if (error.hasOwnProperty('details')) {
+ status.details = error.details;
}
- return serialize(value);
- };
+ }
+ var error_batch = {};
+ error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
+ call.startBatch(error_batch, function(){});
+}
- /**
- * Deserialize a request buffer to a value. Always maps null to null.
- * Otherwise uses the provided deserialize function.
- * @param {Buffer} buffer The buffer to deserialize
- * @return {*} The deserialized value
- */
- this.deserialize = function(buffer) {
- if (buffer === null) {
- return null;
+/**
+ * Wait for the client to close, then emit a cancelled event if the client
+ * cancelled.
+ * @param {grpc.Call} call The call object to wait on
+ * @param {EventEmitter} emitter The event emitter to emit the cancelled event
+ * on
+ */
+function waitForCancel(call, emitter) {
+ var cancel_batch = {};
+ cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
+ call.startBatch(cancel_batch, function(err, result) {
+ if (err) {
+ emitter.emit('error', err);
}
- return deserialize(buffer);
+ if (result.cancelled) {
+ emitter.cancelled = true;
+ emitter.emit('cancelled');
+ }
+ });
+}
+
+/**
+ * Send a response to a unary or client streaming call.
+ * @param {grpc.Call} call The call to respond on
+ * @param {*} value The value to respond with
+ * @param {function(*):Buffer=} serialize Serialization function for the
+ * response
+ */
+function sendUnaryResponse(call, value, serialize) {
+ var end_batch = {};
+ end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
+ end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ code: grpc.status.OK,
+ details: 'OK',
+ metadata: {}
};
+ call.startBatch(end_batch, function (){});
+}
- /**
- * Send the pending status
- */
+/**
+ * Initialize a writable stream. This is used for both the writable and duplex
+ * stream constructors.
+ * @param {Writable} stream The stream to set up
+ * @param {function(*):Buffer=} Serialization function for responses
+ */
+function setUpWritable(stream, serialize) {
+ stream.finished = false;
+ stream.status = {
+ code : grpc.status.OK,
+ details : 'OK',
+ metadata : {}
+ };
+ stream.serialize = common.wrapIgnoreNull(serialize);
function sendStatus() {
- call.startWriteStatus(status.code, status.details, function() {
- });
- finished = true;
+ var batch = {};
+ batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
+ stream.call.startBatch(batch, function(){});
}
- this.on('finish', sendStatus);
+ stream.on('finish', sendStatus);
/**
* Set the pending status to a given error status. If the error does not have
* code or details properties, the code will be set to grpc.status.INTERNAL
@@ -116,14 +144,16 @@ function GrpcServerStream(call, serialize, deserialize) {
function setStatus(err) {
var code = grpc.status.INTERNAL;
var details = 'Unknown Error';
-
+ if (err.hasOwnProperty('message')) {
+ details = err.message;
+ }
if (err.hasOwnProperty('code')) {
code = err.code;
if (err.hasOwnProperty('details')) {
details = err.details;
}
}
- status = {'code': code, 'details': details};
+ stream.status = {code: code, details: details, metadata: {}};
}
/**
* Terminate the call. This includes indicating that reads are done, draining
@@ -133,69 +163,250 @@ function GrpcServerStream(call, serialize, deserialize) {
*/
function terminateCall(err) {
// Drain readable data
- this.on('data', function() {});
setStatus(err);
- this.end();
+ stream.end();
}
- this.on('error', terminateCall);
- // Indicates that a read is pending
- var reading = false;
+ stream.on('error', terminateCall);
+}
+
+/**
+ * Initialize a readable stream. This is used for both the readable and duplex
+ * stream constructors.
+ * @param {Readable} stream The stream to initialize
+ * @param {function(Buffer):*=} deserialize Deserialization function for
+ * incoming data.
+ */
+function setUpReadable(stream, deserialize) {
+ stream.deserialize = common.wrapIgnoreNull(deserialize);
+ stream.finished = false;
+ stream.reading = false;
+
+ stream.terminate = function() {
+ stream.finished = true;
+ stream.on('data', function() {});
+ };
+
+ stream.on('cancelled', function() {
+ stream.terminate();
+ });
+}
+
+util.inherits(ServerWritableStream, Writable);
+
+/**
+ * A stream that the server can write to. Used for calls that are streaming from
+ * the server side.
+ * @constructor
+ * @param {grpc.Call} call The call object to send data with
+ * @param {function(*):Buffer=} serialize Serialization function for writes
+ */
+function ServerWritableStream(call, serialize) {
+ Writable.call(this, {objectMode: true});
+ this.call = call;
+
+ this.finished = false;
+ setUpWritable(this, serialize);
+}
+
+/**
+ * Start writing a chunk of data. This is an implementation of a method required
+ * for implementing stream.Writable.
+ * @param {Buffer} chunk The chunk of data to write
+ * @param {string} encoding Ignored
+ * @param {function(Error=)} callback Callback to indicate that the write is
+ * complete
+ */
+function _write(chunk, encoding, callback) {
+ var batch = {};
+ batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
+ this.call.startBatch(batch, function(err, value) {
+ if (err) {
+ this.emit('error', err);
+ return;
+ }
+ callback();
+ });
+}
+
+ServerWritableStream.prototype._write = _write;
+
+util.inherits(ServerReadableStream, Readable);
+
+/**
+ * A stream that the server can read from. Used for calls that are streaming
+ * from the client side.
+ * @constructor
+ * @param {grpc.Call} call The call object to read data with
+ * @param {function(Buffer):*=} deserialize Deserialization function for reads
+ */
+function ServerReadableStream(call, deserialize) {
+ Readable.call(this, {objectMode: true});
+ this.call = call;
+ setUpReadable(this, deserialize);
+}
+
+/**
+ * Start reading from the gRPC data source. This is an implementation of a
+ * method required for implementing stream.Readable
+ * @param {number} size Ignored
+ */
+function _read(size) {
+ var self = this;
/**
* Callback to be called when a READ event is received. Pushes the data onto
* the read queue and starts reading again if applicable
* @param {grpc.Event} event READ event object
*/
- function readCallback(event) {
- if (finished) {
+ function readCallback(err, event) {
+ if (err) {
+ self.terminate();
+ return;
+ }
+ if (self.finished) {
self.push(null);
return;
}
- var data = event.data;
+ var data = event.read;
if (self.push(self.deserialize(data)) && data != null) {
- self._call.startRead(readCallback);
+ var read_batch = {};
+ read_batch[grpc.opType.RECV_MESSAGE] = true;
+ self.call.startBatch(read_batch, readCallback);
} else {
- reading = false;
+ self.reading = false;
}
}
- /**
- * Start reading if there is not already a pending read. Reading will
- * continue until self.push returns false (indicating reads should slow
- * down) or the read data is null (indicating that there is no more data).
- */
- this.startReading = function() {
- if (finished) {
- self.push(null);
- } else {
- if (!reading) {
- reading = true;
- self._call.startRead(readCallback);
+ if (self.finished) {
+ self.push(null);
+ } else {
+ if (!self.reading) {
+ self.reading = true;
+ var batch = {};
+ batch[grpc.opType.RECV_MESSAGE] = true;
+ self.call.startBatch(batch, readCallback);
+ }
+ }
+}
+
+ServerReadableStream.prototype._read = _read;
+
+util.inherits(ServerDuplexStream, Duplex);
+
+/**
+ * A stream that the server can read from or write to. Used for calls with
+ * duplex streaming.
+ * @constructor
+ * @param {grpc.Call} call Call object to proxy
+ * @param {function(*):Buffer=} serialize Serialization function for requests
+ * @param {function(Buffer):*=} deserialize Deserialization function for
+ * responses
+ */
+function ServerDuplexStream(call, serialize, deserialize) {
+ Duplex.call(this, {objectMode: true});
+ this.call = call;
+ setUpWritable(this, serialize);
+ setUpReadable(this, deserialize);
+}
+
+ServerDuplexStream.prototype._read = _read;
+ServerDuplexStream.prototype._write = _write;
+
+/**
+ * Fully handle a unary call
+ * @param {grpc.Call} call The call to handle
+ * @param {Object} handler Request handler object for the method that was called
+ * @param {Object} metadata Metadata from the client
+ */
+function handleUnary(call, handler, metadata) {
+ var emitter = new EventEmitter();
+ emitter.on('error', function(error) {
+ handleError(call, error);
+ });
+ waitForCancel(call, emitter);
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ batch[grpc.opType.RECV_MESSAGE] = true;
+ call.startBatch(batch, function(err, result) {
+ if (err) {
+ handleError(call, err);
+ return;
+ }
+ emitter.request = handler.deserialize(result.read);
+ if (emitter.cancelled) {
+ return;
+ }
+ handler.func(emitter, function sendUnaryData(err, value) {
+ if (err) {
+ handleError(call, err);
}
+ sendUnaryResponse(call, value, handler.serialize);
+ });
+ });
+}
+
+/**
+ * Fully handle a server streaming call
+ * @param {grpc.Call} call The call to handle
+ * @param {Object} handler Request handler object for the method that was called
+ * @param {Object} metadata Metadata from the client
+ */
+function handleServerStreaming(call, handler, metadata) {
+ var stream = new ServerWritableStream(call, handler.serialize);
+ waitForCancel(call, stream);
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ batch[grpc.opType.RECV_MESSAGE] = true;
+ call.startBatch(batch, function(err, result) {
+ if (err) {
+ stream.emit('error', err);
+ return;
}
- };
+ stream.request = handler.deserialize(result.read);
+ handler.func(stream);
+ });
}
/**
- * Start reading from the gRPC data source. This is an implementation of a
- * method required for implementing stream.Readable
- * @param {number} size Ignored
+ * Fully handle a client streaming call
+ * @param {grpc.Call} call The call to handle
+ * @param {Object} handler Request handler object for the method that was called
+ * @param {Object} metadata Metadata from the client
*/
-GrpcServerStream.prototype._read = function(size) {
- this.startReading();
-};
+function handleClientStreaming(call, handler, metadata) {
+ var stream = new ServerReadableStream(call, handler.deserialize);
+ waitForCancel(call, stream);
+ var metadata_batch = {};
+ metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ call.startBatch(metadata_batch, function() {});
+ handler.func(stream, function(err, value) {
+ stream.terminate();
+ if (err) {
+ handleError(call, err);
+ }
+ sendUnaryResponse(call, value, handler.serialize);
+ });
+}
/**
- * Start writing a chunk of data. This is an implementation of a method required
- * for implementing stream.Writable.
- * @param {Buffer} chunk The chunk of data to write
- * @param {string} encoding Ignored
- * @param {function(Error=)} callback Callback to indicate that the write is
- * complete
+ * Fully handle a bidirectional streaming call
+ * @param {grpc.Call} call The call to handle
+ * @param {Object} handler Request handler object for the method that was called
+ * @param {Object} metadata Metadata from the client
*/
-GrpcServerStream.prototype._write = function(chunk, encoding, callback) {
- var self = this;
- self._call.startWrite(self.serialize(chunk), function(event) {
- callback();
- }, 0);
+function handleBidiStreaming(call, handler, metadata) {
+ var stream = new ServerDuplexStream(call, handler.serialize,
+ handler.deserialize);
+ waitForCancel(call, stream);
+ var metadata_batch = {};
+ metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ call.startBatch(metadata_batch, function() {});
+ handler.func(stream);
+}
+
+var streamHandlers = {
+ unary: handleUnary,
+ server_stream: handleServerStreaming,
+ client_stream: handleClientStreaming,
+ bidi: handleBidiStreaming
};
/**
@@ -218,7 +429,7 @@ function Server(getMetadata, options) {
* Start the server and begin handling requests
* @this Server
*/
- this.start = function() {
+ this.listen = function() {
console.log('Server starting');
_.each(handlers, function(handler, handler_name) {
console.log('Serving', handler_name);
@@ -233,48 +444,39 @@ function Server(getMetadata, options) {
* wait for the next request
* @param {grpc.Event} event The event to handle with tag SERVER_RPC_NEW
*/
- function handleNewCall(event) {
- var call = event.call;
- var data = event.data;
- if (data === null) {
+ function handleNewCall(err, event) {
+ if (err) {
+ return;
+ }
+ var details = event['new call'];
+ var call = details.call;
+ var method = details.method;
+ var metadata = details.metadata;
+ if (method === null) {
return;
}
server.requestCall(handleNewCall);
var handler = undefined;
- var deadline = data.absolute_deadline;
- var cancelled = false;
- call.serverAccept(function(event) {
- if (event.data.code === grpc.status.CANCELLED) {
- cancelled = true;
- if (stream) {
- stream.emit('cancelled');
- }
- }
- }, 0);
- if (handlers.hasOwnProperty(data.method)) {
- handler = handlers[data.method];
+ var deadline = details.deadline;
+ if (handlers.hasOwnProperty(method)) {
+ handler = handlers[method];
} else {
- call.serverEndInitialMetadata(0);
- call.startWriteStatus(
- grpc.status.UNIMPLEMENTED,
- "This method is not available on this server.",
- function() {});
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ code: grpc.status.UNIMPLEMENTED,
+ details: "This method is not available on this server.",
+ metadata: {}
+ };
+ batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
+ call.startBatch(batch, function() {});
return;
}
+ var response_metadata = {};
if (getMetadata) {
- call.addMetadata(getMetadata(data.method, data.metadata));
- }
- call.serverEndInitialMetadata(0);
- var stream = new GrpcServerStream(call, handler.serialize,
- handler.deserialize);
- Object.defineProperty(stream, 'cancelled', {
- get: function() { return cancelled;}
- });
- try {
- handler.func(stream, data.metadata);
- } catch (e) {
- stream.emit('error', e);
+ response_metadata = getMetadata(method, metadata);
}
+ streamHandlers[handler.type](call, handler, response_metadata);
}
server.requestCall(handleNewCall);
};
@@ -294,17 +496,20 @@ function Server(getMetadata, options) {
* returns a stream of response values
* @param {function(*):Buffer} serialize Serialization function for responses
* @param {function(Buffer):*} deserialize Deserialization function for requests
+ * @param {string} type The streaming type of method that this handles
* @return {boolean} True if the handler was set. False if a handler was already
* set for that name.
*/
-Server.prototype.register = function(name, handler, serialize, deserialize) {
+Server.prototype.register = function(name, handler, serialize, deserialize,
+ type) {
if (this.handlers.hasOwnProperty(name)) {
return false;
}
this.handlers[name] = {
func: handler,
serialize: serialize,
- deserialize: deserialize
+ deserialize: deserialize,
+ type: type
};
return true;
};
@@ -324,6 +529,110 @@ Server.prototype.bind = function(port, secure) {
};
/**
- * See documentation for Server
+ * Creates a constructor for servers with a service defined by the methods
+ * object. The methods object has string keys and values of this form:
+ * {serialize: function, deserialize: function, client_stream: bool,
+ * server_stream: bool}
+ * @param {Object} methods Method descriptor for each method the server should
+ * expose
+ * @param {string} prefix The prefex to prepend to each method name
+ * @return {function(Object, Object)} New server constructor
+ */
+function makeServerConstructor(services) {
+ var qual_names = [];
+ _.each(services, function(service) {
+ _.each(service.children, function(method) {
+ var name = common.fullyQualifiedName(method);
+ if (_.indexOf(qual_names, name) !== -1) {
+ throw new Error('Method ' + name + ' exposed by more than one service');
+ }
+ qual_names.push(name);
+ });
+ });
+ /**
+ * Create a server with the given handlers for all of the methods.
+ * @constructor
+ * @param {Object} service_handlers Map from service names to map from method
+ * names to handlers
+ * @param {function(string, Object<string, Array<Buffer>>):
+ Object<string, Array<Buffer|string>>=} getMetadata Callback that
+ * gets metatada for a given method
+ * @param {Object=} options Options to pass to the underlying server
+ */
+ function SurfaceServer(service_handlers, getMetadata, options) {
+ var server = new Server(getMetadata, options);
+ this.inner_server = server;
+ _.each(services, function(service) {
+ var service_name = common.fullyQualifiedName(service);
+ if (service_handlers[service_name] === undefined) {
+ throw new Error('Handlers for service ' +
+ service_name + ' not provided.');
+ }
+ var prefix = '/' + common.fullyQualifiedName(service) + '/';
+ _.each(service.children, function(method) {
+ var method_type;
+ if (method.requestStream) {
+ if (method.responseStream) {
+ method_type = 'bidi';
+ } else {
+ method_type = 'client_stream';
+ }
+ } else {
+ if (method.responseStream) {
+ method_type = 'server_stream';
+ } else {
+ method_type = 'unary';
+ }
+ }
+ if (service_handlers[service_name][decapitalize(method.name)] ===
+ undefined) {
+ throw new Error('Method handler for ' +
+ common.fullyQualifiedName(method) + ' not provided.');
+ }
+ var serialize = common.serializeCls(
+ method.resolvedResponseType.build());
+ var deserialize = common.deserializeCls(
+ method.resolvedRequestType.build());
+ server.register(
+ prefix + capitalize(method.name),
+ service_handlers[service_name][decapitalize(method.name)],
+ serialize, deserialize, method_type);
+ });
+ }, this);
+ }
+
+ /**
+ * Binds the server to the given port, with SSL enabled if secure is specified
+ * @param {string} port The port that the server should bind on, in the format
+ * "address:port"
+ * @param {boolean=} secure Whether the server should open a secure port
+ * @return {SurfaceServer} this
+ */
+ SurfaceServer.prototype.bind = function(port, secure) {
+ return this.inner_server.bind(port, secure);
+ };
+
+ /**
+ * Starts the server listening on any bound ports
+ * @return {SurfaceServer} this
+ */
+ SurfaceServer.prototype.listen = function() {
+ this.inner_server.listen();
+ return this;
+ };
+
+ /**
+ * Shuts the server down; tells it to stop listening for new requests and to
+ * kill old requests.
+ */
+ SurfaceServer.prototype.shutdown = function() {
+ this.inner_server.shutdown();
+ };
+
+ return SurfaceServer;
+}
+
+/**
+ * See documentation for makeServerConstructor
*/
-module.exports = Server;
+exports.makeServerConstructor = makeServerConstructor;
diff --git a/src/node/src/surface_client.js b/src/node/src/surface_client.js
deleted file mode 100644
index 16c31809f4..0000000000
--- a/src/node/src/surface_client.js
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-var _ = require('underscore');
-
-var capitalize = require('underscore.string/capitalize');
-var decapitalize = require('underscore.string/decapitalize');
-
-var client = require('./client.js');
-
-var common = require('./common.js');
-
-var EventEmitter = require('events').EventEmitter;
-
-var stream = require('stream');
-
-var Readable = stream.Readable;
-var Writable = stream.Writable;
-var Duplex = stream.Duplex;
-var util = require('util');
-
-
-function forwardEvent(fromEmitter, toEmitter, event) {
- fromEmitter.on(event, function forward() {
- _.partial(toEmitter.emit, event).apply(toEmitter, arguments);
- });
-}
-
-util.inherits(ClientReadableObjectStream, Readable);
-
-/**
- * Class for representing a gRPC server streaming call as a Node stream on the
- * client side. Extends from stream.Readable.
- * @constructor
- * @param {stream} stream Underlying binary Duplex stream for the call
- */
-function ClientReadableObjectStream(stream) {
- var options = {objectMode: true};
- Readable.call(this, options);
- this._stream = stream;
- var self = this;
- forwardEvent(stream, this, 'status');
- forwardEvent(stream, this, 'metadata');
- this._stream.on('data', function forwardData(chunk) {
- if (!self.push(chunk)) {
- self._stream.pause();
- }
- });
- this._stream.pause();
-}
-
-/**
- * _read implementation for both types of streams that allow reading.
- * @this {ClientReadableObjectStream}
- * @param {number} size Ignored
- */
-function _read(size) {
- this._stream.resume();
-}
-
-/**
- * See docs for _read
- */
-ClientReadableObjectStream.prototype._read = _read;
-
-util.inherits(ClientWritableObjectStream, Writable);
-
-/**
- * Class for representing a gRPC client streaming call as a Node stream on the
- * client side. Extends from stream.Writable.
- * @constructor
- * @param {stream} stream Underlying binary Duplex stream for the call
- */
-function ClientWritableObjectStream(stream) {
- var options = {objectMode: true};
- Writable.call(this, options);
- this._stream = stream;
- forwardEvent(stream, this, 'status');
- forwardEvent(stream, this, 'metadata');
- this.on('finish', function() {
- this._stream.end();
- });
-}
-
-/**
- * _write implementation for both types of streams that allow writing
- * @this {ClientWritableObjectStream}
- * @param {*} chunk The value to write to the stream
- * @param {string} encoding Ignored
- * @param {function(Error)} callback Callback to call when finished writing
- */
-function _write(chunk, encoding, callback) {
- this._stream.write(chunk, encoding, callback);
-}
-
-/**
- * See docs for _write
- */
-ClientWritableObjectStream.prototype._write = _write;
-
-/**
- * Cancel the underlying call
- */
-function cancel() {
- this._stream.cancel();
-}
-
-ClientReadableObjectStream.prototype.cancel = cancel;
-ClientWritableObjectStream.prototype.cancel = cancel;
-
-/**
- * Get a function that can make unary requests to the specified method.
- * @param {string} method The name of the method to request
- * @param {function(*):Buffer} serialize The serialization function for inputs
- * @param {function(Buffer)} deserialize The deserialization function for
- * outputs
- * @return {Function} makeUnaryRequest
- */
-function makeUnaryRequestFunction(method, serialize, deserialize) {
- /**
- * Make a unary request with this method on the given channel with the given
- * argument, callback, etc.
- * @this {SurfaceClient} Client object. Must have a channel member.
- * @param {*} argument The argument to the call. Should be serializable with
- * serialize
- * @param {function(?Error, value=)} callback The callback to for when the
- * response is received
- * @param {array=} metadata Array of metadata key/value pairs to add to the
- * call
- * @param {(number|Date)=} deadline The deadline for processing this request.
- * Defaults to infinite future
- * @return {EventEmitter} An event emitter for stream related events
- */
- function makeUnaryRequest(argument, callback, metadata, deadline) {
- var stream = client.makeRequest(this.channel, method, serialize,
- deserialize, metadata, deadline);
- var emitter = new EventEmitter();
- emitter.cancel = function cancel() {
- stream.cancel();
- };
- forwardEvent(stream, emitter, 'status');
- forwardEvent(stream, emitter, 'metadata');
- stream.write(argument);
- stream.end();
- stream.on('data', function forwardData(chunk) {
- try {
- callback(null, chunk);
- } catch (e) {
- callback(e);
- }
- });
- stream.on('status', function forwardStatus(status) {
- if (status.code !== client.status.OK) {
- callback(status);
- }
- });
- return emitter;
- }
- return makeUnaryRequest;
-}
-
-/**
- * Get a function that can make client stream requests to the specified method.
- * @param {string} method The name of the method to request
- * @param {function(*):Buffer} serialize The serialization function for inputs
- * @param {function(Buffer)} deserialize The deserialization function for
- * outputs
- * @return {Function} makeClientStreamRequest
- */
-function makeClientStreamRequestFunction(method, serialize, deserialize) {
- /**
- * Make a client stream request with this method on the given channel with the
- * given callback, etc.
- * @this {SurfaceClient} Client object. Must have a channel member.
- * @param {function(?Error, value=)} callback The callback to for when the
- * response is received
- * @param {array=} metadata Array of metadata key/value pairs to add to the
- * call
- * @param {(number|Date)=} deadline The deadline for processing this request.
- * Defaults to infinite future
- * @return {EventEmitter} An event emitter for stream related events
- */
- function makeClientStreamRequest(callback, metadata, deadline) {
- var stream = client.makeRequest(this.channel, method, serialize,
- deserialize, metadata, deadline);
- var obj_stream = new ClientWritableObjectStream(stream);
- stream.on('data', function forwardData(chunk) {
- try {
- callback(null, chunk);
- } catch (e) {
- callback(e);
- }
- });
- stream.on('status', function forwardStatus(status) {
- if (status.code !== client.status.OK) {
- callback(status);
- }
- });
- return obj_stream;
- }
- return makeClientStreamRequest;
-}
-
-/**
- * Get a function that can make server stream requests to the specified method.
- * @param {string} method The name of the method to request
- * @param {function(*):Buffer} serialize The serialization function for inputs
- * @param {function(Buffer)} deserialize The deserialization function for
- * outputs
- * @return {Function} makeServerStreamRequest
- */
-function makeServerStreamRequestFunction(method, serialize, deserialize) {
- /**
- * Make a server stream request with this method on the given channel with the
- * given argument, etc.
- * @this {SurfaceClient} Client object. Must have a channel member.
- * @param {*} argument The argument to the call. Should be serializable with
- * serialize
- * @param {array=} metadata Array of metadata key/value pairs to add to the
- * call
- * @param {(number|Date)=} deadline The deadline for processing this request.
- * Defaults to infinite future
- * @return {EventEmitter} An event emitter for stream related events
- */
- function makeServerStreamRequest(argument, metadata, deadline) {
- var stream = client.makeRequest(this.channel, method, serialize,
- deserialize, metadata, deadline);
- var obj_stream = new ClientReadableObjectStream(stream);
- stream.write(argument);
- stream.end();
- return obj_stream;
- }
- return makeServerStreamRequest;
-}
-
-/**
- * Get a function that can make bidirectional stream requests to the specified
- * method.
- * @param {string} method The name of the method to request
- * @param {function(*):Buffer} serialize The serialization function for inputs
- * @param {function(Buffer)} deserialize The deserialization function for
- * outputs
- * @return {Function} makeBidiStreamRequest
- */
-function makeBidiStreamRequestFunction(method, serialize, deserialize) {
- /**
- * Make a bidirectional stream request with this method on the given channel.
- * @this {SurfaceClient} Client object. Must have a channel member.
- * @param {array=} metadata Array of metadata key/value pairs to add to the
- * call
- * @param {(number|Date)=} deadline The deadline for processing this request.
- * Defaults to infinite future
- * @return {EventEmitter} An event emitter for stream related events
- */
- function makeBidiStreamRequest(metadata, deadline) {
- return client.makeRequest(this.channel, method, serialize,
- deserialize, metadata, deadline);
- }
- return makeBidiStreamRequest;
-}
-
-/**
- * Map with short names for each of the requester maker functions. Used in
- * makeClientConstructor
- */
-var requester_makers = {
- unary: makeUnaryRequestFunction,
- server_stream: makeServerStreamRequestFunction,
- client_stream: makeClientStreamRequestFunction,
- bidi: makeBidiStreamRequestFunction
-}
-
-/**
- * Creates a constructor for clients for the given service
- * @param {ProtoBuf.Reflect.Service} service The service to generate a client
- * for
- * @return {function(string, Object)} New client constructor
- */
-function makeClientConstructor(service) {
- var prefix = '/' + common.fullyQualifiedName(service) + '/';
- /**
- * Create a client with the given methods
- * @constructor
- * @param {string} address The address of the server to connect to
- * @param {Object} options Options to pass to the underlying channel
- */
- function SurfaceClient(address, options) {
- this.channel = new client.Channel(address, options);
- }
-
- _.each(service.children, function(method) {
- var method_type;
- if (method.requestStream) {
- if (method.responseStream) {
- method_type = 'bidi';
- } else {
- method_type = 'client_stream';
- }
- } else {
- if (method.responseStream) {
- method_type = 'server_stream';
- } else {
- method_type = 'unary';
- }
- }
- SurfaceClient.prototype[decapitalize(method.name)] =
- requester_makers[method_type](
- prefix + capitalize(method.name),
- common.serializeCls(method.resolvedRequestType.build()),
- common.deserializeCls(method.resolvedResponseType.build()));
- });
-
- SurfaceClient.service = service;
-
- return SurfaceClient;
-}
-
-exports.makeClientConstructor = makeClientConstructor;
-
-/**
- * See docs for client.status
- */
-exports.status = client.status;
-/**
- * See docs for client.callError
- */
-exports.callError = client.callError;
diff --git a/src/node/src/surface_server.js b/src/node/src/surface_server.js
deleted file mode 100644
index a47d1fa23d..0000000000
--- a/src/node/src/surface_server.js
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-var _ = require('underscore');
-
-var capitalize = require('underscore.string/capitalize');
-var decapitalize = require('underscore.string/decapitalize');
-
-var Server = require('./server.js');
-
-var stream = require('stream');
-
-var Readable = stream.Readable;
-var Writable = stream.Writable;
-var Duplex = stream.Duplex;
-var util = require('util');
-
-var common = require('./common.js');
-
-util.inherits(ServerReadableObjectStream, Readable);
-
-/**
- * Class for representing a gRPC client streaming call as a Node stream on the
- * server side. Extends from stream.Readable.
- * @constructor
- * @param {stream} stream Underlying binary Duplex stream for the call
- */
-function ServerReadableObjectStream(stream) {
- var options = {objectMode: true};
- Readable.call(this, options);
- this._stream = stream;
- Object.defineProperty(this, 'cancelled', {
- get: function() { return stream.cancelled; }
- });
- var self = this;
- this._stream.on('cancelled', function() {
- self.emit('cancelled');
- });
- this._stream.on('data', function forwardData(chunk) {
- if (!self.push(chunk)) {
- self._stream.pause();
- }
- });
- this._stream.on('end', function forwardEnd() {
- self.push(null);
- });
- this._stream.pause();
-}
-
-/**
- * _read implementation for both types of streams that allow reading.
- * @this {ServerReadableObjectStream|ServerBidiObjectStream}
- * @param {number} size Ignored
- */
-function _read(size) {
- this._stream.resume();
-}
-
-/**
- * See docs for _read
- */
-ServerReadableObjectStream.prototype._read = _read;
-
-util.inherits(ServerWritableObjectStream, Writable);
-
-/**
- * Class for representing a gRPC server streaming call as a Node stream on the
- * server side. Extends from stream.Writable.
- * @constructor
- * @param {stream} stream Underlying binary Duplex stream for the call
- */
-function ServerWritableObjectStream(stream) {
- var options = {objectMode: true};
- Writable.call(this, options);
- this._stream = stream;
- this._stream.on('cancelled', function() {
- self.emit('cancelled');
- });
- this.on('finish', function() {
- this._stream.end();
- });
-}
-
-/**
- * _write implementation for both types of streams that allow writing
- * @this {ServerWritableObjectStream}
- * @param {*} chunk The value to write to the stream
- * @param {string} encoding Ignored
- * @param {function(Error)} callback Callback to call when finished writing
- */
-function _write(chunk, encoding, callback) {
- this._stream.write(chunk, encoding, callback);
-}
-
-/**
- * See docs for _write
- */
-ServerWritableObjectStream.prototype._write = _write;
-
-/**
- * Creates a binary stream handler function from a unary handler function
- * @param {function(Object, function(Error, *), metadata=)} handler Unary call
- * handler
- * @return {function(stream, metadata=)} Binary stream handler
- */
-function makeUnaryHandler(handler) {
- /**
- * Handles a stream by reading a single data value, passing it to the handler,
- * and writing the response back to the stream.
- * @param {stream} stream Binary data stream
- * @param {metadata=} metadata Incoming metadata array
- */
- return function handleUnaryCall(stream, metadata) {
- stream.on('data', function handleUnaryData(value) {
- var call = {request: value};
- Object.defineProperty(call, 'cancelled', {
- get: function() { return stream.cancelled;}
- });
- stream.on('cancelled', function() {
- call.emit('cancelled');
- });
- handler(call, function sendUnaryData(err, value) {
- if (err) {
- stream.emit('error', err);
- } else {
- stream.write(value);
- stream.end();
- }
- }, metadata);
- });
- };
-}
-
-/**
- * Creates a binary stream handler function from a client stream handler
- * function
- * @param {function(Readable, function(Error, *), metadata=)} handler Client
- * stream call handler
- * @return {function(stream, metadata=)} Binary stream handler
- */
-function makeClientStreamHandler(handler) {
- /**
- * Handles a stream by passing a deserializing stream to the handler and
- * writing the response back to the stream.
- * @param {stream} stream Binary data stream
- * @param {metadata=} metadata Incoming metadata array
- */
- return function handleClientStreamCall(stream, metadata) {
- var object_stream = new ServerReadableObjectStream(stream);
- handler(object_stream, function sendClientStreamData(err, value) {
- if (err) {
- stream.emit('error', err);
- } else {
- stream.write(value);
- stream.end();
- }
- }, metadata);
- };
-}
-
-/**
- * Creates a binary stream handler function from a server stream handler
- * function
- * @param {function(Writable, metadata=)} handler Server stream call handler
- * @return {function(stream, metadata=)} Binary stream handler
- */
-function makeServerStreamHandler(handler) {
- /**
- * Handles a stream by attaching it to a serializing stream, and passing it to
- * the handler.
- * @param {stream} stream Binary data stream
- * @param {metadata=} metadata Incoming metadata array
- */
- return function handleServerStreamCall(stream, metadata) {
- stream.on('data', function handleClientData(value) {
- var object_stream = new ServerWritableObjectStream(stream);
- object_stream.request = value;
- handler(object_stream, metadata);
- });
- };
-}
-
-/**
- * Creates a binary stream handler function from a bidi stream handler function
- * @param {function(Duplex, metadata=)} handler Unary call handler
- * @return {function(stream, metadata=)} Binary stream handler
- */
-function makeBidiStreamHandler(handler) {
- return handler;
-}
-
-/**
- * Map with short names for each of the handler maker functions. Used in
- * makeServerConstructor
- */
-var handler_makers = {
- unary: makeUnaryHandler,
- server_stream: makeServerStreamHandler,
- client_stream: makeClientStreamHandler,
- bidi: makeBidiStreamHandler
-};
-
-/**
- * Creates a constructor for servers with a service defined by the methods
- * object. The methods object has string keys and values of this form:
- * {serialize: function, deserialize: function, client_stream: bool,
- * server_stream: bool}
- * @param {Object} methods Method descriptor for each method the server should
- * expose
- * @param {string} prefix The prefex to prepend to each method name
- * @return {function(Object, Object)} New server constructor
- */
-function makeServerConstructor(services) {
- var qual_names = [];
- _.each(services, function(service) {
- _.each(service.children, function(method) {
- var name = common.fullyQualifiedName(method);
- if (_.indexOf(qual_names, name) !== -1) {
- throw new Error('Method ' + name + ' exposed by more than one service');
- }
- qual_names.push(name);
- });
- });
- /**
- * Create a server with the given handlers for all of the methods.
- * @constructor
- * @param {Object} service_handlers Map from service names to map from method
- * names to handlers
- * @param {function(string, Object<string, Array<Buffer>>):
- Object<string, Array<Buffer|string>>=} getMetadata Callback that
- * gets metatada for a given method
- * @param {Object=} options Options to pass to the underlying server
- */
- function SurfaceServer(service_handlers, getMetadata, options) {
- var server = new Server(getMetadata, options);
- this.inner_server = server;
- _.each(services, function(service) {
- var service_name = common.fullyQualifiedName(service);
- if (service_handlers[service_name] === undefined) {
- throw new Error('Handlers for service ' +
- service_name + ' not provided.');
- }
- var prefix = '/' + common.fullyQualifiedName(service) + '/';
- _.each(service.children, function(method) {
- var method_type;
- if (method.requestStream) {
- if (method.responseStream) {
- method_type = 'bidi';
- } else {
- method_type = 'client_stream';
- }
- } else {
- if (method.responseStream) {
- method_type = 'server_stream';
- } else {
- method_type = 'unary';
- }
- }
- if (service_handlers[service_name][decapitalize(method.name)] ===
- undefined) {
- throw new Error('Method handler for ' +
- common.fullyQualifiedName(method) + ' not provided.');
- }
- var binary_handler = handler_makers[method_type](
- service_handlers[service_name][decapitalize(method.name)]);
- var serialize = common.serializeCls(
- method.resolvedResponseType.build());
- var deserialize = common.deserializeCls(
- method.resolvedRequestType.build());
- server.register(prefix + capitalize(method.name), binary_handler,
- serialize, deserialize);
- });
- }, this);
- }
-
- /**
- * Binds the server to the given port, with SSL enabled if secure is specified
- * @param {string} port The port that the server should bind on, in the format
- * "address:port"
- * @param {boolean=} secure Whether the server should open a secure port
- * @return {SurfaceServer} this
- */
- SurfaceServer.prototype.bind = function(port, secure) {
- return this.inner_server.bind(port, secure);
- };
-
- /**
- * Starts the server listening on any bound ports
- * @return {SurfaceServer} this
- */
- SurfaceServer.prototype.listen = function() {
- this.inner_server.start();
- return this;
- };
-
- /**
- * Shuts the server down; tells it to stop listening for new requests and to
- * kill old requests.
- */
- SurfaceServer.prototype.shutdown = function() {
- this.inner_server.shutdown();
- };
-
- return SurfaceServer;
-}
-
-/**
- * See documentation for makeServerConstructor
- */
-exports.makeServerConstructor = makeServerConstructor;
diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js
index 48db245498..c1a7e95fa0 100644
--- a/src/node/test/call_test.js
+++ b/src/node/test/call_test.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2014, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -98,100 +98,80 @@ describe('call', function() {
}, TypeError);
});
});
- describe('addMetadata', function() {
- it('should succeed with a map from strings to string arrays', function() {
+ describe('startBatch', function() {
+ it('should fail without an object and a function', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- assert.doesNotThrow(function() {
- call.addMetadata({'key': ['value']});
+ assert.throws(function() {
+ call.startBatch();
});
- assert.doesNotThrow(function() {
- call.addMetadata({'key1': ['value1'], 'key2': ['value2']});
+ assert.throws(function() {
+ call.startBatch({});
+ });
+ assert.throws(function() {
+ call.startBatch(null, function(){});
});
});
- it('should succeed with a map from strings to buffer arrays', function() {
+ it('should succeed with an empty object', function(done) {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.doesNotThrow(function() {
- call.addMetadata({'key': [new Buffer('value')]});
- });
- assert.doesNotThrow(function() {
- call.addMetadata({'key1': [new Buffer('value1')],
- 'key2': [new Buffer('value2')]});
+ call.startBatch({}, function(err) {
+ assert.ifError(err);
+ done();
+ });
});
});
- it('should fail with other parameter types', function() {
+ });
+ describe('startBatch with metadata', function() {
+ it('should succeed with a map of strings to string arrays', function(done) {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- assert.throws(function() {
- call.addMetadata();
+ assert.doesNotThrow(function() {
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = {'key1': ['value1'],
+ 'key2': ['value2']};
+ call.startBatch(batch, function(err, resp) {
+ assert.ifError(err);
+ assert.deepEqual(resp, {'send metadata': true});
+ done();
+ });
});
- assert.throws(function() {
- call.addMetadata(null);
- }, TypeError);
- assert.throws(function() {
- call.addMetadata('value');
- }, TypeError);
- assert.throws(function() {
- call.addMetadata(5);
- }, TypeError);
});
- it.skip('should fail if invoke was already called', function(done) {
+ it('should succeed with a map of strings to buffer arrays', function(done) {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- call.invoke(function() {},
- function() {done();},
- 0);
- assert.throws(function() {
- call.addMetadata({'key': ['value']});
+ assert.doesNotThrow(function() {
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = {
+ 'key1': [new Buffer('value1')],
+ 'key2': [new Buffer('value2')]
+ };
+ call.startBatch(batch, function(err, resp) {
+ assert.ifError(err);
+ assert.deepEqual(resp, {'send metadata': true});
+ done();
+ });
});
- // Cancel to speed up the test
- call.cancel();
});
- });
- describe('invoke', function() {
- it('should fail with fewer than 3 arguments', function() {
+ it('should fail with other parameter types', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.invoke();
- }, TypeError);
- assert.throws(function() {
- call.invoke(function() {});
- }, TypeError);
- assert.throws(function() {
- call.invoke(function() {},
- function() {});
- }, TypeError);
- });
- it('should work with 2 args and an int', function(done) {
- assert.doesNotThrow(function() {
- var call = new grpc.Call(channel, 'method', getDeadline(1));
- call.invoke(function() {},
- function() {done();},
- 0);
- // Cancel to speed up the test
- call.cancel();
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = undefined;
+ call.startBatch(batch, function(){});
});
- });
- it('should reject incorrectly typed arguments', function() {
- var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.invoke(0, 0, 0);
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = null;
+ call.startBatch(batch, function(){});
}, TypeError);
assert.throws(function() {
- call.invoke(function() {},
- function() {}, 'test');
- });
- });
- });
- describe('serverAccept', function() {
- it('should fail with fewer than 1 argument1', function() {
- var call = new grpc.Call(channel, 'method', getDeadline(1));
- assert.throws(function() {
- call.serverAccept();
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = 'value';
+ call.startBatch(batch, function(){});
}, TypeError);
- });
- it.skip('should return an error when called on a client Call', function() {
- var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.serverAccept(function() {});
- });
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = 5;
+ call.startBatch(batch, function(){});
+ }, TypeError);
});
});
describe('cancel', function() {
diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js
deleted file mode 100644
index 1db9f69467..0000000000
--- a/src/node/test/client_server_test.js
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-var assert = require('assert');
-var fs = require('fs');
-var path = require('path');
-var grpc = require('bindings')('grpc.node');
-var Server = require('../src/server');
-var client = require('../src/client');
-var common = require('../src/common');
-
-var ca_path = path.join(__dirname, 'data/ca.pem');
-
-var key_path = path.join(__dirname, 'data/server1.key');
-
-var pem_path = path.join(__dirname, 'data/server1.pem');
-
-/**
- * Helper function to return an absolute deadline given a relative timeout in
- * seconds.
- * @param {number} timeout_secs The number of seconds to wait before timing out
- * @return {Date} A date timeout_secs in the future
- */
-function getDeadline(timeout_secs) {
- var deadline = new Date();
- deadline.setSeconds(deadline.getSeconds() + timeout_secs);
- return deadline;
-}
-
-/**
- * Responds to every request with the same data as a response
- * @param {Stream} stream
- */
-function echoHandler(stream) {
- stream.pipe(stream);
-}
-
-/**
- * Responds to every request with an error status
- * @param {Stream} stream
- */
-function errorHandler(stream) {
- throw {
- 'code' : grpc.status.UNIMPLEMENTED,
- 'details' : 'error details'
- };
-}
-
-/**
- * Wait for a cancellation instead of responding
- * @param {Stream} stream
- */
-function cancelHandler(stream) {
- // do nothing
-}
-
-function metadataHandler(stream, metadata) {
- stream.end();
-}
-
-/**
- * Serialize a string to a Buffer
- * @param {string} value The string to serialize
- * @return {Buffer} The serialized value
- */
-function stringSerialize(value) {
- return new Buffer(value);
-}
-
-/**
- * Deserialize a Buffer to a string
- * @param {Buffer} buffer The buffer to deserialize
- * @return {string} The string value of the buffer
- */
-function stringDeserialize(buffer) {
- return buffer.toString();
-}
-
-describe('echo client', function() {
- var server;
- var channel;
- before(function() {
- server = new Server(function getMetadata(method, metadata) {
- return {method: [method]};
- });
- var port_num = server.bind('0.0.0.0:0');
- server.register('echo', echoHandler);
- server.register('error', errorHandler);
- server.register('cancellation', cancelHandler);
- server.register('metadata', metadataHandler);
- server.start();
-
- channel = new grpc.Channel('localhost:' + port_num);
- });
- after(function() {
- server.shutdown();
- });
- it('should receive echo responses', function(done) {
- var messages = ['echo1', 'echo2', 'echo3', 'echo4'];
- var stream = client.makeRequest(
- channel,
- 'echo',
- stringSerialize,
- stringDeserialize);
- for (var i = 0; i < messages.length; i++) {
- stream.write(messages[i]);
- }
- stream.end();
- var index = 0;
- stream.on('data', function(chunk) {
- assert.equal(messages[index], chunk);
- index += 1;
- });
- stream.on('status', function(status) {
- assert.equal(status.code, client.status.OK);
- });
- stream.on('end', function() {
- assert.equal(index, messages.length);
- done();
- });
- });
- it('should recieve metadata set by the server', function(done) {
- var stream = client.makeRequest(channel, 'metadata');
- stream.on('metadata', function(metadata) {
- assert.strictEqual(metadata.method[0].toString(), 'metadata');
- });
- stream.on('status', function(status) {
- assert.equal(status.code, client.status.OK);
- done();
- });
- stream.end();
- });
- it('should get an error status that the server throws', function(done) {
- var stream = client.makeRequest(channel, 'error');
-
- stream.on('data', function() {});
- stream.write(new Buffer('test'));
- stream.end();
- stream.on('status', function(status) {
- assert.equal(status.code, grpc.status.UNIMPLEMENTED);
- assert.equal(status.details, 'error details');
- done();
- });
- });
- it('should be able to cancel a call', function(done) {
- var stream = client.makeRequest(
- channel,
- 'cancellation',
- null,
- getDeadline(1));
-
- stream.cancel();
- stream.on('status', function(status) {
- assert.equal(status.code, grpc.status.CANCELLED);
- done();
- });
- });
- it('should get correct status for unimplemented method', function(done) {
- var stream = client.makeRequest(channel, 'unimplemented_method');
- stream.end();
- stream.on('status', function(status) {
- assert.equal(status.code, grpc.status.UNIMPLEMENTED);
- done();
- });
- });
-});
-/* TODO(mlumish): explore options for reducing duplication between this test
- * and the insecure echo client test */
-describe('secure echo client', function() {
- var server;
- var channel;
- before(function(done) {
- fs.readFile(ca_path, function(err, ca_data) {
- assert.ifError(err);
- fs.readFile(key_path, function(err, key_data) {
- assert.ifError(err);
- fs.readFile(pem_path, function(err, pem_data) {
- assert.ifError(err);
- var creds = grpc.Credentials.createSsl(ca_data);
- var server_creds = grpc.ServerCredentials.createSsl(null,
- key_data,
- pem_data);
-
- server = new Server(null, {'credentials' : server_creds});
- var port_num = server.bind('0.0.0.0:0', true);
- server.register('echo', echoHandler);
- server.start();
-
- channel = new grpc.Channel('localhost:' + port_num, {
- 'grpc.ssl_target_name_override' : 'foo.test.google.com',
- 'credentials' : creds
- });
- done();
- });
- });
- });
- });
- after(function() {
- server.shutdown();
- });
- it('should recieve echo responses', function(done) {
- var messages = ['echo1', 'echo2', 'echo3', 'echo4'];
- var stream = client.makeRequest(
- channel,
- 'echo',
- stringSerialize,
- stringDeserialize);
- for (var i = 0; i < messages.length; i++) {
- stream.write(messages[i]);
- }
- stream.end();
- var index = 0;
- stream.on('data', function(chunk) {
- assert.equal(messages[index], chunk);
- index += 1;
- });
- stream.on('status', function(status) {
- assert.equal(status.code, client.status.OK);
- });
- stream.on('end', function() {
- assert.equal(index, messages.length);
- done();
- });
- });
-});
diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js
index 0138a55226..4d11e6f527 100644
--- a/src/node/test/constant_test.js
+++ b/src/node/test/constant_test.js
@@ -76,31 +76,6 @@ var callErrorNames = [
'INVALID_FLAGS'
];
-/**
- * List of all op error names
- * @const
- * @type {Array.<string>}
- */
-var opErrorNames = [
- 'OK',
- 'ERROR'
-];
-
-/**
- * List of all completion type names
- * @const
- * @type {Array.<string>}
- */
-var completionTypeNames = [
- 'QUEUE_SHUTDOWN',
- 'READ',
- 'WRITE_ACCEPTED',
- 'FINISH_ACCEPTED',
- 'CLIENT_METADATA_READ',
- 'FINISHED',
- 'SERVER_RPC_NEW'
-];
-
describe('constants', function() {
it('should have all of the status constants', function() {
for (var i = 0; i < statusNames.length; i++) {
@@ -114,16 +89,4 @@ describe('constants', function() {
'call error missing: ' + callErrorNames[i]);
}
});
- it('should have all of the op errors', function() {
- for (var i = 0; i < opErrorNames.length; i++) {
- assert(grpc.opError.hasOwnProperty(opErrorNames[i]),
- 'op error missing: ' + opErrorNames[i]);
- }
- });
- it('should have all of the completion types', function() {
- for (var i = 0; i < completionTypeNames.length; i++) {
- assert(grpc.completionType.hasOwnProperty(completionTypeNames[i]),
- 'completion type missing: ' + completionTypeNames[i]);
- }
- });
});
diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js
index 1f53df23f3..f8899beae8 100644
--- a/src/node/test/end_to_end_test.js
+++ b/src/node/test/end_to_end_test.js
@@ -74,40 +74,49 @@ describe('end-to-end', function() {
var status_text = 'xyz';
var call = new grpc.Call(channel,
'dummy_method',
- deadline);
- call.invoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.CLIENT_METADATA_READ);
- },function(event) {
- assert.strictEqual(event.type, grpc.completionType.FINISHED);
- var status = event.data;
- assert.strictEqual(status.code, grpc.status.OK);
- assert.strictEqual(status.details, status_text);
+ Infinity);
+ var client_batch = {};
+ client_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
+ client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(client_batch, function(err, response) {
+ assert.ifError(err);
+ assert.deepEqual(response, {
+ 'send metadata': true,
+ 'client close': true,
+ 'metadata': {},
+ 'status': {
+ 'code': grpc.status.OK,
+ 'details': status_text,
+ 'metadata': {}
+ }
+ });
done();
- }, 0);
+ });
- server.requestCall(function(event) {
- assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW);
- var server_call = event.call;
+ server.requestCall(function(err, call_details) {
+ var new_call = call_details['new call'];
+ assert.notEqual(new_call, null);
+ var server_call = new_call.call;
assert.notEqual(server_call, null);
- server_call.serverAccept(function(event) {
- assert.strictEqual(event.type, grpc.completionType.FINISHED);
- }, 0);
- server_call.serverEndInitialMetadata(0);
- server_call.startWriteStatus(
- grpc.status.OK,
- status_text,
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- });
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
+ var server_batch = {};
+ server_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ server_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ 'metadata': {},
+ 'code': grpc.status.OK,
+ 'details': status_text
+ };
+ server_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
+ server_call.startBatch(server_batch, function(err, response) {
+ assert.ifError(err);
+ assert.deepEqual(response, {
+ 'send metadata': true,
+ 'send status': true,
+ 'cancelled': false
+ });
+ done();
+ });
});
});
it('should successfully send and receive metadata', function(complete) {
@@ -117,115 +126,110 @@ describe('end-to-end', function() {
var status_text = 'xyz';
var call = new grpc.Call(channel,
'dummy_method',
- deadline);
- call.addMetadata({'client_key': ['client_value']});
- call.invoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.CLIENT_METADATA_READ);
- assert.strictEqual(event.data.server_key[0].toString(), 'server_value');
- },function(event) {
- assert.strictEqual(event.type, grpc.completionType.FINISHED);
- var status = event.data;
- assert.strictEqual(status.code, grpc.status.OK);
- assert.strictEqual(status.details, status_text);
+ Infinity);
+ var client_batch = {};
+ client_batch[grpc.opType.SEND_INITIAL_METADATA] = {
+ 'client_key': ['client_value']
+ };
+ client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
+ client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(client_batch, function(err, response) {
+ assert.ifError(err);
+ assert(response['send metadata']);
+ assert(response['client close']);
+ assert(response.hasOwnProperty('metadata'));
+ assert.strictEqual(response.metadata.server_key[0].toString(),
+ 'server_value');
+ assert.deepEqual(response.status, {'code': grpc.status.OK,
+ 'details': status_text,
+ 'metadata': {}});
done();
- }, 0);
+ });
- server.requestCall(function(event) {
- assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW);
- assert.strictEqual(event.data.metadata.client_key[0].toString(),
+ server.requestCall(function(err, call_details) {
+ var new_call = call_details['new call'];
+ assert.notEqual(new_call, null);
+ assert.strictEqual(new_call.metadata.client_key[0].toString(),
'client_value');
- var server_call = event.call;
+ var server_call = new_call.call;
assert.notEqual(server_call, null);
- server_call.serverAccept(function(event) {
- assert.strictEqual(event.type, grpc.completionType.FINISHED);
- }, 0);
- server_call.addMetadata({'server_key': ['server_value']});
- server_call.serverEndInitialMetadata(0);
- server_call.startWriteStatus(
- grpc.status.OK,
- status_text,
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- });
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
+ var server_batch = {};
+ server_batch[grpc.opType.SEND_INITIAL_METADATA] = {
+ 'server_key': ['server_value']
+ };
+ server_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ 'metadata': {},
+ 'code': grpc.status.OK,
+ 'details': status_text
+ };
+ server_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
+ server_call.startBatch(server_batch, function(err, response) {
+ assert.ifError(err);
+ assert.deepEqual(response, {
+ 'send metadata': true,
+ 'send status': true,
+ 'cancelled': false
+ });
+ done();
+ });
});
});
it('should send and receive data without error', function(complete) {
var req_text = 'client_request';
var reply_text = 'server_response';
- var done = multiDone(complete, 6);
+ var done = multiDone(complete, 2);
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
var status_text = 'success';
var call = new grpc.Call(channel,
'dummy_method',
- deadline);
- call.invoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.CLIENT_METADATA_READ);
- done();
- },function(event) {
- assert.strictEqual(event.type, grpc.completionType.FINISHED);
- var status = event.data;
- assert.strictEqual(status.code, grpc.status.OK);
- assert.strictEqual(status.details, status_text);
- done();
- }, 0);
- call.startWrite(
- new Buffer(req_text),
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.WRITE_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- }, 0);
- call.startRead(function(event) {
- assert.strictEqual(event.type, grpc.completionType.READ);
- assert.strictEqual(event.data.toString(), reply_text);
+ Infinity);
+ var client_batch = {};
+ client_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ client_batch[grpc.opType.SEND_MESSAGE] = new Buffer(req_text);
+ client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
+ client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ client_batch[grpc.opType.RECV_MESSAGE] = true;
+ client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(client_batch, function(err, response) {
+ assert.ifError(err);
+ assert(response['send metadata']);
+ assert(response['client close']);
+ assert.deepEqual(response.metadata, {});
+ assert(response['send message']);
+ assert.strictEqual(response.read.toString(), reply_text);
+ assert.deepEqual(response.status, {'code': grpc.status.OK,
+ 'details': status_text,
+ 'metadata': {}});
done();
});
- server.requestCall(function(event) {
- assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW);
- var server_call = event.call;
+
+ server.requestCall(function(err, call_details) {
+ var new_call = call_details['new call'];
+ assert.notEqual(new_call, null);
+ var server_call = new_call.call;
assert.notEqual(server_call, null);
- server_call.serverAccept(function(event) {
- assert.strictEqual(event.type, grpc.completionType.FINISHED);
- done();
- });
- server_call.serverEndInitialMetadata(0);
- server_call.startRead(function(event) {
- assert.strictEqual(event.type, grpc.completionType.READ);
- assert.strictEqual(event.data.toString(), req_text);
- server_call.startWrite(
- new Buffer(reply_text),
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.WRITE_ACCEPTED);
- assert.strictEqual(event.data,
- grpc.opError.OK);
- server_call.startWriteStatus(
- grpc.status.OK,
- status_text,
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- }, 0);
+ var server_batch = {};
+ server_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ server_batch[grpc.opType.RECV_MESSAGE] = true;
+ server_call.startBatch(server_batch, function(err, response) {
+ assert.ifError(err);
+ assert(response['send metadata']);
+ assert.strictEqual(response.read.toString(), req_text);
+ var response_batch = {};
+ response_batch[grpc.opType.SEND_MESSAGE] = new Buffer(reply_text);
+ response_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ 'metadata': {},
+ 'code': grpc.status.OK,
+ 'details': status_text
+ };
+ response_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
+ server_call.startBatch(response_batch, function(err, response) {
+ assert(response['send status']);
+ assert(!response['cancelled']);
+ done();
+ });
});
});
});
diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js
index 7ecaad833d..81cd9fa5b9 100644
--- a/src/node/test/interop_sanity_test.js
+++ b/src/node/test/interop_sanity_test.js
@@ -56,7 +56,7 @@ describe('Interop tests', function() {
interop_client.runTest(port, name_override, 'empty_unary', true, done);
});
// This fails due to an unknown bug
- it.skip('should pass large_unary', function(done) {
+ it('should pass large_unary', function(done) {
interop_client.runTest(port, name_override, 'large_unary', true, done);
});
it('should pass client_streaming', function(done) {
diff --git a/src/node/test/math_client_test.js b/src/node/test/math_client_test.js
index 0e365bf870..61b4a2fa2f 100644
--- a/src/node/test/math_client_test.js
+++ b/src/node/test/math_client_test.js
@@ -63,9 +63,6 @@ describe('Math client', function() {
assert.ifError(err);
assert.equal(value.quotient, 1);
assert.equal(value.remainder, 3);
- });
- call.on('status', function checkStatus(status) {
- assert.strictEqual(status.code, grpc.status.OK);
done();
});
});
diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js
deleted file mode 100644
index a3e1edf50f..0000000000
--- a/src/node/test/server_test.js
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-var assert = require('assert');
-var grpc = require('bindings')('grpc.node');
-var Server = require('../src/server');
-
-/**
- * This is used for testing functions with multiple asynchronous calls that
- * can happen in different orders. This should be passed the number of async
- * function invocations that can occur last, and each of those should call this
- * function's return value
- * @param {function()} done The function that should be called when a test is
- * complete.
- * @param {number} count The number of calls to the resulting function if the
- * test passes.
- * @return {function()} The function that should be called at the end of each
- * sequence of asynchronous functions.
- */
-function multiDone(done, count) {
- return function() {
- count -= 1;
- if (count <= 0) {
- done();
- }
- };
-}
-
-/**
- * Responds to every request with the same data as a response
- * @param {Stream} stream
- */
-function echoHandler(stream) {
- stream.pipe(stream);
-}
-
-describe('echo server', function() {
- var server;
- var channel;
- before(function() {
- server = new Server();
- var port_num = server.bind('[::]:0');
- server.register('echo', echoHandler);
- server.start();
-
- channel = new grpc.Channel('localhost:' + port_num);
- });
- after(function() {
- server.shutdown();
- });
- it('should echo inputs as responses', function(done) {
- done = multiDone(done, 4);
-
- var req_text = 'echo test string';
- var status_text = 'OK';
-
- var deadline = new Date();
- deadline.setSeconds(deadline.getSeconds() + 3);
- var call = new grpc.Call(channel,
- 'echo',
- deadline);
- call.invoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.CLIENT_METADATA_READ);
- done();
- },function(event) {
- assert.strictEqual(event.type, grpc.completionType.FINISHED);
- var status = event.data;
- assert.strictEqual(status.code, grpc.status.OK);
- assert.strictEqual(status.details, status_text);
- done();
- }, 0);
- call.startWrite(
- new Buffer(req_text),
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.WRITE_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- }, 0);
- call.startRead(function(event) {
- assert.strictEqual(event.type, grpc.completionType.READ);
- assert.strictEqual(event.data.toString(), req_text);
- done();
- });
- });
-});
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 1038f9ab33..34e4ab4013 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -33,9 +33,9 @@
var assert = require('assert');
-var surface_server = require('../src/surface_server.js');
+var surface_server = require('../src/server.js');
-var surface_client = require('../src/surface_client.js');
+var surface_client = require('../src/client.js');
var ProtoBuf = require('protobufjs');