diff options
Diffstat (limited to 'src/node')
-rw-r--r-- | src/node/binding.gyp | 9 | ||||
-rw-r--r-- | src/node/ext/call.cc | 665 | ||||
-rw-r--r-- | src/node/ext/call.h | 59 | ||||
-rw-r--r-- | src/node/ext/completion_queue_async_worker.cc | 24 | ||||
-rw-r--r-- | src/node/ext/completion_queue_async_worker.h | 2 | ||||
-rw-r--r-- | src/node/ext/node_grpc.cc | 58 | ||||
-rw-r--r-- | src/node/ext/server.cc | 64 | ||||
-rw-r--r-- | src/node/ext/tag.cc | 284 | ||||
-rw-r--r-- | src/node/ext/tag.h | 33 | ||||
-rw-r--r-- | src/node/test/call_test.js | 124 | ||||
-rw-r--r-- | src/node/test/constant_test.js | 37 | ||||
-rw-r--r-- | src/node/test/end_to_end_test.js | 246 |
12 files changed, 1057 insertions, 548 deletions
diff --git a/src/node/binding.gyp b/src/node/binding.gyp index cf2a6acb04..fb4c779f8e 100644 --- a/src/node/binding.gyp +++ b/src/node/binding.gyp @@ -9,14 +9,15 @@ 'include_dirs': [ "<!(nodejs -e \"require('nan')\")" ], - 'cxxflags': [ + 'cflags': [ + '-std=c++11', '-Wall', '-pthread', '-pedantic', '-g', '-zdefs' - '-Werror', - ], + '-Werror' + ], 'ldflags': [ '-g' ], @@ -33,11 +34,9 @@ "ext/channel.cc", "ext/completion_queue_async_worker.cc", "ext/credentials.cc", - "ext/event.cc", "ext/node_grpc.cc", "ext/server.cc", "ext/server_credentials.cc", - "ext/tag.cc", "ext/timeval.cc" ], 'conditions' : [ diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 23aead07b2..9a6359fe44 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2014, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,17 +31,23 @@ * */ +#include <memory> +#include <vector> +#include <map> + #include <node.h> #include "grpc/support/log.h" #include "grpc/grpc.h" +#include "grpc/support/alloc.h" #include "grpc/support/time.h" #include "byte_buffer.h" #include "call.h" #include "channel.h" #include "completion_queue_async_worker.h" #include "timeval.h" -#include "tag.h" + +using std::unique_ptr; namespace grpc { namespace node { @@ -49,6 +55,7 @@ namespace node { using ::node::Buffer; using v8::Arguments; using v8::Array; +using v8::Boolean; using v8::Exception; using v8::External; using v8::Function; @@ -68,6 +75,372 @@ using v8::Value; Persistent<Function> Call::constructor; Persistent<FunctionTemplate> Call::fun_tpl; + +bool CreateMetadataArray( + Handle<Object> metadata, grpc_metadata_array *array, + std::vector<unique_ptr<NanUtf8String> > *string_handles, + std::vector<unique_ptr<PersistentHolder> > *handles) { + NanScope(); + grpc_metadata_array_init(array); + Handle<Array> keys(metadata->GetOwnPropertyNames()); + for (unsigned int i = 0; i < keys->Length(); i++) { + Handle<String> current_key(keys->Get(i)->ToString()); + if (!metadata->Get(current_key)->IsArray()) { + return false; + } + array->capacity += Local<Array>::Cast(metadata->Get(current_key))->Length(); + } + array->metadata = reinterpret_cast<grpc_metadata*>( + gpr_malloc(array->capacity * sizeof(grpc_metadata))); + for (unsigned int i = 0; i < keys->Length(); i++) { + Handle<String> current_key(keys->Get(i)->ToString()); + NanUtf8String *utf8_key = new NanUtf8String(current_key); + string_handles->push_back(unique_ptr<NanUtf8String>(utf8_key)); + Handle<Array> values = Local<Array>::Cast(metadata->Get(current_key)); + for (unsigned int j = 0; j < values->Length(); j++) { + Handle<Value> value = values->Get(j); + grpc_metadata *current = &array->metadata[array->count]; + current->key = **utf8_key; + if (Buffer::HasInstance(value)) { + current->value = Buffer::Data(value); + current->value_length = Buffer::Length(value); + Persistent<Value> handle; + NanAssignPersistent(handle, value); + handles->push_back(unique_ptr<PersistentHolder>( + new PersistentHolder(handle))); + } else if (value->IsString()) { + Handle<String> string_value = value->ToString(); + NanUtf8String *utf8_value = new NanUtf8String(string_value); + string_handles->push_back(unique_ptr<NanUtf8String>(utf8_value)); + current->value = **utf8_value; + current->value_length = string_value->Length(); + } else { + return false; + } + array->count += 1; + } + } + return true; +} + +Handle<Value> ParseMetadata(const grpc_metadata_array *metadata_array) { + NanEscapableScope(); + grpc_metadata *metadata_elements = metadata_array->metadata; + size_t length = metadata_array->count; + std::map<const char*, size_t> size_map; + std::map<const char*, size_t> index_map; + + for (unsigned int i = 0; i < length; i++) { + const char *key = metadata_elements[i].key; + if (size_map.count(key)) { + size_map[key] += 1; + } + index_map[key] = 0; + } + Handle<Object> metadata_object = NanNew<Object>(); + for (unsigned int i = 0; i < length; i++) { + grpc_metadata* elem = &metadata_elements[i]; + Handle<String> key_string = String::New(elem->key); + Handle<Array> array; + if (metadata_object->Has(key_string)) { + array = Handle<Array>::Cast(metadata_object->Get(key_string)); + } else { + array = NanNew<Array>(size_map[elem->key]); + metadata_object->Set(key_string, array); + } + array->Set(index_map[elem->key], + MakeFastBuffer( + NanNewBufferHandle(elem->value, elem->value_length))); + index_map[elem->key] += 1; + } + return NanEscapeScope(metadata_object); +} + +Handle<Value> Op::GetOpType() const { + NanEscapableScope(); + return NanEscapeScope(NanNew<String>(GetTypeString())); +} + +class SendMetadataOp : public Op { + public: + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle<Value> value, grpc_op *out, + std::vector<unique_ptr<NanUtf8String> > *strings, + std::vector<unique_ptr<PersistentHolder> > *handles) { + if (!value->IsObject()) { + return false; + } + grpc_metadata_array array; + if (!CreateMetadataArray(value->ToObject(), &array, strings, handles)) { + return false; + } + out->data.send_initial_metadata.count = array.count; + out->data.send_initial_metadata.metadata = array.metadata; + return true; + } + protected: + std::string GetTypeString() const { + return "send metadata"; + } +}; + +class SendMessageOp : public Op { + public: + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle<Value> value, grpc_op *out, + std::vector<unique_ptr<NanUtf8String> > *strings, + std::vector<unique_ptr<PersistentHolder> > *handles) { + if (!Buffer::HasInstance(value)) { + return false; + } + out->data.send_message = BufferToByteBuffer(value); + Persistent<Value> handle; + NanAssignPersistent(handle, value); + handles->push_back(unique_ptr<PersistentHolder>( + new PersistentHolder(handle))); + return true; + } + protected: + std::string GetTypeString() const { + return "send message"; + } +}; + +class SendClientCloseOp : public Op { + public: + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle<Value> value, grpc_op *out, + std::vector<unique_ptr<NanUtf8String> > *strings, + std::vector<unique_ptr<PersistentHolder> > *handles) { + return true; + } + protected: + std::string GetTypeString() const { + return "client close"; + } +}; + +class SendServerStatusOp : public Op { + public: + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle<Value> value, grpc_op *out, + std::vector<unique_ptr<NanUtf8String> > *strings, + std::vector<unique_ptr<PersistentHolder> > *handles) { + if (!value->IsObject()) { + return false; + } + Handle<Object> server_status = value->ToObject(); + if (!server_status->Get(NanNew("metadata"))->IsObject()) { + return false; + } + if (!server_status->Get(NanNew("code"))->IsUint32()) { + return false; + } + if (!server_status->Get(NanNew("details"))->IsString()) { + return false; + } + grpc_metadata_array array; + if (!CreateMetadataArray(server_status->Get(NanNew("metadata"))-> + ToObject(), + &array, strings, handles)) { + return false; + } + out->data.send_status_from_server.trailing_metadata_count = array.count; + out->data.send_status_from_server.trailing_metadata = array.metadata; + out->data.send_status_from_server.status = + static_cast<grpc_status_code>( + server_status->Get(NanNew("code"))->Uint32Value()); + NanUtf8String *str = new NanUtf8String( + server_status->Get(NanNew("details"))); + strings->push_back(unique_ptr<NanUtf8String>(str)); + out->data.send_status_from_server.status_details = **str; + return true; + } + protected: + std::string GetTypeString() const { + return "send status"; + } +}; + +class GetMetadataOp : public Op { + public: + GetMetadataOp() { + grpc_metadata_array_init(&recv_metadata); + } + + ~GetMetadataOp() { + grpc_metadata_array_destroy(&recv_metadata); + } + + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ParseMetadata(&recv_metadata)); + } + + bool ParseOp(Handle<Value> value, grpc_op *out, + std::vector<unique_ptr<NanUtf8String> > *strings, + std::vector<unique_ptr<PersistentHolder> > *handles) { + out->data.recv_initial_metadata = &recv_metadata; + return true; + } + + protected: + std::string GetTypeString() const { + return "metadata"; + } + + private: + grpc_metadata_array recv_metadata; +}; + +class ReadMessageOp : public Op { + public: + ReadMessageOp() { + recv_message = NULL; + } + ~ReadMessageOp() { + if (recv_message != NULL) { + gpr_free(recv_message); + } + } + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ByteBufferToBuffer(recv_message)); + } + + bool ParseOp(Handle<Value> value, grpc_op *out, + std::vector<unique_ptr<NanUtf8String> > *strings, + std::vector<unique_ptr<PersistentHolder> > *handles) { + out->data.recv_message = &recv_message; + return true; + } + + protected: + std::string GetTypeString() const { + return "read"; + } + + private: + grpc_byte_buffer *recv_message; +}; + +class ClientStatusOp : public Op { + public: + ClientStatusOp() { + grpc_metadata_array_init(&metadata_array); + status_details = NULL; + details_capacity = 0; + } + + ~ClientStatusOp() { + grpc_metadata_array_destroy(&metadata_array); + gpr_free(status_details); + } + + bool ParseOp(Handle<Value> value, grpc_op *out, + std::vector<unique_ptr<NanUtf8String> > *strings, + std::vector<unique_ptr<PersistentHolder> > *handles) { + out->data.recv_status_on_client.trailing_metadata = &metadata_array; + out->data.recv_status_on_client.status = &status; + out->data.recv_status_on_client.status_details = &status_details; + out->data.recv_status_on_client.status_details_capacity = &details_capacity; + return true; + } + + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + Handle<Object> status_obj = NanNew<Object>(); + status_obj->Set(NanNew("code"), NanNew<Number>(status)); + if (status_details != NULL) { + status_obj->Set(NanNew("details"), String::New(status_details)); + } + status_obj->Set(NanNew("metadata"), ParseMetadata(&metadata_array)); + return NanEscapeScope(status_obj); + } + protected: + std::string GetTypeString() const { + return "status"; + } + private: + grpc_metadata_array metadata_array; + grpc_status_code status; + char *status_details; + size_t details_capacity; +}; + +class ServerCloseResponseOp : public Op { + public: + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(NanNew<Boolean>(cancelled)); + } + + bool ParseOp(Handle<Value> value, grpc_op *out, + std::vector<unique_ptr<NanUtf8String> > *strings, + std::vector<unique_ptr<PersistentHolder> > *handles) { + out->data.recv_close_on_server.cancelled = &cancelled; + return true; + } + + protected: + std::string GetTypeString() const { + return "cancelled"; + } + + private: + int cancelled; +}; + +tag::tag(NanCallback *callback, std::vector<unique_ptr<Op> > *ops, + std::vector<unique_ptr<PersistentHolder> > *handles, + std::vector<unique_ptr<NanUtf8String> > *strings) : + callback(callback), ops(ops), handles(handles), strings(strings){ +} +tag::~tag() { + delete callback; + delete ops; + if (handles != NULL) { + delete handles; + } + if (strings != NULL) { + delete strings; + } +} + +Handle<Value> GetTagNodeValue(void *tag) { + NanEscapableScope(); + struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); + Handle<Object> tag_obj = NanNew<Object>(); + for (std::vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin(); + it != tag_struct->ops->end(); ++it) { + Op *op_ptr = it->get(); + tag_obj->Set(op_ptr->GetOpType(), op_ptr->GetNodeValue()); + } + return NanEscapeScope(tag_obj); +} + +NanCallback GetTagCallback(void *tag) { + struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); + return *tag_struct->callback; +} + +void DestroyTag(void *tag) { + struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); + delete tag_struct; +} + Call::Call(grpc_call *call) : wrapped_call(call) {} Call::~Call() { grpc_call_destroy(wrapped_call); } @@ -77,28 +450,10 @@ void Call::Init(Handle<Object> exports) { Local<FunctionTemplate> tpl = FunctionTemplate::New(New); tpl->SetClassName(NanNew("Call")); tpl->InstanceTemplate()->SetInternalFieldCount(1); - NanSetPrototypeTemplate(tpl, "addMetadata", - FunctionTemplate::New(AddMetadata)->GetFunction()); - NanSetPrototypeTemplate(tpl, "invoke", - FunctionTemplate::New(Invoke)->GetFunction()); - NanSetPrototypeTemplate(tpl, "serverAccept", - FunctionTemplate::New(ServerAccept)->GetFunction()); - NanSetPrototypeTemplate( - tpl, "serverEndInitialMetadata", - FunctionTemplate::New(ServerEndInitialMetadata)->GetFunction()); + NanSetPrototypeTemplate(tpl, "startBatch", + FunctionTemplate::New(StartBatch)->GetFunction()); NanSetPrototypeTemplate(tpl, "cancel", FunctionTemplate::New(Cancel)->GetFunction()); - NanSetPrototypeTemplate(tpl, "startWrite", - FunctionTemplate::New(StartWrite)->GetFunction()); - NanSetPrototypeTemplate( - tpl, "startWriteStatus", - FunctionTemplate::New(StartWriteStatus)->GetFunction()); - NanSetPrototypeTemplate(tpl, "writesDone", - FunctionTemplate::New(WritesDone)->GetFunction()); - NanSetPrototypeTemplate(tpl, "startReadMetadata", - FunctionTemplate::New(WritesDone)->GetFunction()); - NanSetPrototypeTemplate(tpl, "startRead", - FunctionTemplate::New(StartRead)->GetFunction()); NanAssignPersistent(fun_tpl, tpl); NanAssignPersistent(constructor, tpl->GetFunction()); constructor->Set(NanNew("WRITE_BUFFER_HINT"), @@ -152,9 +507,9 @@ NAN_METHOD(Call::New) { NanUtf8String method(args[1]); double deadline = args[2]->NumberValue(); grpc_channel *wrapped_channel = channel->GetWrappedChannel(); - grpc_call *wrapped_call = grpc_channel_create_call_old( - wrapped_channel, *method, channel->GetHost(), - MillisecondsToTimespec(deadline)); + grpc_call *wrapped_call = grpc_channel_create_call( + wrapped_channel, CompletionQueueAsyncWorker::GetQueue(), *method, + channel->GetHost(), MillisecondsToTimespec(deadline)); call = new Call(wrapped_call); args.This()->SetHiddenValue(String::NewSymbol("channel_"), channel_object); @@ -168,119 +523,78 @@ NAN_METHOD(Call::New) { } } -NAN_METHOD(Call::AddMetadata) { +NAN_METHOD(Call::StartBatch) { NanScope(); if (!HasInstance(args.This())) { - return NanThrowTypeError("addMetadata can only be called on Call objects"); + return NanThrowTypeError("startBatch can only be called on Call objects"); } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); if (!args[0]->IsObject()) { - return NanThrowTypeError("addMetadata's first argument must be an object"); - } - Handle<Object> metadata = args[0]->ToObject(); - Handle<Array> keys(metadata->GetOwnPropertyNames()); - for (unsigned int i = 0; i < keys->Length(); i++) { - Handle<String> current_key(keys->Get(i)->ToString()); - if (!metadata->Get(current_key)->IsArray()) { - return NanThrowTypeError( - "addMetadata's first argument's values must be arrays"); - } - NanUtf8String utf8_key(current_key); - Handle<Array> values = Local<Array>::Cast(metadata->Get(current_key)); - for (unsigned int j = 0; j < values->Length(); j++) { - Handle<Value> value = values->Get(j); - grpc_metadata metadata; - grpc_call_error error; - metadata.key = *utf8_key; - if (Buffer::HasInstance(value)) { - metadata.value = Buffer::Data(value); - metadata.value_length = Buffer::Length(value); - error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0); - } else if (value->IsString()) { - Handle<String> string_value = value->ToString(); - NanUtf8String utf8_value(string_value); - metadata.value = *utf8_value; - metadata.value_length = string_value->Length(); - gpr_log(GPR_DEBUG, "adding metadata: %s, %s, %d", metadata.key, - metadata.value, metadata.value_length); - error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0); - } else { - return NanThrowTypeError( - "addMetadata values must be strings or buffers"); - } - if (error != GRPC_CALL_OK) { - return NanThrowError("addMetadata failed", error); - } - } - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::Invoke) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("invoke can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("invoke's first argument must be a function"); + return NanThrowError("startBatch's first argument must be an object"); } if (!args[1]->IsFunction()) { - return NanThrowTypeError("invoke's second argument must be a function"); - } - if (!args[2]->IsUint32()) { - return NanThrowTypeError("invoke's third argument must be integer flags"); + return NanThrowError("startBatch's second argument must be a callback"); } Call *call = ObjectWrap::Unwrap<Call>(args.This()); - unsigned int flags = args[3]->Uint32Value(); - grpc_call_error error = grpc_call_invoke_old( - call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(), - CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("invoke failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::ServerAccept) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("accept can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("accept's first argument must be a function"); - } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); - grpc_call_error error = grpc_call_server_accept_old( - call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(), - CreateTag(args[0], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("serverAccept failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::ServerEndInitialMetadata) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError( - "serverEndInitialMetadata can only be called on Call objects"); - } - if (!args[0]->IsUint32()) { - return NanThrowTypeError( - "serverEndInitialMetadata's second argument must be integer flags"); + std::vector<unique_ptr<PersistentHolder> > *handles = + new std::vector<unique_ptr<PersistentHolder> >(); + std::vector<unique_ptr<NanUtf8String> > *strings = + new std::vector<unique_ptr<NanUtf8String> >(); + Persistent<Value> handle; + Handle<Object> obj = args[0]->ToObject(); + Handle<Array> keys = obj->GetOwnPropertyNames(); + size_t nops = keys->Length(); + grpc_op *ops = new grpc_op[nops]; + std::vector<unique_ptr<Op> > *op_vector = new std::vector<unique_ptr<Op> >(); + for (unsigned int i = 0; i < nops; i++) { + Op *op; + if (!keys->Get(i)->IsUint32()) { + return NanThrowError( + "startBatch's first argument's keys must be integers"); + } + uint32_t type = keys->Get(i)->Uint32Value(); + ops[i].op = static_cast<grpc_op_type>(type); + switch (type) { + case GRPC_OP_SEND_INITIAL_METADATA: + op = new SendMetadataOp(); + break; + case GRPC_OP_SEND_MESSAGE: + op = new SendMessageOp(); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + op = new SendClientCloseOp(); + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + op = new SendServerStatusOp(); + break; + case GRPC_OP_RECV_INITIAL_METADATA: + op = new GetMetadataOp(); + break; + case GRPC_OP_RECV_MESSAGE: + op = new ReadMessageOp(); + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + op = new ClientStatusOp(); + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + op = new ServerCloseResponseOp(); + break; + default: + return NanThrowError("Argument object had an unrecognized key"); + } + if (!op->ParseOp(obj->Get(type), &ops[i], strings, handles)) { + return NanThrowTypeError("Incorrectly typed arguments to startBatch"); + } + op_vector->push_back(unique_ptr<Op>(op)); } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); - unsigned int flags = args[1]->Uint32Value(); - grpc_call_error error = - grpc_call_server_end_initial_metadata_old(call->wrapped_call, flags); + grpc_call_error error = grpc_call_start_batch( + call->wrapped_call, ops, nops, new struct tag( + new NanCallback(args[1].As<Function>()), + op_vector, handles, + strings)); if (error != GRPC_CALL_OK) { - return NanThrowError("serverEndInitialMetadata failed", error); + return NanThrowError("startBatch failed", error); } + CompletionQueueAsyncWorker::Next(); NanReturnUndefined(); } @@ -297,102 +611,5 @@ NAN_METHOD(Call::Cancel) { NanReturnUndefined(); } -NAN_METHOD(Call::StartWrite) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("startWrite can only be called on Call objects"); - } - if (!Buffer::HasInstance(args[0])) { - return NanThrowTypeError("startWrite's first argument must be a Buffer"); - } - if (!args[1]->IsFunction()) { - return NanThrowTypeError("startWrite's second argument must be a function"); - } - if (!args[2]->IsUint32()) { - return NanThrowTypeError( - "startWrite's third argument must be integer flags"); - } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); - grpc_byte_buffer *buffer = BufferToByteBuffer(args[0]); - unsigned int flags = args[2]->Uint32Value(); - grpc_call_error error = grpc_call_start_write_old( - call->wrapped_call, buffer, CreateTag(args[1], args.This()), flags); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("startWrite failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::StartWriteStatus) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError( - "startWriteStatus can only be called on Call objects"); - } - if (!args[0]->IsUint32()) { - return NanThrowTypeError( - "startWriteStatus's first argument must be a status code"); - } - if (!args[1]->IsString()) { - return NanThrowTypeError( - "startWriteStatus's second argument must be a string"); - } - if (!args[2]->IsFunction()) { - return NanThrowTypeError( - "startWriteStatus's third argument must be a function"); - } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); - NanUtf8String details(args[1]); - grpc_call_error error = grpc_call_start_write_status_old( - call->wrapped_call, (grpc_status_code)args[0]->Uint32Value(), *details, - CreateTag(args[2], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("startWriteStatus failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::WritesDone) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("writesDone can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("writesDone's first argument must be a function"); - } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); - grpc_call_error error = grpc_call_writes_done_old( - call->wrapped_call, CreateTag(args[0], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("writesDone failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::StartRead) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("startRead can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("startRead's first argument must be a function"); - } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); - grpc_call_error error = grpc_call_start_read_old( - call->wrapped_call, CreateTag(args[0], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("startRead failed", error); - } - NanReturnUndefined(); -} - } // namespace node } // namespace grpc diff --git a/src/node/ext/call.h b/src/node/ext/call.h index 1924a1bf42..434bcf8a63 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -34,6 +34,9 @@ #ifndef NET_GRPC_NODE_CALL_H_ #define NET_GRPC_NODE_CALL_H_ +#include <memory> +#include <vector> + #include <node.h> #include <nan.h> #include "grpc/grpc.h" @@ -43,6 +46,53 @@ namespace grpc { namespace node { +using std::unique_ptr; + +v8::Handle<v8::Value> ParseMetadata(const grpc_metadata_array *metadata_array); + +class PersistentHolder { + public: + explicit PersistentHolder(v8::Persistent<v8::Value> persist) : + persist(persist) { + } + + ~PersistentHolder() { + persist.Dispose(); +} + + private: + v8::Persistent<v8::Value> persist; +}; + +class Op { + public: + virtual v8::Handle<v8::Value> GetNodeValue() const = 0; + virtual bool ParseOp(v8::Handle<v8::Value> value, grpc_op *out, + std::vector<unique_ptr<NanUtf8String> > *strings, + std::vector<unique_ptr<PersistentHolder> > *handles) = 0; + v8::Handle<v8::Value> GetOpType() const; + + protected: + virtual std::string GetTypeString() const = 0; +}; + +struct tag { + tag(NanCallback *callback, std::vector<unique_ptr<Op> > *ops, + std::vector<unique_ptr<PersistentHolder> > *handles, + std::vector<unique_ptr<NanUtf8String> > *strings); + ~tag(); + NanCallback *callback; + std::vector<unique_ptr<Op> > *ops; + std::vector<unique_ptr<PersistentHolder> > *handles; + std::vector<unique_ptr<NanUtf8String> > *strings; +}; + +v8::Handle<v8::Value> GetTagNodeValue(void *tag); + +NanCallback GetTagCallback(void *tag); + +void DestroyTag(void *tag); + /* Wrapper class for grpc_call structs. */ class Call : public ::node::ObjectWrap { public: @@ -60,15 +110,8 @@ class Call : public ::node::ObjectWrap { Call &operator=(const Call &); static NAN_METHOD(New); - static NAN_METHOD(AddMetadata); - static NAN_METHOD(Invoke); - static NAN_METHOD(ServerAccept); - static NAN_METHOD(ServerEndInitialMetadata); + static NAN_METHOD(StartBatch); static NAN_METHOD(Cancel); - static NAN_METHOD(StartWrite); - static NAN_METHOD(StartWriteStatus); - static NAN_METHOD(WritesDone); - static NAN_METHOD(StartRead); static v8::Persistent<v8::Function> constructor; // Used for typechecking instances of this javascript class static v8::Persistent<v8::FunctionTemplate> fun_tpl; diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index 8de7db66d5..5c0e27e6a7 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -37,8 +37,7 @@ #include "grpc/grpc.h" #include "grpc/support/time.h" #include "completion_queue_async_worker.h" -#include "event.h" -#include "tag.h" +#include "call.h" namespace grpc { namespace node { @@ -58,6 +57,9 @@ CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {} void CompletionQueueAsyncWorker::Execute() { result = grpc_completion_queue_next(queue, gpr_inf_future); + if (result->data.op_complete != GRPC_OP_OK) { + SetErrorMessage("The batch encountered an error"); + } } grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } @@ -75,14 +77,26 @@ void CompletionQueueAsyncWorker::Init(Handle<Object> exports) { void CompletionQueueAsyncWorker::HandleOKCallback() { NanScope(); - NanCallback event_callback(GetTagHandle(result->tag).As<Function>()); - Handle<Value> argv[] = {CreateEventObject(result)}; + NanCallback callback = GetTagCallback(result->tag); + Handle<Value> argv[] = {NanNull(), GetTagNodeValue(result->tag)}; DestroyTag(result->tag); grpc_event_finish(result); result = NULL; - event_callback.Call(1, argv); + callback.Call(2, argv); +} + +void CompletionQueueAsyncWorker::HandleErrorCallback() { + NanScope(); + NanCallback callback = GetTagCallback(result->tag); + Handle<Value> argv[] = {NanError(ErrorMessage())}; + + DestroyTag(result->tag); + grpc_event_finish(result); + result = NULL; + + callback.Call(1, argv); } } // namespace node diff --git a/src/node/ext/completion_queue_async_worker.h b/src/node/ext/completion_queue_async_worker.h index 2c928b7024..c04a303283 100644 --- a/src/node/ext/completion_queue_async_worker.h +++ b/src/node/ext/completion_queue_async_worker.h @@ -67,6 +67,8 @@ class CompletionQueueAsyncWorker : public NanAsyncWorker { completion_queue_next */ void HandleOKCallback(); + void HandleErrorCallback(); + private: grpc_event *result; diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc index bc1dfaf899..9b0fe82976 100644 --- a/src/node/ext/node_grpc.cc +++ b/src/node/ext/node_grpc.cc @@ -130,35 +130,34 @@ void InitCallErrorConstants(Handle<Object> exports) { call_error->Set(NanNew("INVALID_FLAGS"), INVALID_FLAGS); } -void InitOpErrorConstants(Handle<Object> exports) { +void InitOpTypeConstants(Handle<Object> exports) { NanScope(); - Handle<Object> op_error = Object::New(); - exports->Set(NanNew("opError"), op_error); - Handle<Value> OK(NanNew<Uint32, uint32_t>(GRPC_OP_OK)); - op_error->Set(NanNew("OK"), OK); - Handle<Value> ERROR(NanNew<Uint32, uint32_t>(GRPC_OP_ERROR)); - op_error->Set(NanNew("ERROR"), ERROR); -} - -void InitCompletionTypeConstants(Handle<Object> exports) { - NanScope(); - Handle<Object> completion_type = Object::New(); - exports->Set(NanNew("completionType"), completion_type); - Handle<Value> QUEUE_SHUTDOWN(NanNew<Uint32, uint32_t>(GRPC_QUEUE_SHUTDOWN)); - completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN); - Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ)); - completion_type->Set(NanNew("READ"), READ); - Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED)); - completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED); - Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED)); - completion_type->Set(NanNew("FINISH_ACCEPTED"), FINISH_ACCEPTED); - Handle<Value> CLIENT_METADATA_READ( - NanNew<Uint32, uint32_t>(GRPC_CLIENT_METADATA_READ)); - completion_type->Set(NanNew("CLIENT_METADATA_READ"), CLIENT_METADATA_READ); - Handle<Value> FINISHED(NanNew<Uint32, uint32_t>(GRPC_FINISHED)); - completion_type->Set(NanNew("FINISHED"), FINISHED); - Handle<Value> SERVER_RPC_NEW(NanNew<Uint32, uint32_t>(GRPC_SERVER_RPC_NEW)); - completion_type->Set(NanNew("SERVER_RPC_NEW"), SERVER_RPC_NEW); + Handle<Object> op_type = Object::New(); + exports->Set(NanNew("opType"), op_type); + Handle<Value> SEND_INITIAL_METADATA( + NanNew<Uint32, uint32_t>(GRPC_OP_SEND_INITIAL_METADATA)); + op_type->Set(NanNew("SEND_INITIAL_METADATA"), SEND_INITIAL_METADATA); + Handle<Value> SEND_MESSAGE( + NanNew<Uint32, uint32_t>(GRPC_OP_SEND_MESSAGE)); + op_type->Set(NanNew("SEND_MESSAGE"), SEND_MESSAGE); + Handle<Value> SEND_CLOSE_FROM_CLIENT( + NanNew<Uint32, uint32_t>(GRPC_OP_SEND_CLOSE_FROM_CLIENT)); + op_type->Set(NanNew("SEND_CLOSE_FROM_CLIENT"), SEND_CLOSE_FROM_CLIENT); + Handle<Value> SEND_STATUS_FROM_SERVER( + NanNew<Uint32, uint32_t>(GRPC_OP_SEND_STATUS_FROM_SERVER)); + op_type->Set(NanNew("SEND_STATUS_FROM_SERVER"), SEND_STATUS_FROM_SERVER); + Handle<Value> RECV_INITIAL_METADATA( + NanNew<Uint32, uint32_t>(GRPC_OP_RECV_INITIAL_METADATA)); + op_type->Set(NanNew("RECV_INITIAL_METADATA"), RECV_INITIAL_METADATA); + Handle<Value> RECV_MESSAGE( + NanNew<Uint32, uint32_t>(GRPC_OP_RECV_MESSAGE)); + op_type->Set(NanNew("RECV_MESSAGE"), RECV_MESSAGE); + Handle<Value> RECV_STATUS_ON_CLIENT( + NanNew<Uint32, uint32_t>(GRPC_OP_RECV_STATUS_ON_CLIENT)); + op_type->Set(NanNew("RECV_STATUS_ON_CLIENT"), RECV_STATUS_ON_CLIENT); + Handle<Value> RECV_CLOSE_ON_SERVER( + NanNew<Uint32, uint32_t>(GRPC_OP_RECV_CLOSE_ON_SERVER)); + op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER); } void init(Handle<Object> exports) { @@ -166,8 +165,7 @@ void init(Handle<Object> exports) { grpc_init(); InitStatusConstants(exports); InitCallErrorConstants(exports); - InitOpErrorConstants(exports); - InitCompletionTypeConstants(exports); + InitOpTypeConstants(exports); grpc::node::Call::Init(exports); grpc::node::Channel::Init(exports); diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 6b8ccef9b1..75ea681fa7 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -31,6 +31,8 @@ * */ +#include <memory> + #include "server.h" #include <node.h> @@ -43,15 +45,17 @@ #include "grpc/grpc_security.h" #include "call.h" #include "completion_queue_async_worker.h" -#include "tag.h" #include "server_credentials.h" +#include "timeval.h" namespace grpc { namespace node { +using std::unique_ptr; using v8::Arguments; using v8::Array; using v8::Boolean; +using v8::Date; using v8::Exception; using v8::Function; using v8::FunctionTemplate; @@ -67,6 +71,50 @@ using v8::Value; Persistent<Function> Server::constructor; Persistent<FunctionTemplate> Server::fun_tpl; +class NewCallOp : public Op { + public: + NewCallOp() { + call = NULL; + grpc_call_details_init(&details); + grpc_metadata_array_init(&request_metadata); + } + + ~NewCallOp() { + grpc_call_details_destroy(&details); + grpc_metadata_array_destroy(&request_metadata); + } + + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + if (call == NULL) { + return NanEscapeScope(NanNull()); + } + Handle<Object> obj = NanNew<Object>(); + obj->Set(NanNew("call"), Call::WrapStruct(call)); + obj->Set(NanNew("method"), NanNew(details.method)); + obj->Set(NanNew("host"), NanNew(details.host)); + obj->Set(NanNew("deadline"), + NanNew<Date>(TimespecToMilliseconds(details.deadline))); + obj->Set(NanNew("metadata"), ParseMetadata(&request_metadata)); + return NanEscapeScope(obj); + } + + bool ParseOp(Handle<Value> value, grpc_op *out, + std::vector<unique_ptr<NanUtf8String> > *strings, + std::vector<unique_ptr<PersistentHolder> > *handles) { + return true; + } + + grpc_call *call; + grpc_call_details details; + grpc_metadata_array request_metadata; + + protected: + std::string GetTypeString() const { + return "new call"; + } +}; + Server::Server(grpc_server *server) : wrapped_server(server) {} Server::~Server() { grpc_server_destroy(wrapped_server); } @@ -175,13 +223,17 @@ NAN_METHOD(Server::RequestCall) { return NanThrowTypeError("requestCall can only be called on a Server"); } Server *server = ObjectWrap::Unwrap<Server>(args.This()); - grpc_call_error error = grpc_server_request_call_old( - server->wrapped_server, CreateTag(args[0], NanNull())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { + NewCallOp *op = new NewCallOp(); + std::vector<unique_ptr<Op> > *ops = new std::vector<unique_ptr<Op> >(); + ops->push_back(unique_ptr<Op>(op)); + grpc_call_error error = grpc_server_request_call( + server->wrapped_server, &op->call, &op->details, &op->request_metadata, + CompletionQueueAsyncWorker::GetQueue(), + new struct tag(new NanCallback(args[0].As<Function>()), ops, NULL, NULL)); + if (error != GRPC_CALL_OK) { return NanThrowError("requestCall failed", error); } + CompletionQueueAsyncWorker::Next(); NanReturnUndefined(); } diff --git a/src/node/ext/tag.cc b/src/node/ext/tag.cc index dc8e523e12..27baa94a8e 100644 --- a/src/node/ext/tag.cc +++ b/src/node/ext/tag.cc @@ -31,68 +31,292 @@ * */ +#include <map> +#include <vector> + +#include <grpc/grpc.h> #include <stdlib.h> #include <node.h> #include <nan.h> #include "tag.h" +#include "call.h" namespace grpc { namespace node { +using v8::Boolean; +using v8::Function; using v8::Handle; using v8::HandleScope; using v8::Persistent; using v8::Value; -struct tag { - tag(Persistent<Value> *tag, Persistent<Value> *call) - : persist_tag(tag), persist_call(call) {} +Handle<Value> ParseMetadata(grpc_metadata_array *metadata_array) { + NanEscapableScope(); + grpc_metadata *metadata_elements = metadata_array->metadata; + size_t length = metadata_array->count; + std::map<char*, size_t> size_map; + std::map<char*, size_t> index_map; + + for (unsigned int i = 0; i < length; i++) { + char *key = metadata_elements[i].key; + if (size_map.count(key)) { + size_map[key] += 1; + } + index_map[key] = 0; + } + Handle<Object> metadata_object = NanNew<Object>(); + for (unsigned int i = 0; i < length; i++) { + grpc_metadata* elem = &metadata_elements[i]; + Handle<String> key_string = String::New(elem->key); + Handle<Array> array; + if (metadata_object->Has(key_string)) { + array = Handle<Array>::Cast(metadata_object->Get(key_string)); + } else { + array = NanNew<Array>(size_map[elem->key]); + metadata_object->Set(key_string, array); + } + array->Set(index_map[elem->key], + MakeFastBuffer( + NanNewBufferHandle(elem->value, elem->value_length))); + index_map[elem->key] += 1; + } + return NanEscapeScope(metadata_object); +} + +class OpResponse { + public: + explicit OpResponse(char *name): name(name) { + } + virtual Handle<Value> GetNodeValue() const = 0; + virtual bool ParseOp() = 0; + Handle<Value> GetOpType() const { + NanEscapableScope(); + return NanEscapeScope(NanNew(name)); + } + + private: + char *name; +}; + +class SendResponse : public OpResponse { + public: + explicit SendResponse(char *name): OpResponse(name) { + } + + Handle<Value> GetNodeValue() { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } +} + +class MetadataResponse : public OpResponse { + public: + explicit MetadataResponse(grpc_metadata_array *recv_metadata): + recv_metadata(recv_metadata), OpResponse("metadata") { + } + + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ParseMetadata(recv_metadata)); + } + + private: + grpc_metadata_array *recv_metadata; +}; + +class MessageResponse : public OpResponse { + public: + explicit MessageResponse(grpc_byte_buffer **recv_message): + recv_message(recv_message), OpResponse("read") { + } + + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ByteBufferToBuffer(*recv_message)); + } + + private: + grpc_byte_buffer **recv_message; +}; +switch () { +case GRPC_RECV_CLIENT_STATUS: + op = new ClientStatusResponse; + break; +} + + +class ClientStatusResponse : public OpResponse { + public: + explicit ClientStatusResponse(): + OpResponse("status") { + } + + bool ParseOp(Handle<Value> obj, grpc_op *out) { + } + + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + Handle<Object> status_obj = NanNew<Object>(); + status_obj->Set(NanNew("code"), NanNew<Number>(*status)); + if (event->data.finished.details != NULL) { + status_obj->Set(NanNew("details"), String::New(*status_details)); + } + status_obj->Set(NanNew("metadata"), ParseMetadata(metadata_array)); + return NanEscapeScope(status_obj); + } + private: + grpc_metadata_array metadata_array; + grpc_status_code status; + char *status_details; +}; + +class ServerCloseResponse : public OpResponse { + public: + explicit ServerCloseResponse(int *cancelled): cancelled(cancelled), + OpResponse("cancelled") { + } + + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + NanEscapeScope(NanNew<Boolean>(*cancelled)); + } + + private: + int *cancelled; +}; + +class NewCallResponse : public OpResponse { + public: + explicit NewCallResponse(grpc_call **call, grpc_call_details *details, + grpc_metadata_array *request_metadata) : + call(call), details(details), request_metadata(request_metadata), + OpResponse("call"){ + } + + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + if (*call == NULL) { + return NanEscapeScope(NanNull()); + } + Handle<Object> obj = NanNew<Object>(); + obj->Set(NanNew("call"), Call::WrapStruct(*call)); + obj->Set(NanNew("method"), NanNew(details->method)); + obj->Set(NanNew("host"), NanNew(details->host)); + obj->Set(NanNew("deadline"), + NanNew<Date>(TimespecToMilliseconds(details->deadline))); + obj->Set(NanNew("metadata"), ParseMetadata(request_metadata)); + return NanEscapeScope(obj); + } + private: + grpc_call **call; + grpc_call_details *details; + grpc_metadata_array *request_metadata; +} + +struct tag { + tag(NanCallback *callback, std::vector<OpResponse*> *responses, + std::vector<Persistent<Value>> *handles, + std::vector<NanUtf8String *> *strings) : + callback(callback), repsonses(responses), handles(handles), + strings(strings){ + } ~tag() { - persist_tag->Dispose(); - if (persist_call != NULL) { - persist_call->Dispose(); + for (std::vector<OpResponse *>::iterator it = responses->begin(); + it != responses->end(); ++it) { + delete *it; + } + for (std::vector<NanUtf8String *>::iterator it = responses->begin(); + it != responses->end(); ++it) { + delete *it; } + delete callback; + delete responses; + delete handles; + delete strings; } - Persistent<Value> *persist_tag; - Persistent<Value> *persist_call; + NanCallback *callback; + std::vector<OpResponse*> *responses; + std::vector<Persistent<Value>> *handles; + std::vector<NanUtf8String *> *strings; }; -void *CreateTag(Handle<Value> tag, Handle<Value> call) { +void *CreateTag(Handle<Function> callback, grpc_op *ops, size_t nops, + std::vector<Persistent<Value>> *handles, + std::vector<NanUtf8String *> *strings) { NanScope(); - Persistent<Value> *persist_tag = new Persistent<Value>(); - NanAssignPersistent(*persist_tag, tag); - Persistent<Value> *persist_call; - if (call->IsNull() || call->IsUndefined()) { - persist_call = NULL; - } else { - persist_call = new Persistent<Value>(); - NanAssignPersistent(*persist_call, call); - } - struct tag *tag_struct = new struct tag(persist_tag, persist_call); + NanCallback *cb = new NanCallback(callback); + vector<OpResponse*> *responses = new vector<OpResponse*>(); + for (size_t i = 0; i < nops; i++) { + grpc_op *op = &ops[i]; + OpResponse *resp; + // Switching on the TYPE of the op + switch (op->op) { + case GRPC_OP_SEND_INITIAL_METADATA: + resp = new SendResponse("send metadata"); + break; + case GRPC_OP_SEND_MESSAGE: + resp = new SendResponse("write"); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + resp = new SendResponse("client close"); + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + resp = new SendResponse("server close"); + break; + case GRPC_OP_RECV_INITIAL_METADATA: + resp = new MetadataResponse(op->data.recv_initial_metadata); + break; + case GRPC_OP_RECV_MESSAGE: + resp = new MessageResponse(op->data.recv_message); + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + resp = new ClientStatusResponse( + op->data.recv_status_on_client.trailing_metadata, + op->data.recv_status_on_client.status, + op->data.recv_status_on_client.status_details); + break; + case GRPC_RECV_CLOSE_ON_SERVER: + resp = new ServerCloseResponse(op->data.recv_close_on_server.cancelled); + break; + default: + continue; + } + responses->push_back(resp); + } + struct tag *tag_struct = new struct tag(cb, responses, handles, strings); return reinterpret_cast<void *>(tag_struct); } -Handle<Value> GetTagHandle(void *tag) { +void *CreateTag(Handle<Function> callback, grpc_call **call, + grpc_call_details *details, + grpc_metadata_array *request_metadata) { NanEscapableScope(); - struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); - Handle<Value> tag_value = NanNew<Value>(*tag_struct->persist_tag); - return NanEscapeScope(tag_value); + NanCallback *cb = new NanCallback(callback); + vector<OpResponse*> *responses = new vector<OpResponse*>(); + OpResponse *resp = new NewCallResponse(call, details, request_metadata); + responses->push_back(resp); + struct tag *tag_struct = new struct tag(cb, responses); + return reinterpret_cast<void *>(tag_struct); } -bool TagHasCall(void *tag) { +NanCallback GetTagCallback(void *tag) { + NanEscapableScope(); struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); - return tag_struct->persist_call != NULL; + return NanEscapeScope(*tag_struct->callback); } -Handle<Value> TagGetCall(void *tag) { +Handle<Value> GetNodeValue(void *tag) { NanEscapableScope(); struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); - if (tag_struct->persist_call == NULL) { - return NanEscapeScope(NanNull()); + Handle<Object> obj = NanNew<Object>(); + for (std::vector<OpResponse *>::iterator it = tag_struct->responses->begin(); + it != tag_struct->responses->end(); ++it) { + OpResponse *resp = *it; + obj->Set(resp->GetOpType(), resp->GetNodeValue()); } - Handle<Value> call_value = NanNew<Value>(*tag_struct->persist_call); - return NanEscapeScope(call_value); + return NanEscapeScope(obj); } void DestroyTag(void *tag) { delete reinterpret_cast<struct tag *>(tag); } diff --git a/src/node/ext/tag.h b/src/node/ext/tag.h index bdb09252d9..9ff8703b95 100644 --- a/src/node/ext/tag.h +++ b/src/node/ext/tag.h @@ -34,21 +34,34 @@ #ifndef NET_GRPC_NODE_TAG_H_ #define NET_GRPC_NODE_TAG_H_ +#include <vector> + + +#include <grpc/grpc.h> #include <node.h> +#include <nan.h> namespace grpc { namespace node { -/* Create a void* tag that can be passed to various grpc_call functions from - a javascript value and the javascript wrapper for the call. The call can be - null. */ -void *CreateTag(v8::Handle<v8::Value> tag, v8::Handle<v8::Value> call); -/* Return the javascript value stored in the tag */ -v8::Handle<v8::Value> GetTagHandle(void *tag); -/* Returns true if the call was set (non-null) when the tag was created */ -bool TagHasCall(void *tag); -/* Returns the javascript wrapper for the call associated with this tag */ -v8::Handle<v8::Value> TagGetCall(void *call); +/* Create a void* tag that can be passed to grpc_call_start_batch from a + callback function and an ops array */ +void *CreateTag(v8::Handle<v8::Function> callback, grpc_op *ops, size_t nops, + std::vector<v8::Persistent<v8::Value> > *handles, + std::vector<NanUtf8String *> *strings); + +/* Create a void* tag that can be passed to grpc_server_request_call from a + callback and the various out parameters to that function */ +void *CreateTag(v8::Handle<v8::Function> callback, grpc_call **call, + grpc_call_details *details, + grpc_metadata_array *request_metadata); + +/* Get the callback from the tag */ +NanCallback GetCallback(void *tag); + +/* Get the combined output value from the tag */ +v8::Handle<v8::Value> GetNodeValue(void *tag); + /* Destroy the tag and all resources it is holding. It is illegal to call any of these other functions on a tag after it has been destroyed. */ void DestroyTag(void *tag); diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js index 48db245498..e341092ff8 100644 --- a/src/node/test/call_test.js +++ b/src/node/test/call_test.js @@ -98,100 +98,80 @@ describe('call', function() { }, TypeError); }); }); - describe('addMetadata', function() { - it('should succeed with a map from strings to string arrays', function() { + describe('startBatch', function() { + it('should fail without an object and a function', function() { var call = new grpc.Call(channel, 'method', getDeadline(1)); - assert.doesNotThrow(function() { - call.addMetadata({'key': ['value']}); + assert.throws(function() { + call.startBatch(); }); - assert.doesNotThrow(function() { - call.addMetadata({'key1': ['value1'], 'key2': ['value2']}); + assert.throws(function() { + call.startBatch({}); + }); + assert.throws(function() { + call.startBatch(null, function(){}); }); }); - it('should succeed with a map from strings to buffer arrays', function() { + it.skip('should succeed with an empty object', function(done) { var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.doesNotThrow(function() { - call.addMetadata({'key': [new Buffer('value')]}); - }); - assert.doesNotThrow(function() { - call.addMetadata({'key1': [new Buffer('value1')], - 'key2': [new Buffer('value2')]}); + call.startBatch({}, function(err) { + assert.ifError(err); + done(); + }); }); }); - it('should fail with other parameter types', function() { + }); + describe('startBatch with metadata', function() { + it('should succeed with a map of strings to string arrays', function(done) { var call = new grpc.Call(channel, 'method', getDeadline(1)); - assert.throws(function() { - call.addMetadata(); + assert.doesNotThrow(function() { + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = {'key1': ['value1'], + 'key2': ['value2']}; + call.startBatch(batch, function(err, resp) { + assert.ifError(err); + assert.deepEqual(resp, {'send metadata': true}); + done(); + }); }); - assert.throws(function() { - call.addMetadata(null); - }, TypeError); - assert.throws(function() { - call.addMetadata('value'); - }, TypeError); - assert.throws(function() { - call.addMetadata(5); - }, TypeError); }); - it.skip('should fail if invoke was already called', function(done) { + it('should succeed with a map of strings to buffer arrays', function(done) { var call = new grpc.Call(channel, 'method', getDeadline(1)); - call.invoke(function() {}, - function() {done();}, - 0); - assert.throws(function() { - call.addMetadata({'key': ['value']}); + assert.doesNotThrow(function() { + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = { + 'key1': [new Buffer('value1')], + 'key2': [new Buffer('value2')] + }; + call.startBatch(batch, function(err, resp) { + assert.ifError(err); + assert.deepEqual(resp, {'send metadata': true}); + done(); + }); }); - // Cancel to speed up the test - call.cancel(); }); - }); - describe('invoke', function() { - it('should fail with fewer than 3 arguments', function() { + it('should fail with other parameter types', function() { var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.throws(function() { - call.invoke(); - }, TypeError); - assert.throws(function() { - call.invoke(function() {}); - }, TypeError); - assert.throws(function() { - call.invoke(function() {}, - function() {}); - }, TypeError); - }); - it('should work with 2 args and an int', function(done) { - assert.doesNotThrow(function() { - var call = new grpc.Call(channel, 'method', getDeadline(1)); - call.invoke(function() {}, - function() {done();}, - 0); - // Cancel to speed up the test - call.cancel(); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = undefined; + call.startBatch(batch, function(){}); }); - }); - it('should reject incorrectly typed arguments', function() { - var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.throws(function() { - call.invoke(0, 0, 0); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = null; + call.startBatch(batch, function(){}); }, TypeError); assert.throws(function() { - call.invoke(function() {}, - function() {}, 'test'); - }); - }); - }); - describe('serverAccept', function() { - it('should fail with fewer than 1 argument1', function() { - var call = new grpc.Call(channel, 'method', getDeadline(1)); - assert.throws(function() { - call.serverAccept(); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = 'value'; + call.startBatch(batch, function(){}); }, TypeError); - }); - it.skip('should return an error when called on a client Call', function() { - var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.throws(function() { - call.serverAccept(function() {}); - }); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = 5; + call.startBatch(batch, function(){}); + }, TypeError); }); }); describe('cancel', function() { diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js index 0138a55226..4d11e6f527 100644 --- a/src/node/test/constant_test.js +++ b/src/node/test/constant_test.js @@ -76,31 +76,6 @@ var callErrorNames = [ 'INVALID_FLAGS' ]; -/** - * List of all op error names - * @const - * @type {Array.<string>} - */ -var opErrorNames = [ - 'OK', - 'ERROR' -]; - -/** - * List of all completion type names - * @const - * @type {Array.<string>} - */ -var completionTypeNames = [ - 'QUEUE_SHUTDOWN', - 'READ', - 'WRITE_ACCEPTED', - 'FINISH_ACCEPTED', - 'CLIENT_METADATA_READ', - 'FINISHED', - 'SERVER_RPC_NEW' -]; - describe('constants', function() { it('should have all of the status constants', function() { for (var i = 0; i < statusNames.length; i++) { @@ -114,16 +89,4 @@ describe('constants', function() { 'call error missing: ' + callErrorNames[i]); } }); - it('should have all of the op errors', function() { - for (var i = 0; i < opErrorNames.length; i++) { - assert(grpc.opError.hasOwnProperty(opErrorNames[i]), - 'op error missing: ' + opErrorNames[i]); - } - }); - it('should have all of the completion types', function() { - for (var i = 0; i < completionTypeNames.length; i++) { - assert(grpc.completionType.hasOwnProperty(completionTypeNames[i]), - 'completion type missing: ' + completionTypeNames[i]); - } - }); }); diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js index 1f53df23f3..f1277ff207 100644 --- a/src/node/test/end_to_end_test.js +++ b/src/node/test/end_to_end_test.js @@ -75,39 +75,47 @@ describe('end-to-end', function() { var call = new grpc.Call(channel, 'dummy_method', deadline); - call.invoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.CLIENT_METADATA_READ); - },function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - var status = event.data; - assert.strictEqual(status.code, grpc.status.OK); - assert.strictEqual(status.details, status_text); + var client_batch = {}; + client_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(client_batch, function(err, response) { + assert.ifError(err); + assert.deepEqual(response, { + 'send metadata': true, + 'client close': true, + 'status': { + 'code': grpc.status.OK, + 'details': status_text, + 'metadata': {} + } + }); done(); - }, 0); + }); - server.requestCall(function(event) { - assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW); - var server_call = event.call; + server.requestCall(function(err, call_details) { + var new_call = call_details['new call']; + assert.notEqual(new_call, null); + var server_call = new_call.call; assert.notEqual(server_call, null); - server_call.serverAccept(function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - }, 0); - server_call.serverEndInitialMetadata(0); - server_call.startWriteStatus( - grpc.status.OK, - status_text, - function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); + var server_batch = {}; + server_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + server_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + 'metadata': {}, + 'code': grpc.status.OK, + 'details': status_text + }; + server_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + server_call.startBatch(server_batch, function(err, response) { + assert.ifError(err); + assert.deepEqual(response, { + 'send metadata': true, + 'send status': true, + 'cancelled': false + }); + done(); + }); }); }); it('should successfully send and receive metadata', function(complete) { @@ -118,114 +126,110 @@ describe('end-to-end', function() { var call = new grpc.Call(channel, 'dummy_method', deadline); - call.addMetadata({'client_key': ['client_value']}); - call.invoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.CLIENT_METADATA_READ); - assert.strictEqual(event.data.server_key[0].toString(), 'server_value'); - },function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - var status = event.data; - assert.strictEqual(status.code, grpc.status.OK); - assert.strictEqual(status.details, status_text); + var client_batch = {}; + client_batch[grpc.opType.SEND_INITIAL_METADATA] = { + 'client_key': ['client_value'] + }; + client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(client_batch, function(err, response) { + assert.ifError(err); + assert(response['send metadata']); + assert(response['client close']); + assert(response.hasOwnProperty('metadata')); + assert.strictEqual(response.metadata.server_key.toString(), + 'server_value'); + assert.deepEqual(response.status, {'code': grpc.status.OK, + 'details': status_text, + 'metadata': {}}); done(); - }, 0); + }); - server.requestCall(function(event) { - assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW); - assert.strictEqual(event.data.metadata.client_key[0].toString(), + server.requestCall(function(err, call_details) { + var new_call = call_details['new call']; + assert.notEqual(new_call, null); + assert.strictEqual(new_call.metadata.client_key[0].toString(), 'client_value'); - var server_call = event.call; + var server_call = new_call.call; assert.notEqual(server_call, null); - server_call.serverAccept(function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - }, 0); - server_call.addMetadata({'server_key': ['server_value']}); - server_call.serverEndInitialMetadata(0); - server_call.startWriteStatus( - grpc.status.OK, - status_text, - function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); + var server_batch = {}; + server_batch[grpc.opType.SEND_INITIAL_METADATA] = { + 'server_key': ['server_value'] + }; + server_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + 'metadata': {}, + 'code': grpc.status.OK, + 'details': status_text + }; + server_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + server_call.startBatch(server_batch, function(err, response) { + assert.ifError(err); + assert.deepEqual(response, { + 'send metadata': true, + 'send status': true, + 'cancelled': false + }); + done(); + }); }); }); - it('should send and receive data without error', function(complete) { + it.only('should send and receive data without error', function(complete) { var req_text = 'client_request'; var reply_text = 'server_response'; - var done = multiDone(complete, 6); + var done = multiDone(complete, 2); var deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + 3); var status_text = 'success'; var call = new grpc.Call(channel, 'dummy_method', deadline); - call.invoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.CLIENT_METADATA_READ); - done(); - },function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - var status = event.data; - assert.strictEqual(status.code, grpc.status.OK); - assert.strictEqual(status.details, status_text); - done(); - }, 0); - call.startWrite( - new Buffer(req_text), - function(event) { - assert.strictEqual(event.type, - grpc.completionType.WRITE_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }, 0); - call.startRead(function(event) { - assert.strictEqual(event.type, grpc.completionType.READ); - assert.strictEqual(event.data.toString(), reply_text); + var client_batch = {}; + client_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + client_batch[grpc.opType.SEND_MESSAGE] = new Buffer(req_text); + client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + client_batch[grpc.opType.RECV_MESSAGE] = true; + client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(client_batch, function(err, response) { + assert.ifError(err); + assert(response['send metadata']); + assert(response['client close']); + assert.deepEqual(response.metadata, {}); + assert(response['send message']); + assert.strictEqual(response.read.toString(), reply_text); + assert.deepEqual(response.status, {'code': grpc.status.OK, + 'details': status_text, + 'metadata': {}}); + console.log("OK status"); done(); }); - server.requestCall(function(event) { - assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW); - var server_call = event.call; + + server.requestCall(function(err, call_details) { + var new_call = call_details['new call']; + assert.notEqual(new_call, null); + var server_call = new_call.call; assert.notEqual(server_call, null); - server_call.serverAccept(function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - done(); - }); - server_call.serverEndInitialMetadata(0); - server_call.startRead(function(event) { - assert.strictEqual(event.type, grpc.completionType.READ); - assert.strictEqual(event.data.toString(), req_text); - server_call.startWrite( - new Buffer(reply_text), - function(event) { - assert.strictEqual(event.type, - grpc.completionType.WRITE_ACCEPTED); - assert.strictEqual(event.data, - grpc.opError.OK); - server_call.startWriteStatus( - grpc.status.OK, - status_text, - function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }, 0); + var server_batch = {}; + server_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + server_batch[grpc.opType.RECV_MESSAGE] = true; + server_call.startBatch(server_batch, function(err, response) { + assert.ifError(err); + assert(response['send metadata']); + assert.strictEqual(response.read.toString(), req_text); + var response_batch = {}; + response_batch[grpc.opType.SEND_MESSAGE] = new Buffer(reply_text); + response_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + 'metadata': {}, + 'code': grpc.status.OK, + 'details': status_text + }; + response_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + server_call.startBatch(response_batch, function(err, response) { + assert(response['send status']); + //assert(!response['cancelled']); + done(); + }); }); }); }); |