aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node
diff options
context:
space:
mode:
Diffstat (limited to 'src/node')
-rw-r--r--src/node/binding.gyp9
-rw-r--r--src/node/ext/call.cc665
-rw-r--r--src/node/ext/call.h59
-rw-r--r--src/node/ext/completion_queue_async_worker.cc24
-rw-r--r--src/node/ext/completion_queue_async_worker.h2
-rw-r--r--src/node/ext/node_grpc.cc58
-rw-r--r--src/node/ext/server.cc64
-rw-r--r--src/node/ext/tag.cc284
-rw-r--r--src/node/ext/tag.h33
-rw-r--r--src/node/test/call_test.js124
-rw-r--r--src/node/test/constant_test.js37
-rw-r--r--src/node/test/end_to_end_test.js246
12 files changed, 1057 insertions, 548 deletions
diff --git a/src/node/binding.gyp b/src/node/binding.gyp
index cf2a6acb04..fb4c779f8e 100644
--- a/src/node/binding.gyp
+++ b/src/node/binding.gyp
@@ -9,14 +9,15 @@
'include_dirs': [
"<!(nodejs -e \"require('nan')\")"
],
- 'cxxflags': [
+ 'cflags': [
+ '-std=c++11',
'-Wall',
'-pthread',
'-pedantic',
'-g',
'-zdefs'
- '-Werror',
- ],
+ '-Werror'
+ ],
'ldflags': [
'-g'
],
@@ -33,11 +34,9 @@
"ext/channel.cc",
"ext/completion_queue_async_worker.cc",
"ext/credentials.cc",
- "ext/event.cc",
"ext/node_grpc.cc",
"ext/server.cc",
"ext/server_credentials.cc",
- "ext/tag.cc",
"ext/timeval.cc"
],
'conditions' : [
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index 23aead07b2..9a6359fe44 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2014, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,17 +31,23 @@
*
*/
+#include <memory>
+#include <vector>
+#include <map>
+
#include <node.h>
#include "grpc/support/log.h"
#include "grpc/grpc.h"
+#include "grpc/support/alloc.h"
#include "grpc/support/time.h"
#include "byte_buffer.h"
#include "call.h"
#include "channel.h"
#include "completion_queue_async_worker.h"
#include "timeval.h"
-#include "tag.h"
+
+using std::unique_ptr;
namespace grpc {
namespace node {
@@ -49,6 +55,7 @@ namespace node {
using ::node::Buffer;
using v8::Arguments;
using v8::Array;
+using v8::Boolean;
using v8::Exception;
using v8::External;
using v8::Function;
@@ -68,6 +75,372 @@ using v8::Value;
Persistent<Function> Call::constructor;
Persistent<FunctionTemplate> Call::fun_tpl;
+
+bool CreateMetadataArray(
+ Handle<Object> metadata, grpc_metadata_array *array,
+ std::vector<unique_ptr<NanUtf8String> > *string_handles,
+ std::vector<unique_ptr<PersistentHolder> > *handles) {
+ NanScope();
+ grpc_metadata_array_init(array);
+ Handle<Array> keys(metadata->GetOwnPropertyNames());
+ for (unsigned int i = 0; i < keys->Length(); i++) {
+ Handle<String> current_key(keys->Get(i)->ToString());
+ if (!metadata->Get(current_key)->IsArray()) {
+ return false;
+ }
+ array->capacity += Local<Array>::Cast(metadata->Get(current_key))->Length();
+ }
+ array->metadata = reinterpret_cast<grpc_metadata*>(
+ gpr_malloc(array->capacity * sizeof(grpc_metadata)));
+ for (unsigned int i = 0; i < keys->Length(); i++) {
+ Handle<String> current_key(keys->Get(i)->ToString());
+ NanUtf8String *utf8_key = new NanUtf8String(current_key);
+ string_handles->push_back(unique_ptr<NanUtf8String>(utf8_key));
+ Handle<Array> values = Local<Array>::Cast(metadata->Get(current_key));
+ for (unsigned int j = 0; j < values->Length(); j++) {
+ Handle<Value> value = values->Get(j);
+ grpc_metadata *current = &array->metadata[array->count];
+ current->key = **utf8_key;
+ if (Buffer::HasInstance(value)) {
+ current->value = Buffer::Data(value);
+ current->value_length = Buffer::Length(value);
+ Persistent<Value> handle;
+ NanAssignPersistent(handle, value);
+ handles->push_back(unique_ptr<PersistentHolder>(
+ new PersistentHolder(handle)));
+ } else if (value->IsString()) {
+ Handle<String> string_value = value->ToString();
+ NanUtf8String *utf8_value = new NanUtf8String(string_value);
+ string_handles->push_back(unique_ptr<NanUtf8String>(utf8_value));
+ current->value = **utf8_value;
+ current->value_length = string_value->Length();
+ } else {
+ return false;
+ }
+ array->count += 1;
+ }
+ }
+ return true;
+}
+
+Handle<Value> ParseMetadata(const grpc_metadata_array *metadata_array) {
+ NanEscapableScope();
+ grpc_metadata *metadata_elements = metadata_array->metadata;
+ size_t length = metadata_array->count;
+ std::map<const char*, size_t> size_map;
+ std::map<const char*, size_t> index_map;
+
+ for (unsigned int i = 0; i < length; i++) {
+ const char *key = metadata_elements[i].key;
+ if (size_map.count(key)) {
+ size_map[key] += 1;
+ }
+ index_map[key] = 0;
+ }
+ Handle<Object> metadata_object = NanNew<Object>();
+ for (unsigned int i = 0; i < length; i++) {
+ grpc_metadata* elem = &metadata_elements[i];
+ Handle<String> key_string = String::New(elem->key);
+ Handle<Array> array;
+ if (metadata_object->Has(key_string)) {
+ array = Handle<Array>::Cast(metadata_object->Get(key_string));
+ } else {
+ array = NanNew<Array>(size_map[elem->key]);
+ metadata_object->Set(key_string, array);
+ }
+ array->Set(index_map[elem->key],
+ MakeFastBuffer(
+ NanNewBufferHandle(elem->value, elem->value_length)));
+ index_map[elem->key] += 1;
+ }
+ return NanEscapeScope(metadata_object);
+}
+
+Handle<Value> Op::GetOpType() const {
+ NanEscapableScope();
+ return NanEscapeScope(NanNew<String>(GetTypeString()));
+}
+
+class SendMetadataOp : public Op {
+ public:
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(NanTrue());
+ }
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ std::vector<unique_ptr<NanUtf8String> > *strings,
+ std::vector<unique_ptr<PersistentHolder> > *handles) {
+ if (!value->IsObject()) {
+ return false;
+ }
+ grpc_metadata_array array;
+ if (!CreateMetadataArray(value->ToObject(), &array, strings, handles)) {
+ return false;
+ }
+ out->data.send_initial_metadata.count = array.count;
+ out->data.send_initial_metadata.metadata = array.metadata;
+ return true;
+ }
+ protected:
+ std::string GetTypeString() const {
+ return "send metadata";
+ }
+};
+
+class SendMessageOp : public Op {
+ public:
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(NanTrue());
+ }
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ std::vector<unique_ptr<NanUtf8String> > *strings,
+ std::vector<unique_ptr<PersistentHolder> > *handles) {
+ if (!Buffer::HasInstance(value)) {
+ return false;
+ }
+ out->data.send_message = BufferToByteBuffer(value);
+ Persistent<Value> handle;
+ NanAssignPersistent(handle, value);
+ handles->push_back(unique_ptr<PersistentHolder>(
+ new PersistentHolder(handle)));
+ return true;
+ }
+ protected:
+ std::string GetTypeString() const {
+ return "send message";
+ }
+};
+
+class SendClientCloseOp : public Op {
+ public:
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(NanTrue());
+ }
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ std::vector<unique_ptr<NanUtf8String> > *strings,
+ std::vector<unique_ptr<PersistentHolder> > *handles) {
+ return true;
+ }
+ protected:
+ std::string GetTypeString() const {
+ return "client close";
+ }
+};
+
+class SendServerStatusOp : public Op {
+ public:
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(NanTrue());
+ }
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ std::vector<unique_ptr<NanUtf8String> > *strings,
+ std::vector<unique_ptr<PersistentHolder> > *handles) {
+ if (!value->IsObject()) {
+ return false;
+ }
+ Handle<Object> server_status = value->ToObject();
+ if (!server_status->Get(NanNew("metadata"))->IsObject()) {
+ return false;
+ }
+ if (!server_status->Get(NanNew("code"))->IsUint32()) {
+ return false;
+ }
+ if (!server_status->Get(NanNew("details"))->IsString()) {
+ return false;
+ }
+ grpc_metadata_array array;
+ if (!CreateMetadataArray(server_status->Get(NanNew("metadata"))->
+ ToObject(),
+ &array, strings, handles)) {
+ return false;
+ }
+ out->data.send_status_from_server.trailing_metadata_count = array.count;
+ out->data.send_status_from_server.trailing_metadata = array.metadata;
+ out->data.send_status_from_server.status =
+ static_cast<grpc_status_code>(
+ server_status->Get(NanNew("code"))->Uint32Value());
+ NanUtf8String *str = new NanUtf8String(
+ server_status->Get(NanNew("details")));
+ strings->push_back(unique_ptr<NanUtf8String>(str));
+ out->data.send_status_from_server.status_details = **str;
+ return true;
+ }
+ protected:
+ std::string GetTypeString() const {
+ return "send status";
+ }
+};
+
+class GetMetadataOp : public Op {
+ public:
+ GetMetadataOp() {
+ grpc_metadata_array_init(&recv_metadata);
+ }
+
+ ~GetMetadataOp() {
+ grpc_metadata_array_destroy(&recv_metadata);
+ }
+
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(ParseMetadata(&recv_metadata));
+ }
+
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ std::vector<unique_ptr<NanUtf8String> > *strings,
+ std::vector<unique_ptr<PersistentHolder> > *handles) {
+ out->data.recv_initial_metadata = &recv_metadata;
+ return true;
+ }
+
+ protected:
+ std::string GetTypeString() const {
+ return "metadata";
+ }
+
+ private:
+ grpc_metadata_array recv_metadata;
+};
+
+class ReadMessageOp : public Op {
+ public:
+ ReadMessageOp() {
+ recv_message = NULL;
+ }
+ ~ReadMessageOp() {
+ if (recv_message != NULL) {
+ gpr_free(recv_message);
+ }
+ }
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(ByteBufferToBuffer(recv_message));
+ }
+
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ std::vector<unique_ptr<NanUtf8String> > *strings,
+ std::vector<unique_ptr<PersistentHolder> > *handles) {
+ out->data.recv_message = &recv_message;
+ return true;
+ }
+
+ protected:
+ std::string GetTypeString() const {
+ return "read";
+ }
+
+ private:
+ grpc_byte_buffer *recv_message;
+};
+
+class ClientStatusOp : public Op {
+ public:
+ ClientStatusOp() {
+ grpc_metadata_array_init(&metadata_array);
+ status_details = NULL;
+ details_capacity = 0;
+ }
+
+ ~ClientStatusOp() {
+ grpc_metadata_array_destroy(&metadata_array);
+ gpr_free(status_details);
+ }
+
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ std::vector<unique_ptr<NanUtf8String> > *strings,
+ std::vector<unique_ptr<PersistentHolder> > *handles) {
+ out->data.recv_status_on_client.trailing_metadata = &metadata_array;
+ out->data.recv_status_on_client.status = &status;
+ out->data.recv_status_on_client.status_details = &status_details;
+ out->data.recv_status_on_client.status_details_capacity = &details_capacity;
+ return true;
+ }
+
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ Handle<Object> status_obj = NanNew<Object>();
+ status_obj->Set(NanNew("code"), NanNew<Number>(status));
+ if (status_details != NULL) {
+ status_obj->Set(NanNew("details"), String::New(status_details));
+ }
+ status_obj->Set(NanNew("metadata"), ParseMetadata(&metadata_array));
+ return NanEscapeScope(status_obj);
+ }
+ protected:
+ std::string GetTypeString() const {
+ return "status";
+ }
+ private:
+ grpc_metadata_array metadata_array;
+ grpc_status_code status;
+ char *status_details;
+ size_t details_capacity;
+};
+
+class ServerCloseResponseOp : public Op {
+ public:
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(NanNew<Boolean>(cancelled));
+ }
+
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ std::vector<unique_ptr<NanUtf8String> > *strings,
+ std::vector<unique_ptr<PersistentHolder> > *handles) {
+ out->data.recv_close_on_server.cancelled = &cancelled;
+ return true;
+ }
+
+ protected:
+ std::string GetTypeString() const {
+ return "cancelled";
+ }
+
+ private:
+ int cancelled;
+};
+
+tag::tag(NanCallback *callback, std::vector<unique_ptr<Op> > *ops,
+ std::vector<unique_ptr<PersistentHolder> > *handles,
+ std::vector<unique_ptr<NanUtf8String> > *strings) :
+ callback(callback), ops(ops), handles(handles), strings(strings){
+}
+tag::~tag() {
+ delete callback;
+ delete ops;
+ if (handles != NULL) {
+ delete handles;
+ }
+ if (strings != NULL) {
+ delete strings;
+ }
+}
+
+Handle<Value> GetTagNodeValue(void *tag) {
+ NanEscapableScope();
+ struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
+ Handle<Object> tag_obj = NanNew<Object>();
+ for (std::vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
+ it != tag_struct->ops->end(); ++it) {
+ Op *op_ptr = it->get();
+ tag_obj->Set(op_ptr->GetOpType(), op_ptr->GetNodeValue());
+ }
+ return NanEscapeScope(tag_obj);
+}
+
+NanCallback GetTagCallback(void *tag) {
+ struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
+ return *tag_struct->callback;
+}
+
+void DestroyTag(void *tag) {
+ struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
+ delete tag_struct;
+}
+
Call::Call(grpc_call *call) : wrapped_call(call) {}
Call::~Call() { grpc_call_destroy(wrapped_call); }
@@ -77,28 +450,10 @@ void Call::Init(Handle<Object> exports) {
Local<FunctionTemplate> tpl = FunctionTemplate::New(New);
tpl->SetClassName(NanNew("Call"));
tpl->InstanceTemplate()->SetInternalFieldCount(1);
- NanSetPrototypeTemplate(tpl, "addMetadata",
- FunctionTemplate::New(AddMetadata)->GetFunction());
- NanSetPrototypeTemplate(tpl, "invoke",
- FunctionTemplate::New(Invoke)->GetFunction());
- NanSetPrototypeTemplate(tpl, "serverAccept",
- FunctionTemplate::New(ServerAccept)->GetFunction());
- NanSetPrototypeTemplate(
- tpl, "serverEndInitialMetadata",
- FunctionTemplate::New(ServerEndInitialMetadata)->GetFunction());
+ NanSetPrototypeTemplate(tpl, "startBatch",
+ FunctionTemplate::New(StartBatch)->GetFunction());
NanSetPrototypeTemplate(tpl, "cancel",
FunctionTemplate::New(Cancel)->GetFunction());
- NanSetPrototypeTemplate(tpl, "startWrite",
- FunctionTemplate::New(StartWrite)->GetFunction());
- NanSetPrototypeTemplate(
- tpl, "startWriteStatus",
- FunctionTemplate::New(StartWriteStatus)->GetFunction());
- NanSetPrototypeTemplate(tpl, "writesDone",
- FunctionTemplate::New(WritesDone)->GetFunction());
- NanSetPrototypeTemplate(tpl, "startReadMetadata",
- FunctionTemplate::New(WritesDone)->GetFunction());
- NanSetPrototypeTemplate(tpl, "startRead",
- FunctionTemplate::New(StartRead)->GetFunction());
NanAssignPersistent(fun_tpl, tpl);
NanAssignPersistent(constructor, tpl->GetFunction());
constructor->Set(NanNew("WRITE_BUFFER_HINT"),
@@ -152,9 +507,9 @@ NAN_METHOD(Call::New) {
NanUtf8String method(args[1]);
double deadline = args[2]->NumberValue();
grpc_channel *wrapped_channel = channel->GetWrappedChannel();
- grpc_call *wrapped_call = grpc_channel_create_call_old(
- wrapped_channel, *method, channel->GetHost(),
- MillisecondsToTimespec(deadline));
+ grpc_call *wrapped_call = grpc_channel_create_call(
+ wrapped_channel, CompletionQueueAsyncWorker::GetQueue(), *method,
+ channel->GetHost(), MillisecondsToTimespec(deadline));
call = new Call(wrapped_call);
args.This()->SetHiddenValue(String::NewSymbol("channel_"),
channel_object);
@@ -168,119 +523,78 @@ NAN_METHOD(Call::New) {
}
}
-NAN_METHOD(Call::AddMetadata) {
+NAN_METHOD(Call::StartBatch) {
NanScope();
if (!HasInstance(args.This())) {
- return NanThrowTypeError("addMetadata can only be called on Call objects");
+ return NanThrowTypeError("startBatch can only be called on Call objects");
}
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
if (!args[0]->IsObject()) {
- return NanThrowTypeError("addMetadata's first argument must be an object");
- }
- Handle<Object> metadata = args[0]->ToObject();
- Handle<Array> keys(metadata->GetOwnPropertyNames());
- for (unsigned int i = 0; i < keys->Length(); i++) {
- Handle<String> current_key(keys->Get(i)->ToString());
- if (!metadata->Get(current_key)->IsArray()) {
- return NanThrowTypeError(
- "addMetadata's first argument's values must be arrays");
- }
- NanUtf8String utf8_key(current_key);
- Handle<Array> values = Local<Array>::Cast(metadata->Get(current_key));
- for (unsigned int j = 0; j < values->Length(); j++) {
- Handle<Value> value = values->Get(j);
- grpc_metadata metadata;
- grpc_call_error error;
- metadata.key = *utf8_key;
- if (Buffer::HasInstance(value)) {
- metadata.value = Buffer::Data(value);
- metadata.value_length = Buffer::Length(value);
- error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0);
- } else if (value->IsString()) {
- Handle<String> string_value = value->ToString();
- NanUtf8String utf8_value(string_value);
- metadata.value = *utf8_value;
- metadata.value_length = string_value->Length();
- gpr_log(GPR_DEBUG, "adding metadata: %s, %s, %d", metadata.key,
- metadata.value, metadata.value_length);
- error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0);
- } else {
- return NanThrowTypeError(
- "addMetadata values must be strings or buffers");
- }
- if (error != GRPC_CALL_OK) {
- return NanThrowError("addMetadata failed", error);
- }
- }
- }
- NanReturnUndefined();
-}
-
-NAN_METHOD(Call::Invoke) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError("invoke can only be called on Call objects");
- }
- if (!args[0]->IsFunction()) {
- return NanThrowTypeError("invoke's first argument must be a function");
+ return NanThrowError("startBatch's first argument must be an object");
}
if (!args[1]->IsFunction()) {
- return NanThrowTypeError("invoke's second argument must be a function");
- }
- if (!args[2]->IsUint32()) {
- return NanThrowTypeError("invoke's third argument must be integer flags");
+ return NanThrowError("startBatch's second argument must be a callback");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
- unsigned int flags = args[3]->Uint32Value();
- grpc_call_error error = grpc_call_invoke_old(
- call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(),
- CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags);
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- CompletionQueueAsyncWorker::Next();
- } else {
- return NanThrowError("invoke failed", error);
- }
- NanReturnUndefined();
-}
-
-NAN_METHOD(Call::ServerAccept) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError("accept can only be called on Call objects");
- }
- if (!args[0]->IsFunction()) {
- return NanThrowTypeError("accept's first argument must be a function");
- }
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
- grpc_call_error error = grpc_call_server_accept_old(
- call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(),
- CreateTag(args[0], args.This()));
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- } else {
- return NanThrowError("serverAccept failed", error);
- }
- NanReturnUndefined();
-}
-
-NAN_METHOD(Call::ServerEndInitialMetadata) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError(
- "serverEndInitialMetadata can only be called on Call objects");
- }
- if (!args[0]->IsUint32()) {
- return NanThrowTypeError(
- "serverEndInitialMetadata's second argument must be integer flags");
+ std::vector<unique_ptr<PersistentHolder> > *handles =
+ new std::vector<unique_ptr<PersistentHolder> >();
+ std::vector<unique_ptr<NanUtf8String> > *strings =
+ new std::vector<unique_ptr<NanUtf8String> >();
+ Persistent<Value> handle;
+ Handle<Object> obj = args[0]->ToObject();
+ Handle<Array> keys = obj->GetOwnPropertyNames();
+ size_t nops = keys->Length();
+ grpc_op *ops = new grpc_op[nops];
+ std::vector<unique_ptr<Op> > *op_vector = new std::vector<unique_ptr<Op> >();
+ for (unsigned int i = 0; i < nops; i++) {
+ Op *op;
+ if (!keys->Get(i)->IsUint32()) {
+ return NanThrowError(
+ "startBatch's first argument's keys must be integers");
+ }
+ uint32_t type = keys->Get(i)->Uint32Value();
+ ops[i].op = static_cast<grpc_op_type>(type);
+ switch (type) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ op = new SendMetadataOp();
+ break;
+ case GRPC_OP_SEND_MESSAGE:
+ op = new SendMessageOp();
+ break;
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ op = new SendClientCloseOp();
+ break;
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ op = new SendServerStatusOp();
+ break;
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ op = new GetMetadataOp();
+ break;
+ case GRPC_OP_RECV_MESSAGE:
+ op = new ReadMessageOp();
+ break;
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ op = new ClientStatusOp();
+ break;
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ op = new ServerCloseResponseOp();
+ break;
+ default:
+ return NanThrowError("Argument object had an unrecognized key");
+ }
+ if (!op->ParseOp(obj->Get(type), &ops[i], strings, handles)) {
+ return NanThrowTypeError("Incorrectly typed arguments to startBatch");
+ }
+ op_vector->push_back(unique_ptr<Op>(op));
}
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
- unsigned int flags = args[1]->Uint32Value();
- grpc_call_error error =
- grpc_call_server_end_initial_metadata_old(call->wrapped_call, flags);
+ grpc_call_error error = grpc_call_start_batch(
+ call->wrapped_call, ops, nops, new struct tag(
+ new NanCallback(args[1].As<Function>()),
+ op_vector, handles,
+ strings));
if (error != GRPC_CALL_OK) {
- return NanThrowError("serverEndInitialMetadata failed", error);
+ return NanThrowError("startBatch failed", error);
}
+ CompletionQueueAsyncWorker::Next();
NanReturnUndefined();
}
@@ -297,102 +611,5 @@ NAN_METHOD(Call::Cancel) {
NanReturnUndefined();
}
-NAN_METHOD(Call::StartWrite) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError("startWrite can only be called on Call objects");
- }
- if (!Buffer::HasInstance(args[0])) {
- return NanThrowTypeError("startWrite's first argument must be a Buffer");
- }
- if (!args[1]->IsFunction()) {
- return NanThrowTypeError("startWrite's second argument must be a function");
- }
- if (!args[2]->IsUint32()) {
- return NanThrowTypeError(
- "startWrite's third argument must be integer flags");
- }
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
- grpc_byte_buffer *buffer = BufferToByteBuffer(args[0]);
- unsigned int flags = args[2]->Uint32Value();
- grpc_call_error error = grpc_call_start_write_old(
- call->wrapped_call, buffer, CreateTag(args[1], args.This()), flags);
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- } else {
- return NanThrowError("startWrite failed", error);
- }
- NanReturnUndefined();
-}
-
-NAN_METHOD(Call::StartWriteStatus) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError(
- "startWriteStatus can only be called on Call objects");
- }
- if (!args[0]->IsUint32()) {
- return NanThrowTypeError(
- "startWriteStatus's first argument must be a status code");
- }
- if (!args[1]->IsString()) {
- return NanThrowTypeError(
- "startWriteStatus's second argument must be a string");
- }
- if (!args[2]->IsFunction()) {
- return NanThrowTypeError(
- "startWriteStatus's third argument must be a function");
- }
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
- NanUtf8String details(args[1]);
- grpc_call_error error = grpc_call_start_write_status_old(
- call->wrapped_call, (grpc_status_code)args[0]->Uint32Value(), *details,
- CreateTag(args[2], args.This()));
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- } else {
- return NanThrowError("startWriteStatus failed", error);
- }
- NanReturnUndefined();
-}
-
-NAN_METHOD(Call::WritesDone) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError("writesDone can only be called on Call objects");
- }
- if (!args[0]->IsFunction()) {
- return NanThrowTypeError("writesDone's first argument must be a function");
- }
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
- grpc_call_error error = grpc_call_writes_done_old(
- call->wrapped_call, CreateTag(args[0], args.This()));
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- } else {
- return NanThrowError("writesDone failed", error);
- }
- NanReturnUndefined();
-}
-
-NAN_METHOD(Call::StartRead) {
- NanScope();
- if (!HasInstance(args.This())) {
- return NanThrowTypeError("startRead can only be called on Call objects");
- }
- if (!args[0]->IsFunction()) {
- return NanThrowTypeError("startRead's first argument must be a function");
- }
- Call *call = ObjectWrap::Unwrap<Call>(args.This());
- grpc_call_error error = grpc_call_start_read_old(
- call->wrapped_call, CreateTag(args[0], args.This()));
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- } else {
- return NanThrowError("startRead failed", error);
- }
- NanReturnUndefined();
-}
-
} // namespace node
} // namespace grpc
diff --git a/src/node/ext/call.h b/src/node/ext/call.h
index 1924a1bf42..434bcf8a63 100644
--- a/src/node/ext/call.h
+++ b/src/node/ext/call.h
@@ -34,6 +34,9 @@
#ifndef NET_GRPC_NODE_CALL_H_
#define NET_GRPC_NODE_CALL_H_
+#include <memory>
+#include <vector>
+
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
@@ -43,6 +46,53 @@
namespace grpc {
namespace node {
+using std::unique_ptr;
+
+v8::Handle<v8::Value> ParseMetadata(const grpc_metadata_array *metadata_array);
+
+class PersistentHolder {
+ public:
+ explicit PersistentHolder(v8::Persistent<v8::Value> persist) :
+ persist(persist) {
+ }
+
+ ~PersistentHolder() {
+ persist.Dispose();
+}
+
+ private:
+ v8::Persistent<v8::Value> persist;
+};
+
+class Op {
+ public:
+ virtual v8::Handle<v8::Value> GetNodeValue() const = 0;
+ virtual bool ParseOp(v8::Handle<v8::Value> value, grpc_op *out,
+ std::vector<unique_ptr<NanUtf8String> > *strings,
+ std::vector<unique_ptr<PersistentHolder> > *handles) = 0;
+ v8::Handle<v8::Value> GetOpType() const;
+
+ protected:
+ virtual std::string GetTypeString() const = 0;
+};
+
+struct tag {
+ tag(NanCallback *callback, std::vector<unique_ptr<Op> > *ops,
+ std::vector<unique_ptr<PersistentHolder> > *handles,
+ std::vector<unique_ptr<NanUtf8String> > *strings);
+ ~tag();
+ NanCallback *callback;
+ std::vector<unique_ptr<Op> > *ops;
+ std::vector<unique_ptr<PersistentHolder> > *handles;
+ std::vector<unique_ptr<NanUtf8String> > *strings;
+};
+
+v8::Handle<v8::Value> GetTagNodeValue(void *tag);
+
+NanCallback GetTagCallback(void *tag);
+
+void DestroyTag(void *tag);
+
/* Wrapper class for grpc_call structs. */
class Call : public ::node::ObjectWrap {
public:
@@ -60,15 +110,8 @@ class Call : public ::node::ObjectWrap {
Call &operator=(const Call &);
static NAN_METHOD(New);
- static NAN_METHOD(AddMetadata);
- static NAN_METHOD(Invoke);
- static NAN_METHOD(ServerAccept);
- static NAN_METHOD(ServerEndInitialMetadata);
+ static NAN_METHOD(StartBatch);
static NAN_METHOD(Cancel);
- static NAN_METHOD(StartWrite);
- static NAN_METHOD(StartWriteStatus);
- static NAN_METHOD(WritesDone);
- static NAN_METHOD(StartRead);
static v8::Persistent<v8::Function> constructor;
// Used for typechecking instances of this javascript class
static v8::Persistent<v8::FunctionTemplate> fun_tpl;
diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc
index 8de7db66d5..5c0e27e6a7 100644
--- a/src/node/ext/completion_queue_async_worker.cc
+++ b/src/node/ext/completion_queue_async_worker.cc
@@ -37,8 +37,7 @@
#include "grpc/grpc.h"
#include "grpc/support/time.h"
#include "completion_queue_async_worker.h"
-#include "event.h"
-#include "tag.h"
+#include "call.h"
namespace grpc {
namespace node {
@@ -58,6 +57,9 @@ CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {}
void CompletionQueueAsyncWorker::Execute() {
result = grpc_completion_queue_next(queue, gpr_inf_future);
+ if (result->data.op_complete != GRPC_OP_OK) {
+ SetErrorMessage("The batch encountered an error");
+ }
}
grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; }
@@ -75,14 +77,26 @@ void CompletionQueueAsyncWorker::Init(Handle<Object> exports) {
void CompletionQueueAsyncWorker::HandleOKCallback() {
NanScope();
- NanCallback event_callback(GetTagHandle(result->tag).As<Function>());
- Handle<Value> argv[] = {CreateEventObject(result)};
+ NanCallback callback = GetTagCallback(result->tag);
+ Handle<Value> argv[] = {NanNull(), GetTagNodeValue(result->tag)};
DestroyTag(result->tag);
grpc_event_finish(result);
result = NULL;
- event_callback.Call(1, argv);
+ callback.Call(2, argv);
+}
+
+void CompletionQueueAsyncWorker::HandleErrorCallback() {
+ NanScope();
+ NanCallback callback = GetTagCallback(result->tag);
+ Handle<Value> argv[] = {NanError(ErrorMessage())};
+
+ DestroyTag(result->tag);
+ grpc_event_finish(result);
+ result = NULL;
+
+ callback.Call(1, argv);
}
} // namespace node
diff --git a/src/node/ext/completion_queue_async_worker.h b/src/node/ext/completion_queue_async_worker.h
index 2c928b7024..c04a303283 100644
--- a/src/node/ext/completion_queue_async_worker.h
+++ b/src/node/ext/completion_queue_async_worker.h
@@ -67,6 +67,8 @@ class CompletionQueueAsyncWorker : public NanAsyncWorker {
completion_queue_next */
void HandleOKCallback();
+ void HandleErrorCallback();
+
private:
grpc_event *result;
diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc
index bc1dfaf899..9b0fe82976 100644
--- a/src/node/ext/node_grpc.cc
+++ b/src/node/ext/node_grpc.cc
@@ -130,35 +130,34 @@ void InitCallErrorConstants(Handle<Object> exports) {
call_error->Set(NanNew("INVALID_FLAGS"), INVALID_FLAGS);
}
-void InitOpErrorConstants(Handle<Object> exports) {
+void InitOpTypeConstants(Handle<Object> exports) {
NanScope();
- Handle<Object> op_error = Object::New();
- exports->Set(NanNew("opError"), op_error);
- Handle<Value> OK(NanNew<Uint32, uint32_t>(GRPC_OP_OK));
- op_error->Set(NanNew("OK"), OK);
- Handle<Value> ERROR(NanNew<Uint32, uint32_t>(GRPC_OP_ERROR));
- op_error->Set(NanNew("ERROR"), ERROR);
-}
-
-void InitCompletionTypeConstants(Handle<Object> exports) {
- NanScope();
- Handle<Object> completion_type = Object::New();
- exports->Set(NanNew("completionType"), completion_type);
- Handle<Value> QUEUE_SHUTDOWN(NanNew<Uint32, uint32_t>(GRPC_QUEUE_SHUTDOWN));
- completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN);
- Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ));
- completion_type->Set(NanNew("READ"), READ);
- Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED));
- completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED);
- Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED));
- completion_type->Set(NanNew("FINISH_ACCEPTED"), FINISH_ACCEPTED);
- Handle<Value> CLIENT_METADATA_READ(
- NanNew<Uint32, uint32_t>(GRPC_CLIENT_METADATA_READ));
- completion_type->Set(NanNew("CLIENT_METADATA_READ"), CLIENT_METADATA_READ);
- Handle<Value> FINISHED(NanNew<Uint32, uint32_t>(GRPC_FINISHED));
- completion_type->Set(NanNew("FINISHED"), FINISHED);
- Handle<Value> SERVER_RPC_NEW(NanNew<Uint32, uint32_t>(GRPC_SERVER_RPC_NEW));
- completion_type->Set(NanNew("SERVER_RPC_NEW"), SERVER_RPC_NEW);
+ Handle<Object> op_type = Object::New();
+ exports->Set(NanNew("opType"), op_type);
+ Handle<Value> SEND_INITIAL_METADATA(
+ NanNew<Uint32, uint32_t>(GRPC_OP_SEND_INITIAL_METADATA));
+ op_type->Set(NanNew("SEND_INITIAL_METADATA"), SEND_INITIAL_METADATA);
+ Handle<Value> SEND_MESSAGE(
+ NanNew<Uint32, uint32_t>(GRPC_OP_SEND_MESSAGE));
+ op_type->Set(NanNew("SEND_MESSAGE"), SEND_MESSAGE);
+ Handle<Value> SEND_CLOSE_FROM_CLIENT(
+ NanNew<Uint32, uint32_t>(GRPC_OP_SEND_CLOSE_FROM_CLIENT));
+ op_type->Set(NanNew("SEND_CLOSE_FROM_CLIENT"), SEND_CLOSE_FROM_CLIENT);
+ Handle<Value> SEND_STATUS_FROM_SERVER(
+ NanNew<Uint32, uint32_t>(GRPC_OP_SEND_STATUS_FROM_SERVER));
+ op_type->Set(NanNew("SEND_STATUS_FROM_SERVER"), SEND_STATUS_FROM_SERVER);
+ Handle<Value> RECV_INITIAL_METADATA(
+ NanNew<Uint32, uint32_t>(GRPC_OP_RECV_INITIAL_METADATA));
+ op_type->Set(NanNew("RECV_INITIAL_METADATA"), RECV_INITIAL_METADATA);
+ Handle<Value> RECV_MESSAGE(
+ NanNew<Uint32, uint32_t>(GRPC_OP_RECV_MESSAGE));
+ op_type->Set(NanNew("RECV_MESSAGE"), RECV_MESSAGE);
+ Handle<Value> RECV_STATUS_ON_CLIENT(
+ NanNew<Uint32, uint32_t>(GRPC_OP_RECV_STATUS_ON_CLIENT));
+ op_type->Set(NanNew("RECV_STATUS_ON_CLIENT"), RECV_STATUS_ON_CLIENT);
+ Handle<Value> RECV_CLOSE_ON_SERVER(
+ NanNew<Uint32, uint32_t>(GRPC_OP_RECV_CLOSE_ON_SERVER));
+ op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER);
}
void init(Handle<Object> exports) {
@@ -166,8 +165,7 @@ void init(Handle<Object> exports) {
grpc_init();
InitStatusConstants(exports);
InitCallErrorConstants(exports);
- InitOpErrorConstants(exports);
- InitCompletionTypeConstants(exports);
+ InitOpTypeConstants(exports);
grpc::node::Call::Init(exports);
grpc::node::Channel::Init(exports);
diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc
index 6b8ccef9b1..75ea681fa7 100644
--- a/src/node/ext/server.cc
+++ b/src/node/ext/server.cc
@@ -31,6 +31,8 @@
*
*/
+#include <memory>
+
#include "server.h"
#include <node.h>
@@ -43,15 +45,17 @@
#include "grpc/grpc_security.h"
#include "call.h"
#include "completion_queue_async_worker.h"
-#include "tag.h"
#include "server_credentials.h"
+#include "timeval.h"
namespace grpc {
namespace node {
+using std::unique_ptr;
using v8::Arguments;
using v8::Array;
using v8::Boolean;
+using v8::Date;
using v8::Exception;
using v8::Function;
using v8::FunctionTemplate;
@@ -67,6 +71,50 @@ using v8::Value;
Persistent<Function> Server::constructor;
Persistent<FunctionTemplate> Server::fun_tpl;
+class NewCallOp : public Op {
+ public:
+ NewCallOp() {
+ call = NULL;
+ grpc_call_details_init(&details);
+ grpc_metadata_array_init(&request_metadata);
+ }
+
+ ~NewCallOp() {
+ grpc_call_details_destroy(&details);
+ grpc_metadata_array_destroy(&request_metadata);
+ }
+
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ if (call == NULL) {
+ return NanEscapeScope(NanNull());
+ }
+ Handle<Object> obj = NanNew<Object>();
+ obj->Set(NanNew("call"), Call::WrapStruct(call));
+ obj->Set(NanNew("method"), NanNew(details.method));
+ obj->Set(NanNew("host"), NanNew(details.host));
+ obj->Set(NanNew("deadline"),
+ NanNew<Date>(TimespecToMilliseconds(details.deadline)));
+ obj->Set(NanNew("metadata"), ParseMetadata(&request_metadata));
+ return NanEscapeScope(obj);
+ }
+
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ std::vector<unique_ptr<NanUtf8String> > *strings,
+ std::vector<unique_ptr<PersistentHolder> > *handles) {
+ return true;
+ }
+
+ grpc_call *call;
+ grpc_call_details details;
+ grpc_metadata_array request_metadata;
+
+ protected:
+ std::string GetTypeString() const {
+ return "new call";
+ }
+};
+
Server::Server(grpc_server *server) : wrapped_server(server) {}
Server::~Server() { grpc_server_destroy(wrapped_server); }
@@ -175,13 +223,17 @@ NAN_METHOD(Server::RequestCall) {
return NanThrowTypeError("requestCall can only be called on a Server");
}
Server *server = ObjectWrap::Unwrap<Server>(args.This());
- grpc_call_error error = grpc_server_request_call_old(
- server->wrapped_server, CreateTag(args[0], NanNull()));
- if (error == GRPC_CALL_OK) {
- CompletionQueueAsyncWorker::Next();
- } else {
+ NewCallOp *op = new NewCallOp();
+ std::vector<unique_ptr<Op> > *ops = new std::vector<unique_ptr<Op> >();
+ ops->push_back(unique_ptr<Op>(op));
+ grpc_call_error error = grpc_server_request_call(
+ server->wrapped_server, &op->call, &op->details, &op->request_metadata,
+ CompletionQueueAsyncWorker::GetQueue(),
+ new struct tag(new NanCallback(args[0].As<Function>()), ops, NULL, NULL));
+ if (error != GRPC_CALL_OK) {
return NanThrowError("requestCall failed", error);
}
+ CompletionQueueAsyncWorker::Next();
NanReturnUndefined();
}
diff --git a/src/node/ext/tag.cc b/src/node/ext/tag.cc
index dc8e523e12..27baa94a8e 100644
--- a/src/node/ext/tag.cc
+++ b/src/node/ext/tag.cc
@@ -31,68 +31,292 @@
*
*/
+#include <map>
+#include <vector>
+
+#include <grpc/grpc.h>
#include <stdlib.h>
#include <node.h>
#include <nan.h>
#include "tag.h"
+#include "call.h"
namespace grpc {
namespace node {
+using v8::Boolean;
+using v8::Function;
using v8::Handle;
using v8::HandleScope;
using v8::Persistent;
using v8::Value;
-struct tag {
- tag(Persistent<Value> *tag, Persistent<Value> *call)
- : persist_tag(tag), persist_call(call) {}
+Handle<Value> ParseMetadata(grpc_metadata_array *metadata_array) {
+ NanEscapableScope();
+ grpc_metadata *metadata_elements = metadata_array->metadata;
+ size_t length = metadata_array->count;
+ std::map<char*, size_t> size_map;
+ std::map<char*, size_t> index_map;
+
+ for (unsigned int i = 0; i < length; i++) {
+ char *key = metadata_elements[i].key;
+ if (size_map.count(key)) {
+ size_map[key] += 1;
+ }
+ index_map[key] = 0;
+ }
+ Handle<Object> metadata_object = NanNew<Object>();
+ for (unsigned int i = 0; i < length; i++) {
+ grpc_metadata* elem = &metadata_elements[i];
+ Handle<String> key_string = String::New(elem->key);
+ Handle<Array> array;
+ if (metadata_object->Has(key_string)) {
+ array = Handle<Array>::Cast(metadata_object->Get(key_string));
+ } else {
+ array = NanNew<Array>(size_map[elem->key]);
+ metadata_object->Set(key_string, array);
+ }
+ array->Set(index_map[elem->key],
+ MakeFastBuffer(
+ NanNewBufferHandle(elem->value, elem->value_length)));
+ index_map[elem->key] += 1;
+ }
+ return NanEscapeScope(metadata_object);
+}
+
+class OpResponse {
+ public:
+ explicit OpResponse(char *name): name(name) {
+ }
+ virtual Handle<Value> GetNodeValue() const = 0;
+ virtual bool ParseOp() = 0;
+ Handle<Value> GetOpType() const {
+ NanEscapableScope();
+ return NanEscapeScope(NanNew(name));
+ }
+
+ private:
+ char *name;
+};
+
+class SendResponse : public OpResponse {
+ public:
+ explicit SendResponse(char *name): OpResponse(name) {
+ }
+
+ Handle<Value> GetNodeValue() {
+ NanEscapableScope();
+ return NanEscapeScope(NanTrue());
+ }
+}
+
+class MetadataResponse : public OpResponse {
+ public:
+ explicit MetadataResponse(grpc_metadata_array *recv_metadata):
+ recv_metadata(recv_metadata), OpResponse("metadata") {
+ }
+
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(ParseMetadata(recv_metadata));
+ }
+
+ private:
+ grpc_metadata_array *recv_metadata;
+};
+
+class MessageResponse : public OpResponse {
+ public:
+ explicit MessageResponse(grpc_byte_buffer **recv_message):
+ recv_message(recv_message), OpResponse("read") {
+ }
+
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ return NanEscapeScope(ByteBufferToBuffer(*recv_message));
+ }
+
+ private:
+ grpc_byte_buffer **recv_message;
+};
+switch () {
+case GRPC_RECV_CLIENT_STATUS:
+ op = new ClientStatusResponse;
+ break;
+}
+
+
+class ClientStatusResponse : public OpResponse {
+ public:
+ explicit ClientStatusResponse():
+ OpResponse("status") {
+ }
+
+ bool ParseOp(Handle<Value> obj, grpc_op *out) {
+ }
+
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ Handle<Object> status_obj = NanNew<Object>();
+ status_obj->Set(NanNew("code"), NanNew<Number>(*status));
+ if (event->data.finished.details != NULL) {
+ status_obj->Set(NanNew("details"), String::New(*status_details));
+ }
+ status_obj->Set(NanNew("metadata"), ParseMetadata(metadata_array));
+ return NanEscapeScope(status_obj);
+ }
+ private:
+ grpc_metadata_array metadata_array;
+ grpc_status_code status;
+ char *status_details;
+};
+
+class ServerCloseResponse : public OpResponse {
+ public:
+ explicit ServerCloseResponse(int *cancelled): cancelled(cancelled),
+ OpResponse("cancelled") {
+ }
+
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ NanEscapeScope(NanNew<Boolean>(*cancelled));
+ }
+
+ private:
+ int *cancelled;
+};
+
+class NewCallResponse : public OpResponse {
+ public:
+ explicit NewCallResponse(grpc_call **call, grpc_call_details *details,
+ grpc_metadata_array *request_metadata) :
+ call(call), details(details), request_metadata(request_metadata),
+ OpResponse("call"){
+ }
+
+ Handle<Value> GetNodeValue() const {
+ NanEscapableScope();
+ if (*call == NULL) {
+ return NanEscapeScope(NanNull());
+ }
+ Handle<Object> obj = NanNew<Object>();
+ obj->Set(NanNew("call"), Call::WrapStruct(*call));
+ obj->Set(NanNew("method"), NanNew(details->method));
+ obj->Set(NanNew("host"), NanNew(details->host));
+ obj->Set(NanNew("deadline"),
+ NanNew<Date>(TimespecToMilliseconds(details->deadline)));
+ obj->Set(NanNew("metadata"), ParseMetadata(request_metadata));
+ return NanEscapeScope(obj);
+ }
+ private:
+ grpc_call **call;
+ grpc_call_details *details;
+ grpc_metadata_array *request_metadata;
+}
+
+struct tag {
+ tag(NanCallback *callback, std::vector<OpResponse*> *responses,
+ std::vector<Persistent<Value>> *handles,
+ std::vector<NanUtf8String *> *strings) :
+ callback(callback), repsonses(responses), handles(handles),
+ strings(strings){
+ }
~tag() {
- persist_tag->Dispose();
- if (persist_call != NULL) {
- persist_call->Dispose();
+ for (std::vector<OpResponse *>::iterator it = responses->begin();
+ it != responses->end(); ++it) {
+ delete *it;
+ }
+ for (std::vector<NanUtf8String *>::iterator it = responses->begin();
+ it != responses->end(); ++it) {
+ delete *it;
}
+ delete callback;
+ delete responses;
+ delete handles;
+ delete strings;
}
- Persistent<Value> *persist_tag;
- Persistent<Value> *persist_call;
+ NanCallback *callback;
+ std::vector<OpResponse*> *responses;
+ std::vector<Persistent<Value>> *handles;
+ std::vector<NanUtf8String *> *strings;
};
-void *CreateTag(Handle<Value> tag, Handle<Value> call) {
+void *CreateTag(Handle<Function> callback, grpc_op *ops, size_t nops,
+ std::vector<Persistent<Value>> *handles,
+ std::vector<NanUtf8String *> *strings) {
NanScope();
- Persistent<Value> *persist_tag = new Persistent<Value>();
- NanAssignPersistent(*persist_tag, tag);
- Persistent<Value> *persist_call;
- if (call->IsNull() || call->IsUndefined()) {
- persist_call = NULL;
- } else {
- persist_call = new Persistent<Value>();
- NanAssignPersistent(*persist_call, call);
- }
- struct tag *tag_struct = new struct tag(persist_tag, persist_call);
+ NanCallback *cb = new NanCallback(callback);
+ vector<OpResponse*> *responses = new vector<OpResponse*>();
+ for (size_t i = 0; i < nops; i++) {
+ grpc_op *op = &ops[i];
+ OpResponse *resp;
+ // Switching on the TYPE of the op
+ switch (op->op) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ resp = new SendResponse("send metadata");
+ break;
+ case GRPC_OP_SEND_MESSAGE:
+ resp = new SendResponse("write");
+ break;
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ resp = new SendResponse("client close");
+ break;
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ resp = new SendResponse("server close");
+ break;
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ resp = new MetadataResponse(op->data.recv_initial_metadata);
+ break;
+ case GRPC_OP_RECV_MESSAGE:
+ resp = new MessageResponse(op->data.recv_message);
+ break;
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ resp = new ClientStatusResponse(
+ op->data.recv_status_on_client.trailing_metadata,
+ op->data.recv_status_on_client.status,
+ op->data.recv_status_on_client.status_details);
+ break;
+ case GRPC_RECV_CLOSE_ON_SERVER:
+ resp = new ServerCloseResponse(op->data.recv_close_on_server.cancelled);
+ break;
+ default:
+ continue;
+ }
+ responses->push_back(resp);
+ }
+ struct tag *tag_struct = new struct tag(cb, responses, handles, strings);
return reinterpret_cast<void *>(tag_struct);
}
-Handle<Value> GetTagHandle(void *tag) {
+void *CreateTag(Handle<Function> callback, grpc_call **call,
+ grpc_call_details *details,
+ grpc_metadata_array *request_metadata) {
NanEscapableScope();
- struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
- Handle<Value> tag_value = NanNew<Value>(*tag_struct->persist_tag);
- return NanEscapeScope(tag_value);
+ NanCallback *cb = new NanCallback(callback);
+ vector<OpResponse*> *responses = new vector<OpResponse*>();
+ OpResponse *resp = new NewCallResponse(call, details, request_metadata);
+ responses->push_back(resp);
+ struct tag *tag_struct = new struct tag(cb, responses);
+ return reinterpret_cast<void *>(tag_struct);
}
-bool TagHasCall(void *tag) {
+NanCallback GetTagCallback(void *tag) {
+ NanEscapableScope();
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
- return tag_struct->persist_call != NULL;
+ return NanEscapeScope(*tag_struct->callback);
}
-Handle<Value> TagGetCall(void *tag) {
+Handle<Value> GetNodeValue(void *tag) {
NanEscapableScope();
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
- if (tag_struct->persist_call == NULL) {
- return NanEscapeScope(NanNull());
+ Handle<Object> obj = NanNew<Object>();
+ for (std::vector<OpResponse *>::iterator it = tag_struct->responses->begin();
+ it != tag_struct->responses->end(); ++it) {
+ OpResponse *resp = *it;
+ obj->Set(resp->GetOpType(), resp->GetNodeValue());
}
- Handle<Value> call_value = NanNew<Value>(*tag_struct->persist_call);
- return NanEscapeScope(call_value);
+ return NanEscapeScope(obj);
}
void DestroyTag(void *tag) { delete reinterpret_cast<struct tag *>(tag); }
diff --git a/src/node/ext/tag.h b/src/node/ext/tag.h
index bdb09252d9..9ff8703b95 100644
--- a/src/node/ext/tag.h
+++ b/src/node/ext/tag.h
@@ -34,21 +34,34 @@
#ifndef NET_GRPC_NODE_TAG_H_
#define NET_GRPC_NODE_TAG_H_
+#include <vector>
+
+
+#include <grpc/grpc.h>
#include <node.h>
+#include <nan.h>
namespace grpc {
namespace node {
-/* Create a void* tag that can be passed to various grpc_call functions from
- a javascript value and the javascript wrapper for the call. The call can be
- null. */
-void *CreateTag(v8::Handle<v8::Value> tag, v8::Handle<v8::Value> call);
-/* Return the javascript value stored in the tag */
-v8::Handle<v8::Value> GetTagHandle(void *tag);
-/* Returns true if the call was set (non-null) when the tag was created */
-bool TagHasCall(void *tag);
-/* Returns the javascript wrapper for the call associated with this tag */
-v8::Handle<v8::Value> TagGetCall(void *call);
+/* Create a void* tag that can be passed to grpc_call_start_batch from a
+ callback function and an ops array */
+void *CreateTag(v8::Handle<v8::Function> callback, grpc_op *ops, size_t nops,
+ std::vector<v8::Persistent<v8::Value> > *handles,
+ std::vector<NanUtf8String *> *strings);
+
+/* Create a void* tag that can be passed to grpc_server_request_call from a
+ callback and the various out parameters to that function */
+void *CreateTag(v8::Handle<v8::Function> callback, grpc_call **call,
+ grpc_call_details *details,
+ grpc_metadata_array *request_metadata);
+
+/* Get the callback from the tag */
+NanCallback GetCallback(void *tag);
+
+/* Get the combined output value from the tag */
+v8::Handle<v8::Value> GetNodeValue(void *tag);
+
/* Destroy the tag and all resources it is holding. It is illegal to call any
of these other functions on a tag after it has been destroyed. */
void DestroyTag(void *tag);
diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js
index 48db245498..e341092ff8 100644
--- a/src/node/test/call_test.js
+++ b/src/node/test/call_test.js
@@ -98,100 +98,80 @@ describe('call', function() {
}, TypeError);
});
});
- describe('addMetadata', function() {
- it('should succeed with a map from strings to string arrays', function() {
+ describe('startBatch', function() {
+ it('should fail without an object and a function', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- assert.doesNotThrow(function() {
- call.addMetadata({'key': ['value']});
+ assert.throws(function() {
+ call.startBatch();
});
- assert.doesNotThrow(function() {
- call.addMetadata({'key1': ['value1'], 'key2': ['value2']});
+ assert.throws(function() {
+ call.startBatch({});
+ });
+ assert.throws(function() {
+ call.startBatch(null, function(){});
});
});
- it('should succeed with a map from strings to buffer arrays', function() {
+ it.skip('should succeed with an empty object', function(done) {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.doesNotThrow(function() {
- call.addMetadata({'key': [new Buffer('value')]});
- });
- assert.doesNotThrow(function() {
- call.addMetadata({'key1': [new Buffer('value1')],
- 'key2': [new Buffer('value2')]});
+ call.startBatch({}, function(err) {
+ assert.ifError(err);
+ done();
+ });
});
});
- it('should fail with other parameter types', function() {
+ });
+ describe('startBatch with metadata', function() {
+ it('should succeed with a map of strings to string arrays', function(done) {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- assert.throws(function() {
- call.addMetadata();
+ assert.doesNotThrow(function() {
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = {'key1': ['value1'],
+ 'key2': ['value2']};
+ call.startBatch(batch, function(err, resp) {
+ assert.ifError(err);
+ assert.deepEqual(resp, {'send metadata': true});
+ done();
+ });
});
- assert.throws(function() {
- call.addMetadata(null);
- }, TypeError);
- assert.throws(function() {
- call.addMetadata('value');
- }, TypeError);
- assert.throws(function() {
- call.addMetadata(5);
- }, TypeError);
});
- it.skip('should fail if invoke was already called', function(done) {
+ it('should succeed with a map of strings to buffer arrays', function(done) {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- call.invoke(function() {},
- function() {done();},
- 0);
- assert.throws(function() {
- call.addMetadata({'key': ['value']});
+ assert.doesNotThrow(function() {
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = {
+ 'key1': [new Buffer('value1')],
+ 'key2': [new Buffer('value2')]
+ };
+ call.startBatch(batch, function(err, resp) {
+ assert.ifError(err);
+ assert.deepEqual(resp, {'send metadata': true});
+ done();
+ });
});
- // Cancel to speed up the test
- call.cancel();
});
- });
- describe('invoke', function() {
- it('should fail with fewer than 3 arguments', function() {
+ it('should fail with other parameter types', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.invoke();
- }, TypeError);
- assert.throws(function() {
- call.invoke(function() {});
- }, TypeError);
- assert.throws(function() {
- call.invoke(function() {},
- function() {});
- }, TypeError);
- });
- it('should work with 2 args and an int', function(done) {
- assert.doesNotThrow(function() {
- var call = new grpc.Call(channel, 'method', getDeadline(1));
- call.invoke(function() {},
- function() {done();},
- 0);
- // Cancel to speed up the test
- call.cancel();
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = undefined;
+ call.startBatch(batch, function(){});
});
- });
- it('should reject incorrectly typed arguments', function() {
- var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.invoke(0, 0, 0);
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = null;
+ call.startBatch(batch, function(){});
}, TypeError);
assert.throws(function() {
- call.invoke(function() {},
- function() {}, 'test');
- });
- });
- });
- describe('serverAccept', function() {
- it('should fail with fewer than 1 argument1', function() {
- var call = new grpc.Call(channel, 'method', getDeadline(1));
- assert.throws(function() {
- call.serverAccept();
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = 'value';
+ call.startBatch(batch, function(){});
}, TypeError);
- });
- it.skip('should return an error when called on a client Call', function() {
- var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.serverAccept(function() {});
- });
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = 5;
+ call.startBatch(batch, function(){});
+ }, TypeError);
});
});
describe('cancel', function() {
diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js
index 0138a55226..4d11e6f527 100644
--- a/src/node/test/constant_test.js
+++ b/src/node/test/constant_test.js
@@ -76,31 +76,6 @@ var callErrorNames = [
'INVALID_FLAGS'
];
-/**
- * List of all op error names
- * @const
- * @type {Array.<string>}
- */
-var opErrorNames = [
- 'OK',
- 'ERROR'
-];
-
-/**
- * List of all completion type names
- * @const
- * @type {Array.<string>}
- */
-var completionTypeNames = [
- 'QUEUE_SHUTDOWN',
- 'READ',
- 'WRITE_ACCEPTED',
- 'FINISH_ACCEPTED',
- 'CLIENT_METADATA_READ',
- 'FINISHED',
- 'SERVER_RPC_NEW'
-];
-
describe('constants', function() {
it('should have all of the status constants', function() {
for (var i = 0; i < statusNames.length; i++) {
@@ -114,16 +89,4 @@ describe('constants', function() {
'call error missing: ' + callErrorNames[i]);
}
});
- it('should have all of the op errors', function() {
- for (var i = 0; i < opErrorNames.length; i++) {
- assert(grpc.opError.hasOwnProperty(opErrorNames[i]),
- 'op error missing: ' + opErrorNames[i]);
- }
- });
- it('should have all of the completion types', function() {
- for (var i = 0; i < completionTypeNames.length; i++) {
- assert(grpc.completionType.hasOwnProperty(completionTypeNames[i]),
- 'completion type missing: ' + completionTypeNames[i]);
- }
- });
});
diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js
index 1f53df23f3..f1277ff207 100644
--- a/src/node/test/end_to_end_test.js
+++ b/src/node/test/end_to_end_test.js
@@ -75,39 +75,47 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel,
'dummy_method',
deadline);
- call.invoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.CLIENT_METADATA_READ);
- },function(event) {
- assert.strictEqual(event.type, grpc.completionType.FINISHED);
- var status = event.data;
- assert.strictEqual(status.code, grpc.status.OK);
- assert.strictEqual(status.details, status_text);
+ var client_batch = {};
+ client_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
+ client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(client_batch, function(err, response) {
+ assert.ifError(err);
+ assert.deepEqual(response, {
+ 'send metadata': true,
+ 'client close': true,
+ 'status': {
+ 'code': grpc.status.OK,
+ 'details': status_text,
+ 'metadata': {}
+ }
+ });
done();
- }, 0);
+ });
- server.requestCall(function(event) {
- assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW);
- var server_call = event.call;
+ server.requestCall(function(err, call_details) {
+ var new_call = call_details['new call'];
+ assert.notEqual(new_call, null);
+ var server_call = new_call.call;
assert.notEqual(server_call, null);
- server_call.serverAccept(function(event) {
- assert.strictEqual(event.type, grpc.completionType.FINISHED);
- }, 0);
- server_call.serverEndInitialMetadata(0);
- server_call.startWriteStatus(
- grpc.status.OK,
- status_text,
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- });
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
+ var server_batch = {};
+ server_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ server_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ 'metadata': {},
+ 'code': grpc.status.OK,
+ 'details': status_text
+ };
+ server_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
+ server_call.startBatch(server_batch, function(err, response) {
+ assert.ifError(err);
+ assert.deepEqual(response, {
+ 'send metadata': true,
+ 'send status': true,
+ 'cancelled': false
+ });
+ done();
+ });
});
});
it('should successfully send and receive metadata', function(complete) {
@@ -118,114 +126,110 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel,
'dummy_method',
deadline);
- call.addMetadata({'client_key': ['client_value']});
- call.invoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.CLIENT_METADATA_READ);
- assert.strictEqual(event.data.server_key[0].toString(), 'server_value');
- },function(event) {
- assert.strictEqual(event.type, grpc.completionType.FINISHED);
- var status = event.data;
- assert.strictEqual(status.code, grpc.status.OK);
- assert.strictEqual(status.details, status_text);
+ var client_batch = {};
+ client_batch[grpc.opType.SEND_INITIAL_METADATA] = {
+ 'client_key': ['client_value']
+ };
+ client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
+ client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(client_batch, function(err, response) {
+ assert.ifError(err);
+ assert(response['send metadata']);
+ assert(response['client close']);
+ assert(response.hasOwnProperty('metadata'));
+ assert.strictEqual(response.metadata.server_key.toString(),
+ 'server_value');
+ assert.deepEqual(response.status, {'code': grpc.status.OK,
+ 'details': status_text,
+ 'metadata': {}});
done();
- }, 0);
+ });
- server.requestCall(function(event) {
- assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW);
- assert.strictEqual(event.data.metadata.client_key[0].toString(),
+ server.requestCall(function(err, call_details) {
+ var new_call = call_details['new call'];
+ assert.notEqual(new_call, null);
+ assert.strictEqual(new_call.metadata.client_key[0].toString(),
'client_value');
- var server_call = event.call;
+ var server_call = new_call.call;
assert.notEqual(server_call, null);
- server_call.serverAccept(function(event) {
- assert.strictEqual(event.type, grpc.completionType.FINISHED);
- }, 0);
- server_call.addMetadata({'server_key': ['server_value']});
- server_call.serverEndInitialMetadata(0);
- server_call.startWriteStatus(
- grpc.status.OK,
- status_text,
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- });
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
+ var server_batch = {};
+ server_batch[grpc.opType.SEND_INITIAL_METADATA] = {
+ 'server_key': ['server_value']
+ };
+ server_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ 'metadata': {},
+ 'code': grpc.status.OK,
+ 'details': status_text
+ };
+ server_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
+ server_call.startBatch(server_batch, function(err, response) {
+ assert.ifError(err);
+ assert.deepEqual(response, {
+ 'send metadata': true,
+ 'send status': true,
+ 'cancelled': false
+ });
+ done();
+ });
});
});
- it('should send and receive data without error', function(complete) {
+ it.only('should send and receive data without error', function(complete) {
var req_text = 'client_request';
var reply_text = 'server_response';
- var done = multiDone(complete, 6);
+ var done = multiDone(complete, 2);
var deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 3);
var status_text = 'success';
var call = new grpc.Call(channel,
'dummy_method',
deadline);
- call.invoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.CLIENT_METADATA_READ);
- done();
- },function(event) {
- assert.strictEqual(event.type, grpc.completionType.FINISHED);
- var status = event.data;
- assert.strictEqual(status.code, grpc.status.OK);
- assert.strictEqual(status.details, status_text);
- done();
- }, 0);
- call.startWrite(
- new Buffer(req_text),
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.WRITE_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- }, 0);
- call.startRead(function(event) {
- assert.strictEqual(event.type, grpc.completionType.READ);
- assert.strictEqual(event.data.toString(), reply_text);
+ var client_batch = {};
+ client_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ client_batch[grpc.opType.SEND_MESSAGE] = new Buffer(req_text);
+ client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
+ client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ client_batch[grpc.opType.RECV_MESSAGE] = true;
+ client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(client_batch, function(err, response) {
+ assert.ifError(err);
+ assert(response['send metadata']);
+ assert(response['client close']);
+ assert.deepEqual(response.metadata, {});
+ assert(response['send message']);
+ assert.strictEqual(response.read.toString(), reply_text);
+ assert.deepEqual(response.status, {'code': grpc.status.OK,
+ 'details': status_text,
+ 'metadata': {}});
+ console.log("OK status");
done();
});
- server.requestCall(function(event) {
- assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW);
- var server_call = event.call;
+
+ server.requestCall(function(err, call_details) {
+ var new_call = call_details['new call'];
+ assert.notEqual(new_call, null);
+ var server_call = new_call.call;
assert.notEqual(server_call, null);
- server_call.serverAccept(function(event) {
- assert.strictEqual(event.type, grpc.completionType.FINISHED);
- done();
- });
- server_call.serverEndInitialMetadata(0);
- server_call.startRead(function(event) {
- assert.strictEqual(event.type, grpc.completionType.READ);
- assert.strictEqual(event.data.toString(), req_text);
- server_call.startWrite(
- new Buffer(reply_text),
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.WRITE_ACCEPTED);
- assert.strictEqual(event.data,
- grpc.opError.OK);
- server_call.startWriteStatus(
- grpc.status.OK,
- status_text,
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- }, 0);
+ var server_batch = {};
+ server_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ server_batch[grpc.opType.RECV_MESSAGE] = true;
+ server_call.startBatch(server_batch, function(err, response) {
+ assert.ifError(err);
+ assert(response['send metadata']);
+ assert.strictEqual(response.read.toString(), req_text);
+ var response_batch = {};
+ response_batch[grpc.opType.SEND_MESSAGE] = new Buffer(reply_text);
+ response_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ 'metadata': {},
+ 'code': grpc.status.OK,
+ 'details': status_text
+ };
+ response_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
+ server_call.startBatch(response_batch, function(err, response) {
+ assert(response['send status']);
+ //assert(!response['cancelled']);
+ done();
+ });
});
});
});