diff options
Diffstat (limited to 'src/node')
30 files changed, 1656 insertions, 2250 deletions
diff --git a/src/node/binding.gyp b/src/node/binding.gyp index cf2a6acb04..fb4c779f8e 100644 --- a/src/node/binding.gyp +++ b/src/node/binding.gyp @@ -9,14 +9,15 @@ 'include_dirs': [ "<!(nodejs -e \"require('nan')\")" ], - 'cxxflags': [ + 'cflags': [ + '-std=c++11', '-Wall', '-pthread', '-pedantic', '-g', '-zdefs' - '-Werror', - ], + '-Werror' + ], 'ldflags': [ '-g' ], @@ -33,11 +34,9 @@ "ext/channel.cc", "ext/completion_queue_async_worker.cc", "ext/credentials.cc", - "ext/event.cc", "ext/node_grpc.cc", "ext/server.cc", "ext/server_credentials.cc", - "ext/tag.cc", "ext/timeval.cc" ], 'conditions' : [ diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 23aead07b2..9ed389f3bc 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2014, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,17 +31,25 @@ * */ +#include <memory> +#include <vector> +#include <map> + #include <node.h> #include "grpc/support/log.h" #include "grpc/grpc.h" +#include "grpc/support/alloc.h" #include "grpc/support/time.h" #include "byte_buffer.h" #include "call.h" #include "channel.h" #include "completion_queue_async_worker.h" #include "timeval.h" -#include "tag.h" + +using std::unique_ptr; +using std::shared_ptr; +using std::vector; namespace grpc { namespace node { @@ -49,6 +57,7 @@ namespace node { using ::node::Buffer; using v8::Arguments; using v8::Array; +using v8::Boolean; using v8::Exception; using v8::External; using v8::Function; @@ -68,37 +77,372 @@ using v8::Value; Persistent<Function> Call::constructor; Persistent<FunctionTemplate> Call::fun_tpl; -Call::Call(grpc_call *call) : wrapped_call(call) {} -Call::~Call() { grpc_call_destroy(wrapped_call); } +bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array, + shared_ptr<Resources> resources) { + NanScope(); + grpc_metadata_array_init(array); + Handle<Array> keys(metadata->GetOwnPropertyNames()); + for (unsigned int i = 0; i < keys->Length(); i++) { + Handle<String> current_key(keys->Get(i)->ToString()); + if (!metadata->Get(current_key)->IsArray()) { + return false; + } + array->capacity += Local<Array>::Cast(metadata->Get(current_key))->Length(); + } + array->metadata = reinterpret_cast<grpc_metadata*>( + gpr_malloc(array->capacity * sizeof(grpc_metadata))); + for (unsigned int i = 0; i < keys->Length(); i++) { + Handle<String> current_key(keys->Get(i)->ToString()); + NanUtf8String *utf8_key = new NanUtf8String(current_key); + resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_key)); + Handle<Array> values = Local<Array>::Cast(metadata->Get(current_key)); + for (unsigned int j = 0; j < values->Length(); j++) { + Handle<Value> value = values->Get(j); + grpc_metadata *current = &array->metadata[array->count]; + current->key = **utf8_key; + if (Buffer::HasInstance(value)) { + current->value = Buffer::Data(value); + current->value_length = Buffer::Length(value); + Persistent<Value> handle; + NanAssignPersistent(handle, value); + resources->handles.push_back(unique_ptr<PersistentHolder>( + new PersistentHolder(handle))); + } else if (value->IsString()) { + Handle<String> string_value = value->ToString(); + NanUtf8String *utf8_value = new NanUtf8String(string_value); + resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_value)); + current->value = **utf8_value; + current->value_length = string_value->Length(); + } else { + return false; + } + array->count += 1; + } + } + return true; +} + +Handle<Value> ParseMetadata(const grpc_metadata_array *metadata_array) { + NanEscapableScope(); + grpc_metadata *metadata_elements = metadata_array->metadata; + size_t length = metadata_array->count; + std::map<const char*, size_t> size_map; + std::map<const char*, size_t> index_map; + + for (unsigned int i = 0; i < length; i++) { + const char *key = metadata_elements[i].key; + if (size_map.count(key)) { + size_map[key] += 1; + } + index_map[key] = 0; + } + Handle<Object> metadata_object = NanNew<Object>(); + for (unsigned int i = 0; i < length; i++) { + grpc_metadata* elem = &metadata_elements[i]; + Handle<String> key_string = String::New(elem->key); + Handle<Array> array; + if (metadata_object->Has(key_string)) { + array = Handle<Array>::Cast(metadata_object->Get(key_string)); + } else { + array = NanNew<Array>(size_map[elem->key]); + metadata_object->Set(key_string, array); + } + array->Set(index_map[elem->key], + MakeFastBuffer( + NanNewBufferHandle(elem->value, elem->value_length))); + index_map[elem->key] += 1; + } + return NanEscapeScope(metadata_object); +} + +Handle<Value> Op::GetOpType() const { + NanEscapableScope(); + return NanEscapeScope(NanNew<String>(GetTypeString())); +} + +class SendMetadataOp : public Op { + public: + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { + if (!value->IsObject()) { + return false; + } + grpc_metadata_array array; + if (!CreateMetadataArray(value->ToObject(), &array, resources)) { + return false; + } + out->data.send_initial_metadata.count = array.count; + out->data.send_initial_metadata.metadata = array.metadata; + return true; + } + protected: + std::string GetTypeString() const { + return "send metadata"; + } +}; + +class SendMessageOp : public Op { + public: + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { + if (!Buffer::HasInstance(value)) { + return false; + } + out->data.send_message = BufferToByteBuffer(value); + Persistent<Value> handle; + NanAssignPersistent(handle, value); + resources->handles.push_back(unique_ptr<PersistentHolder>( + new PersistentHolder(handle))); + return true; + } + protected: + std::string GetTypeString() const { + return "send message"; + } +}; + +class SendClientCloseOp : public Op { + public: + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { + return true; + } + protected: + std::string GetTypeString() const { + return "client close"; + } +}; + +class SendServerStatusOp : public Op { + public: + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { + if (!value->IsObject()) { + return false; + } + Handle<Object> server_status = value->ToObject(); + if (!server_status->Get(NanNew("metadata"))->IsObject()) { + return false; + } + if (!server_status->Get(NanNew("code"))->IsUint32()) { + return false; + } + if (!server_status->Get(NanNew("details"))->IsString()) { + return false; + } + grpc_metadata_array array; + if (!CreateMetadataArray(server_status->Get(NanNew("metadata"))-> + ToObject(), + &array, resources)) { + return false; + } + out->data.send_status_from_server.trailing_metadata_count = array.count; + out->data.send_status_from_server.trailing_metadata = array.metadata; + out->data.send_status_from_server.status = + static_cast<grpc_status_code>( + server_status->Get(NanNew("code"))->Uint32Value()); + NanUtf8String *str = new NanUtf8String( + server_status->Get(NanNew("details"))); + resources->strings.push_back(unique_ptr<NanUtf8String>(str)); + out->data.send_status_from_server.status_details = **str; + return true; + } + protected: + std::string GetTypeString() const { + return "send status"; + } +}; + +class GetMetadataOp : public Op { + public: + GetMetadataOp() { + grpc_metadata_array_init(&recv_metadata); + } + + ~GetMetadataOp() { + grpc_metadata_array_destroy(&recv_metadata); + } + + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ParseMetadata(&recv_metadata)); + } + + bool ParseOp(Handle<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { + out->data.recv_initial_metadata = &recv_metadata; + return true; + } + + protected: + std::string GetTypeString() const { + return "metadata"; + } + + private: + grpc_metadata_array recv_metadata; +}; + +class ReadMessageOp : public Op { + public: + ReadMessageOp() { + recv_message = NULL; + } + ~ReadMessageOp() { + if (recv_message != NULL) { + gpr_free(recv_message); + } + } + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ByteBufferToBuffer(recv_message)); + } + + bool ParseOp(Handle<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { + out->data.recv_message = &recv_message; + return true; + } + + protected: + std::string GetTypeString() const { + return "read"; + } + + private: + grpc_byte_buffer *recv_message; +}; + +class ClientStatusOp : public Op { + public: + ClientStatusOp() { + grpc_metadata_array_init(&metadata_array); + status_details = NULL; + details_capacity = 0; + } + + ~ClientStatusOp() { + grpc_metadata_array_destroy(&metadata_array); + gpr_free(status_details); + } + + bool ParseOp(Handle<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { + out->data.recv_status_on_client.trailing_metadata = &metadata_array; + out->data.recv_status_on_client.status = &status; + out->data.recv_status_on_client.status_details = &status_details; + out->data.recv_status_on_client.status_details_capacity = &details_capacity; + return true; + } + + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + Handle<Object> status_obj = NanNew<Object>(); + status_obj->Set(NanNew("code"), NanNew<Number>(status)); + if (status_details != NULL) { + status_obj->Set(NanNew("details"), String::New(status_details)); + } + status_obj->Set(NanNew("metadata"), ParseMetadata(&metadata_array)); + return NanEscapeScope(status_obj); + } + protected: + std::string GetTypeString() const { + return "status"; + } + private: + grpc_metadata_array metadata_array; + grpc_status_code status; + char *status_details; + size_t details_capacity; +}; + +class ServerCloseResponseOp : public Op { + public: + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(NanNew<Boolean>(cancelled)); + } + + bool ParseOp(Handle<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { + out->data.recv_close_on_server.cancelled = &cancelled; + return true; + } + + protected: + std::string GetTypeString() const { + return "cancelled"; + } + + private: + int cancelled; +}; + +tag::tag(NanCallback *callback, OpVec *ops, + shared_ptr<Resources> resources) : + callback(callback), ops(ops), resources(resources){ +} + +tag::~tag() { + delete callback; + delete ops; +} + +Handle<Value> GetTagNodeValue(void *tag) { + NanEscapableScope(); + struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); + Handle<Object> tag_obj = NanNew<Object>(); + for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin(); + it != tag_struct->ops->end(); ++it) { + Op *op_ptr = it->get(); + tag_obj->Set(op_ptr->GetOpType(), op_ptr->GetNodeValue()); + } + return NanEscapeScope(tag_obj); +} + +NanCallback *GetTagCallback(void *tag) { + struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); + return tag_struct->callback; +} + +void DestroyTag(void *tag) { + struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); + delete tag_struct; +} + +Call::Call(grpc_call *call) : wrapped_call(call) { +} + +Call::~Call() { + grpc_call_destroy(wrapped_call); +} void Call::Init(Handle<Object> exports) { NanScope(); Local<FunctionTemplate> tpl = FunctionTemplate::New(New); tpl->SetClassName(NanNew("Call")); tpl->InstanceTemplate()->SetInternalFieldCount(1); - NanSetPrototypeTemplate(tpl, "addMetadata", - FunctionTemplate::New(AddMetadata)->GetFunction()); - NanSetPrototypeTemplate(tpl, "invoke", - FunctionTemplate::New(Invoke)->GetFunction()); - NanSetPrototypeTemplate(tpl, "serverAccept", - FunctionTemplate::New(ServerAccept)->GetFunction()); - NanSetPrototypeTemplate( - tpl, "serverEndInitialMetadata", - FunctionTemplate::New(ServerEndInitialMetadata)->GetFunction()); + NanSetPrototypeTemplate(tpl, "startBatch", + FunctionTemplate::New(StartBatch)->GetFunction()); NanSetPrototypeTemplate(tpl, "cancel", FunctionTemplate::New(Cancel)->GetFunction()); - NanSetPrototypeTemplate(tpl, "startWrite", - FunctionTemplate::New(StartWrite)->GetFunction()); - NanSetPrototypeTemplate( - tpl, "startWriteStatus", - FunctionTemplate::New(StartWriteStatus)->GetFunction()); - NanSetPrototypeTemplate(tpl, "writesDone", - FunctionTemplate::New(WritesDone)->GetFunction()); - NanSetPrototypeTemplate(tpl, "startReadMetadata", - FunctionTemplate::New(WritesDone)->GetFunction()); - NanSetPrototypeTemplate(tpl, "startRead", - FunctionTemplate::New(StartRead)->GetFunction()); NanAssignPersistent(fun_tpl, tpl); NanAssignPersistent(constructor, tpl->GetFunction()); constructor->Set(NanNew("WRITE_BUFFER_HINT"), @@ -152,9 +496,9 @@ NAN_METHOD(Call::New) { NanUtf8String method(args[1]); double deadline = args[2]->NumberValue(); grpc_channel *wrapped_channel = channel->GetWrappedChannel(); - grpc_call *wrapped_call = grpc_channel_create_call_old( - wrapped_channel, *method, channel->GetHost(), - MillisecondsToTimespec(deadline)); + grpc_call *wrapped_call = grpc_channel_create_call( + wrapped_channel, CompletionQueueAsyncWorker::GetQueue(), *method, + channel->GetHost(), MillisecondsToTimespec(deadline)); call = new Call(wrapped_call); args.This()->SetHiddenValue(String::NewSymbol("channel_"), channel_object); @@ -168,119 +512,74 @@ NAN_METHOD(Call::New) { } } -NAN_METHOD(Call::AddMetadata) { +NAN_METHOD(Call::StartBatch) { NanScope(); if (!HasInstance(args.This())) { - return NanThrowTypeError("addMetadata can only be called on Call objects"); + return NanThrowTypeError("startBatch can only be called on Call objects"); } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); if (!args[0]->IsObject()) { - return NanThrowTypeError("addMetadata's first argument must be an object"); - } - Handle<Object> metadata = args[0]->ToObject(); - Handle<Array> keys(metadata->GetOwnPropertyNames()); - for (unsigned int i = 0; i < keys->Length(); i++) { - Handle<String> current_key(keys->Get(i)->ToString()); - if (!metadata->Get(current_key)->IsArray()) { - return NanThrowTypeError( - "addMetadata's first argument's values must be arrays"); - } - NanUtf8String utf8_key(current_key); - Handle<Array> values = Local<Array>::Cast(metadata->Get(current_key)); - for (unsigned int j = 0; j < values->Length(); j++) { - Handle<Value> value = values->Get(j); - grpc_metadata metadata; - grpc_call_error error; - metadata.key = *utf8_key; - if (Buffer::HasInstance(value)) { - metadata.value = Buffer::Data(value); - metadata.value_length = Buffer::Length(value); - error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0); - } else if (value->IsString()) { - Handle<String> string_value = value->ToString(); - NanUtf8String utf8_value(string_value); - metadata.value = *utf8_value; - metadata.value_length = string_value->Length(); - gpr_log(GPR_DEBUG, "adding metadata: %s, %s, %d", metadata.key, - metadata.value, metadata.value_length); - error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0); - } else { - return NanThrowTypeError( - "addMetadata values must be strings or buffers"); - } - if (error != GRPC_CALL_OK) { - return NanThrowError("addMetadata failed", error); - } - } - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::Invoke) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("invoke can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("invoke's first argument must be a function"); + return NanThrowError("startBatch's first argument must be an object"); } if (!args[1]->IsFunction()) { - return NanThrowTypeError("invoke's second argument must be a function"); - } - if (!args[2]->IsUint32()) { - return NanThrowTypeError("invoke's third argument must be integer flags"); - } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); - unsigned int flags = args[3]->Uint32Value(); - grpc_call_error error = grpc_call_invoke_old( - call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(), - CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("invoke failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::ServerAccept) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("accept can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("accept's first argument must be a function"); + return NanThrowError("startBatch's second argument must be a callback"); } + Handle<Function> callback_func = args[1].As<Function>(); Call *call = ObjectWrap::Unwrap<Call>(args.This()); - grpc_call_error error = grpc_call_server_accept_old( - call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(), - CreateTag(args[0], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("serverAccept failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::ServerEndInitialMetadata) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError( - "serverEndInitialMetadata can only be called on Call objects"); - } - if (!args[0]->IsUint32()) { - return NanThrowTypeError( - "serverEndInitialMetadata's second argument must be integer flags"); + shared_ptr<Resources> resources(new Resources); + Handle<Object> obj = args[0]->ToObject(); + Handle<Array> keys = obj->GetOwnPropertyNames(); + size_t nops = keys->Length(); + vector<grpc_op> ops(nops); + unique_ptr<OpVec> op_vector(new OpVec()); + for (unsigned int i = 0; i < nops; i++) { + unique_ptr<Op> op; + if (!keys->Get(i)->IsUint32()) { + return NanThrowError( + "startBatch's first argument's keys must be integers"); + } + uint32_t type = keys->Get(i)->Uint32Value(); + ops[i].op = static_cast<grpc_op_type>(type); + switch (type) { + case GRPC_OP_SEND_INITIAL_METADATA: + op.reset(new SendMetadataOp()); + break; + case GRPC_OP_SEND_MESSAGE: + op.reset(new SendMessageOp()); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + op.reset(new SendClientCloseOp()); + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + op.reset(new SendServerStatusOp()); + break; + case GRPC_OP_RECV_INITIAL_METADATA: + op.reset(new GetMetadataOp()); + break; + case GRPC_OP_RECV_MESSAGE: + op.reset(new ReadMessageOp()); + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + op.reset(new ClientStatusOp()); + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + op.reset(new ServerCloseResponseOp()); + break; + default: + return NanThrowError("Argument object had an unrecognized key"); + } + if (!op->ParseOp(obj->Get(type), &ops[i], resources)) { + return NanThrowTypeError("Incorrectly typed arguments to startBatch"); + } + op_vector->push_back(std::move(op)); } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); - unsigned int flags = args[1]->Uint32Value(); - grpc_call_error error = - grpc_call_server_end_initial_metadata_old(call->wrapped_call, flags); + NanCallback *callback = new NanCallback(callback_func); + grpc_call_error error = grpc_call_start_batch( + call->wrapped_call, &ops[0], nops, new struct tag( + callback, op_vector.release(), resources)); if (error != GRPC_CALL_OK) { - return NanThrowError("serverEndInitialMetadata failed", error); + return NanThrowError("startBatch failed", error); } + CompletionQueueAsyncWorker::Next(); NanReturnUndefined(); } @@ -297,102 +596,5 @@ NAN_METHOD(Call::Cancel) { NanReturnUndefined(); } -NAN_METHOD(Call::StartWrite) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("startWrite can only be called on Call objects"); - } - if (!Buffer::HasInstance(args[0])) { - return NanThrowTypeError("startWrite's first argument must be a Buffer"); - } - if (!args[1]->IsFunction()) { - return NanThrowTypeError("startWrite's second argument must be a function"); - } - if (!args[2]->IsUint32()) { - return NanThrowTypeError( - "startWrite's third argument must be integer flags"); - } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); - grpc_byte_buffer *buffer = BufferToByteBuffer(args[0]); - unsigned int flags = args[2]->Uint32Value(); - grpc_call_error error = grpc_call_start_write_old( - call->wrapped_call, buffer, CreateTag(args[1], args.This()), flags); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("startWrite failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::StartWriteStatus) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError( - "startWriteStatus can only be called on Call objects"); - } - if (!args[0]->IsUint32()) { - return NanThrowTypeError( - "startWriteStatus's first argument must be a status code"); - } - if (!args[1]->IsString()) { - return NanThrowTypeError( - "startWriteStatus's second argument must be a string"); - } - if (!args[2]->IsFunction()) { - return NanThrowTypeError( - "startWriteStatus's third argument must be a function"); - } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); - NanUtf8String details(args[1]); - grpc_call_error error = grpc_call_start_write_status_old( - call->wrapped_call, (grpc_status_code)args[0]->Uint32Value(), *details, - CreateTag(args[2], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("startWriteStatus failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::WritesDone) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("writesDone can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("writesDone's first argument must be a function"); - } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); - grpc_call_error error = grpc_call_writes_done_old( - call->wrapped_call, CreateTag(args[0], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("writesDone failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::StartRead) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("startRead can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("startRead's first argument must be a function"); - } - Call *call = ObjectWrap::Unwrap<Call>(args.This()); - grpc_call_error error = grpc_call_start_read_old( - call->wrapped_call, CreateTag(args[0], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("startRead failed", error); - } - NanReturnUndefined(); -} - } // namespace node } // namespace grpc diff --git a/src/node/ext/call.h b/src/node/ext/call.h index 1924a1bf42..dbdb8e2ff6 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -34,15 +34,71 @@ #ifndef NET_GRPC_NODE_CALL_H_ #define NET_GRPC_NODE_CALL_H_ +#include <memory> +#include <vector> + #include <node.h> #include <nan.h> #include "grpc/grpc.h" #include "channel.h" + namespace grpc { namespace node { +using std::unique_ptr; +using std::shared_ptr; + +v8::Handle<v8::Value> ParseMetadata(const grpc_metadata_array *metadata_array); + +class PersistentHolder { + public: + explicit PersistentHolder(v8::Persistent<v8::Value> persist) : + persist(persist) { + } + + ~PersistentHolder() { + NanDisposePersistent(persist); + } + + private: + v8::Persistent<v8::Value> persist; +}; + +struct Resources { + std::vector<unique_ptr<NanUtf8String> > strings; + std::vector<unique_ptr<PersistentHolder> > handles; +}; + +class Op { + public: + virtual v8::Handle<v8::Value> GetNodeValue() const = 0; + virtual bool ParseOp(v8::Handle<v8::Value> value, grpc_op *out, + shared_ptr<Resources> resources) = 0; + v8::Handle<v8::Value> GetOpType() const; + + protected: + virtual std::string GetTypeString() const = 0; +}; + +typedef std::vector<unique_ptr<Op>> OpVec; + +struct tag { + tag(NanCallback *callback, OpVec *ops, + shared_ptr<Resources> resources); + ~tag(); + NanCallback *callback; + OpVec *ops; + shared_ptr<Resources> resources; +}; + +v8::Handle<v8::Value> GetTagNodeValue(void *tag); + +NanCallback *GetTagCallback(void *tag); + +void DestroyTag(void *tag); + /* Wrapper class for grpc_call structs. */ class Call : public ::node::ObjectWrap { public: @@ -60,15 +116,8 @@ class Call : public ::node::ObjectWrap { Call &operator=(const Call &); static NAN_METHOD(New); - static NAN_METHOD(AddMetadata); - static NAN_METHOD(Invoke); - static NAN_METHOD(ServerAccept); - static NAN_METHOD(ServerEndInitialMetadata); + static NAN_METHOD(StartBatch); static NAN_METHOD(Cancel); - static NAN_METHOD(StartWrite); - static NAN_METHOD(StartWriteStatus); - static NAN_METHOD(WritesDone); - static NAN_METHOD(StartRead); static v8::Persistent<v8::Function> constructor; // Used for typechecking instances of this javascript class static v8::Persistent<v8::FunctionTemplate> fun_tpl; diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index 8de7db66d5..a1f390f64b 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -35,10 +35,10 @@ #include <nan.h> #include "grpc/grpc.h" +#include "grpc/support/log.h" #include "grpc/support/time.h" #include "completion_queue_async_worker.h" -#include "event.h" -#include "tag.h" +#include "call.h" namespace grpc { namespace node { @@ -58,6 +58,9 @@ CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {} void CompletionQueueAsyncWorker::Execute() { result = grpc_completion_queue_next(queue, gpr_inf_future); + if (result->data.op_complete != GRPC_OP_OK) { + SetErrorMessage("The batch encountered an error"); + } } grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } @@ -75,14 +78,26 @@ void CompletionQueueAsyncWorker::Init(Handle<Object> exports) { void CompletionQueueAsyncWorker::HandleOKCallback() { NanScope(); - NanCallback event_callback(GetTagHandle(result->tag).As<Function>()); - Handle<Value> argv[] = {CreateEventObject(result)}; + NanCallback *callback = GetTagCallback(result->tag); + Handle<Value> argv[] = {NanNull(), GetTagNodeValue(result->tag)}; + + callback->Call(2, argv); DestroyTag(result->tag); grpc_event_finish(result); result = NULL; +} + +void CompletionQueueAsyncWorker::HandleErrorCallback() { + NanScope(); + NanCallback *callback = GetTagCallback(result->tag); + Handle<Value> argv[] = {NanError(ErrorMessage())}; - event_callback.Call(1, argv); + callback->Call(1, argv); + + DestroyTag(result->tag); + grpc_event_finish(result); + result = NULL; } } // namespace node diff --git a/src/node/ext/completion_queue_async_worker.h b/src/node/ext/completion_queue_async_worker.h index 2c928b7024..c04a303283 100644 --- a/src/node/ext/completion_queue_async_worker.h +++ b/src/node/ext/completion_queue_async_worker.h @@ -67,6 +67,8 @@ class CompletionQueueAsyncWorker : public NanAsyncWorker { completion_queue_next */ void HandleOKCallback(); + void HandleErrorCallback(); + private: grpc_event *result; diff --git a/src/node/ext/credentials.cc b/src/node/ext/credentials.cc index c8859ed941..b79c3e3019 100644 --- a/src/node/ext/credentials.cc +++ b/src/node/ext/credentials.cc @@ -63,7 +63,6 @@ Credentials::Credentials(grpc_credentials *credentials) : wrapped_credentials(credentials) {} Credentials::~Credentials() { - gpr_log(GPR_DEBUG, "Destroying credentials object"); grpc_credentials_release(wrapped_credentials); } diff --git a/src/node/ext/event.cc b/src/node/ext/event.cc deleted file mode 100644 index d59b68fb40..0000000000 --- a/src/node/ext/event.cc +++ /dev/null @@ -1,173 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <map> - -#include <node.h> -#include <nan.h> -#include "grpc/grpc.h" -#include "byte_buffer.h" -#include "call.h" -#include "event.h" -#include "tag.h" -#include "timeval.h" - -namespace grpc { -namespace node { - -using ::node::Buffer; -using v8::Array; -using v8::Date; -using v8::Handle; -using v8::HandleScope; -using v8::Number; -using v8::Object; -using v8::Persistent; -using v8::String; -using v8::Value; - -Handle<Value> ParseMetadata(grpc_metadata *metadata_elements, size_t length) { - NanEscapableScope(); - std::map<const char*, size_t> size_map; - std::map<const char*, size_t> index_map; - - for (unsigned int i = 0; i < length; i++) { - const char *key = metadata_elements[i].key; - if (size_map.count(key)) { - size_map[key] += 1; - } - index_map[key] = 0; - } - Handle<Object> metadata_object = NanNew<Object>(); - for (unsigned int i = 0; i < length; i++) { - grpc_metadata* elem = &metadata_elements[i]; - Handle<String> key_string = String::New(elem->key); - Handle<Array> array; - if (metadata_object->Has(key_string)) { - array = Handle<Array>::Cast(metadata_object->Get(key_string)); - } else { - array = NanNew<Array>(size_map[elem->key]); - metadata_object->Set(key_string, array); - } - array->Set(index_map[elem->key], - MakeFastBuffer( - NanNewBufferHandle(elem->value, elem->value_length))); - index_map[elem->key] += 1; - } - return NanEscapeScope(metadata_object); -} - -Handle<Value> GetEventData(grpc_event *event) { - NanEscapableScope(); - size_t count; - grpc_metadata *items; - Handle<Array> metadata; - Handle<Object> status; - Handle<Object> rpc_new; - switch (event->type) { - case GRPC_READ: - return NanEscapeScope(ByteBufferToBuffer(event->data.read)); - case GRPC_WRITE_ACCEPTED: - return NanEscapeScope(NanNew<Number>(event->data.write_accepted)); - case GRPC_FINISH_ACCEPTED: - return NanEscapeScope(NanNew<Number>(event->data.finish_accepted)); - case GRPC_CLIENT_METADATA_READ: - count = event->data.client_metadata_read.count; - items = event->data.client_metadata_read.elements; - return NanEscapeScope(ParseMetadata(items, count)); - case GRPC_FINISHED: - status = NanNew<Object>(); - status->Set(NanNew("code"), NanNew<Number>(event->data.finished.status)); - if (event->data.finished.details != NULL) { - status->Set(NanNew("details"), - String::New(event->data.finished.details)); - } - count = event->data.finished.metadata_count; - items = event->data.finished.metadata_elements; - status->Set(NanNew("metadata"), ParseMetadata(items, count)); - return NanEscapeScope(status); - case GRPC_SERVER_RPC_NEW: - rpc_new = NanNew<Object>(); - if (event->data.server_rpc_new.method == NULL) { - return NanEscapeScope(NanNull()); - } - rpc_new->Set( - NanNew("method"), - NanNew(event->data.server_rpc_new.method)); - rpc_new->Set( - NanNew("host"), - NanNew(event->data.server_rpc_new.host)); - rpc_new->Set(NanNew("absolute_deadline"), - NanNew<Date>(TimespecToMilliseconds( - event->data.server_rpc_new.deadline))); - count = event->data.server_rpc_new.metadata_count; - items = event->data.server_rpc_new.metadata_elements; - metadata = NanNew<Array>(static_cast<int>(count)); - for (unsigned int i = 0; i < count; i++) { - Handle<Object> item_obj = Object::New(); - item_obj->Set(NanNew("key"), - NanNew(items[i].key)); - item_obj->Set( - NanNew("value"), - NanNew(items[i].value, static_cast<int>(items[i].value_length))); - metadata->Set(i, item_obj); - } - rpc_new->Set(NanNew("metadata"), ParseMetadata(items, count)); - return NanEscapeScope(rpc_new); - default: - return NanEscapeScope(NanNull()); - } -} - -Handle<Value> CreateEventObject(grpc_event *event) { - NanEscapableScope(); - if (event == NULL) { - return NanEscapeScope(NanNull()); - } - Handle<Object> event_obj = NanNew<Object>(); - Handle<Value> call; - if (TagHasCall(event->tag)) { - call = TagGetCall(event->tag); - } else { - call = Call::WrapStruct(event->call); - } - event_obj->Set(NanNew<String, const char *>("call"), call); - event_obj->Set(NanNew<String, const char *>("type"), - NanNew<Number>(event->type)); - event_obj->Set(NanNew<String, const char *>("data"), GetEventData(event)); - - return NanEscapeScope(event_obj); -} - -} // namespace node -} // namespace grpc diff --git a/src/node/ext/event.h b/src/node/ext/event.h deleted file mode 100644 index e06d8f0168..0000000000 --- a/src/node/ext/event.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef NET_GRPC_NODE_EVENT_H_ -#define NET_GRPC_NODE_EVENT_H_ - -#include <node.h> -#include "grpc/grpc.h" - -namespace grpc { -namespace node { - -v8::Handle<v8::Value> CreateEventObject(grpc_event *event); - -} // namespace node -} // namespace grpc - -#endif // NET_GRPC_NODE_EVENT_H_ diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc index bc1dfaf899..9b0fe82976 100644 --- a/src/node/ext/node_grpc.cc +++ b/src/node/ext/node_grpc.cc @@ -130,35 +130,34 @@ void InitCallErrorConstants(Handle<Object> exports) { call_error->Set(NanNew("INVALID_FLAGS"), INVALID_FLAGS); } -void InitOpErrorConstants(Handle<Object> exports) { +void InitOpTypeConstants(Handle<Object> exports) { NanScope(); - Handle<Object> op_error = Object::New(); - exports->Set(NanNew("opError"), op_error); - Handle<Value> OK(NanNew<Uint32, uint32_t>(GRPC_OP_OK)); - op_error->Set(NanNew("OK"), OK); - Handle<Value> ERROR(NanNew<Uint32, uint32_t>(GRPC_OP_ERROR)); - op_error->Set(NanNew("ERROR"), ERROR); -} - -void InitCompletionTypeConstants(Handle<Object> exports) { - NanScope(); - Handle<Object> completion_type = Object::New(); - exports->Set(NanNew("completionType"), completion_type); - Handle<Value> QUEUE_SHUTDOWN(NanNew<Uint32, uint32_t>(GRPC_QUEUE_SHUTDOWN)); - completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN); - Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ)); - completion_type->Set(NanNew("READ"), READ); - Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED)); - completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED); - Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED)); - completion_type->Set(NanNew("FINISH_ACCEPTED"), FINISH_ACCEPTED); - Handle<Value> CLIENT_METADATA_READ( - NanNew<Uint32, uint32_t>(GRPC_CLIENT_METADATA_READ)); - completion_type->Set(NanNew("CLIENT_METADATA_READ"), CLIENT_METADATA_READ); - Handle<Value> FINISHED(NanNew<Uint32, uint32_t>(GRPC_FINISHED)); - completion_type->Set(NanNew("FINISHED"), FINISHED); - Handle<Value> SERVER_RPC_NEW(NanNew<Uint32, uint32_t>(GRPC_SERVER_RPC_NEW)); - completion_type->Set(NanNew("SERVER_RPC_NEW"), SERVER_RPC_NEW); + Handle<Object> op_type = Object::New(); + exports->Set(NanNew("opType"), op_type); + Handle<Value> SEND_INITIAL_METADATA( + NanNew<Uint32, uint32_t>(GRPC_OP_SEND_INITIAL_METADATA)); + op_type->Set(NanNew("SEND_INITIAL_METADATA"), SEND_INITIAL_METADATA); + Handle<Value> SEND_MESSAGE( + NanNew<Uint32, uint32_t>(GRPC_OP_SEND_MESSAGE)); + op_type->Set(NanNew("SEND_MESSAGE"), SEND_MESSAGE); + Handle<Value> SEND_CLOSE_FROM_CLIENT( + NanNew<Uint32, uint32_t>(GRPC_OP_SEND_CLOSE_FROM_CLIENT)); + op_type->Set(NanNew("SEND_CLOSE_FROM_CLIENT"), SEND_CLOSE_FROM_CLIENT); + Handle<Value> SEND_STATUS_FROM_SERVER( + NanNew<Uint32, uint32_t>(GRPC_OP_SEND_STATUS_FROM_SERVER)); + op_type->Set(NanNew("SEND_STATUS_FROM_SERVER"), SEND_STATUS_FROM_SERVER); + Handle<Value> RECV_INITIAL_METADATA( + NanNew<Uint32, uint32_t>(GRPC_OP_RECV_INITIAL_METADATA)); + op_type->Set(NanNew("RECV_INITIAL_METADATA"), RECV_INITIAL_METADATA); + Handle<Value> RECV_MESSAGE( + NanNew<Uint32, uint32_t>(GRPC_OP_RECV_MESSAGE)); + op_type->Set(NanNew("RECV_MESSAGE"), RECV_MESSAGE); + Handle<Value> RECV_STATUS_ON_CLIENT( + NanNew<Uint32, uint32_t>(GRPC_OP_RECV_STATUS_ON_CLIENT)); + op_type->Set(NanNew("RECV_STATUS_ON_CLIENT"), RECV_STATUS_ON_CLIENT); + Handle<Value> RECV_CLOSE_ON_SERVER( + NanNew<Uint32, uint32_t>(GRPC_OP_RECV_CLOSE_ON_SERVER)); + op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER); } void init(Handle<Object> exports) { @@ -166,8 +165,7 @@ void init(Handle<Object> exports) { grpc_init(); InitStatusConstants(exports); InitCallErrorConstants(exports); - InitOpErrorConstants(exports); - InitCompletionTypeConstants(exports); + InitOpTypeConstants(exports); grpc::node::Call::Init(exports); grpc::node::Channel::Init(exports); diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 6b8ccef9b1..ee3e1087ce 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -31,6 +31,8 @@ * */ +#include <memory> + #include "server.h" #include <node.h> @@ -41,17 +43,20 @@ #include <vector> #include "grpc/grpc.h" #include "grpc/grpc_security.h" +#include "grpc/support/log.h" #include "call.h" #include "completion_queue_async_worker.h" -#include "tag.h" #include "server_credentials.h" +#include "timeval.h" namespace grpc { namespace node { +using std::unique_ptr; using v8::Arguments; using v8::Array; using v8::Boolean; +using v8::Date; using v8::Exception; using v8::Function; using v8::FunctionTemplate; @@ -67,6 +72,49 @@ using v8::Value; Persistent<Function> Server::constructor; Persistent<FunctionTemplate> Server::fun_tpl; +class NewCallOp : public Op { + public: + NewCallOp() { + call = NULL; + grpc_call_details_init(&details); + grpc_metadata_array_init(&request_metadata); + } + + ~NewCallOp() { + grpc_call_details_destroy(&details); + grpc_metadata_array_destroy(&request_metadata); + } + + Handle<Value> GetNodeValue() const { + NanEscapableScope(); + if (call == NULL) { + return NanEscapeScope(NanNull()); + } + Handle<Object> obj = NanNew<Object>(); + obj->Set(NanNew("call"), Call::WrapStruct(call)); + obj->Set(NanNew("method"), NanNew(details.method)); + obj->Set(NanNew("host"), NanNew(details.host)); + obj->Set(NanNew("deadline"), + NanNew<Date>(TimespecToMilliseconds(details.deadline))); + obj->Set(NanNew("metadata"), ParseMetadata(&request_metadata)); + return NanEscapeScope(obj); + } + + bool ParseOp(Handle<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { + return true; + } + + grpc_call *call; + grpc_call_details details; + grpc_metadata_array request_metadata; + + protected: + std::string GetTypeString() const { + return "new call"; + } +}; + Server::Server(grpc_server *server) : wrapped_server(server) {} Server::~Server() { grpc_server_destroy(wrapped_server); } @@ -175,13 +223,18 @@ NAN_METHOD(Server::RequestCall) { return NanThrowTypeError("requestCall can only be called on a Server"); } Server *server = ObjectWrap::Unwrap<Server>(args.This()); - grpc_call_error error = grpc_server_request_call_old( - server->wrapped_server, CreateTag(args[0], NanNull())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { + NewCallOp *op = new NewCallOp(); + unique_ptr<OpVec> ops(new OpVec()); + ops->push_back(unique_ptr<Op>(op)); + grpc_call_error error = grpc_server_request_call( + server->wrapped_server, &op->call, &op->details, &op->request_metadata, + CompletionQueueAsyncWorker::GetQueue(), + new struct tag(new NanCallback(args[0].As<Function>()), ops.release(), + shared_ptr<Resources>(nullptr))); + if (error != GRPC_CALL_OK) { return NanThrowError("requestCall failed", error); } + CompletionQueueAsyncWorker::Next(); NanReturnUndefined(); } diff --git a/src/node/ext/server_credentials.cc b/src/node/ext/server_credentials.cc index 393f3a6305..3add43c48c 100644 --- a/src/node/ext/server_credentials.cc +++ b/src/node/ext/server_credentials.cc @@ -63,7 +63,6 @@ ServerCredentials::ServerCredentials(grpc_server_credentials *credentials) : wrapped_credentials(credentials) {} ServerCredentials::~ServerCredentials() { - gpr_log(GPR_DEBUG, "Destroying server credentials object"); grpc_server_credentials_release(wrapped_credentials); } diff --git a/src/node/ext/tag.cc b/src/node/ext/tag.cc deleted file mode 100644 index dc8e523e12..0000000000 --- a/src/node/ext/tag.cc +++ /dev/null @@ -1,101 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <stdlib.h> -#include <node.h> -#include <nan.h> -#include "tag.h" - -namespace grpc { -namespace node { - -using v8::Handle; -using v8::HandleScope; -using v8::Persistent; -using v8::Value; - -struct tag { - tag(Persistent<Value> *tag, Persistent<Value> *call) - : persist_tag(tag), persist_call(call) {} - - ~tag() { - persist_tag->Dispose(); - if (persist_call != NULL) { - persist_call->Dispose(); - } - } - Persistent<Value> *persist_tag; - Persistent<Value> *persist_call; -}; - -void *CreateTag(Handle<Value> tag, Handle<Value> call) { - NanScope(); - Persistent<Value> *persist_tag = new Persistent<Value>(); - NanAssignPersistent(*persist_tag, tag); - Persistent<Value> *persist_call; - if (call->IsNull() || call->IsUndefined()) { - persist_call = NULL; - } else { - persist_call = new Persistent<Value>(); - NanAssignPersistent(*persist_call, call); - } - struct tag *tag_struct = new struct tag(persist_tag, persist_call); - return reinterpret_cast<void *>(tag_struct); -} - -Handle<Value> GetTagHandle(void *tag) { - NanEscapableScope(); - struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); - Handle<Value> tag_value = NanNew<Value>(*tag_struct->persist_tag); - return NanEscapeScope(tag_value); -} - -bool TagHasCall(void *tag) { - struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); - return tag_struct->persist_call != NULL; -} - -Handle<Value> TagGetCall(void *tag) { - NanEscapableScope(); - struct tag *tag_struct = reinterpret_cast<struct tag *>(tag); - if (tag_struct->persist_call == NULL) { - return NanEscapeScope(NanNull()); - } - Handle<Value> call_value = NanNew<Value>(*tag_struct->persist_call); - return NanEscapeScope(call_value); -} - -void DestroyTag(void *tag) { delete reinterpret_cast<struct tag *>(tag); } - -} // namespace node -} // namespace grpc diff --git a/src/node/ext/tag.h b/src/node/ext/tag.h deleted file mode 100644 index bdb09252d9..0000000000 --- a/src/node/ext/tag.h +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef NET_GRPC_NODE_TAG_H_ -#define NET_GRPC_NODE_TAG_H_ - -#include <node.h> - -namespace grpc { -namespace node { - -/* Create a void* tag that can be passed to various grpc_call functions from - a javascript value and the javascript wrapper for the call. The call can be - null. */ -void *CreateTag(v8::Handle<v8::Value> tag, v8::Handle<v8::Value> call); -/* Return the javascript value stored in the tag */ -v8::Handle<v8::Value> GetTagHandle(void *tag); -/* Returns true if the call was set (non-null) when the tag was created */ -bool TagHasCall(void *tag); -/* Returns the javascript wrapper for the call associated with this tag */ -v8::Handle<v8::Value> TagGetCall(void *call); -/* Destroy the tag and all resources it is holding. It is illegal to call any - of these other functions on a tag after it has been destroyed. */ -void DestroyTag(void *tag); - -} // namespace node -} // namespace grpc - -#endif // NET_GRPC_NODE_TAG_H_ diff --git a/src/node/index.js b/src/node/index.js index 0627e7f557..baef4d03c6 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -35,9 +35,9 @@ var _ = require('underscore'); var ProtoBuf = require('protobufjs'); -var surface_client = require('./src/surface_client.js'); +var client = require('./src/client.js'); -var surface_server = require('./src/surface_server.js'); +var server = require('./src/server.js'); var grpc = require('bindings')('grpc'); @@ -54,7 +54,7 @@ function loadObject(value) { }); return result; } else if (value.className === 'Service') { - return surface_client.makeClientConstructor(value); + return client.makeClientConstructor(value); } else if (value.className === 'Message' || value.className === 'Enum') { return value.build(); } else { @@ -84,9 +84,9 @@ exports.loadObject = loadObject; exports.load = load; /** - * See docs for surface_server.makeServerConstructor + * See docs for server.makeServerConstructor */ -exports.buildServer = surface_server.makeServerConstructor; +exports.buildServer = server.makeServerConstructor; /** * Status name to code number mapping diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index ce18f77fe7..8737af6cde 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -145,8 +145,8 @@ function serverStreaming(client, done) { resp_index += 1; }); call.on('status', function(status) { - assert.strictEqual(resp_index, 4); assert.strictEqual(status.code, grpc.status.OK); + assert.strictEqual(resp_index, 4); if (done) { done(); } diff --git a/src/node/interop/test.proto b/src/node/interop/test.proto index cce0889bba..c2437630b7 100644 --- a/src/node/interop/test.proto +++ b/src/node/interop/test.proto @@ -44,7 +44,7 @@ service TestService { rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty); // One request followed by one response. - // The server returns the client payload as-is. + // TODO(Issue 527): Describe required server behavior. rpc UnaryCall(SimpleRequest) returns (SimpleResponse); // One request followed by a sequence of responses (streamed download). diff --git a/src/node/package.json b/src/node/package.json index 028dc20555..8f81014c1e 100644 --- a/src/node/package.json +++ b/src/node/package.json @@ -1,6 +1,6 @@ { "name": "grpc", - "version": "0.1.0", + "version": "0.2.0", "description": "gRPC Library for Node", "scripts": { "test": "./node_modules/mocha/bin/mocha" diff --git a/src/node/src/client.js b/src/node/src/client.js index 3a1c9eef84..81fa65eb26 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -1,6 +1,6 @@ /* * - * Copyright 2014, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,185 +31,452 @@ * */ +var _ = require('underscore'); + +var capitalize = require('underscore.string/capitalize'); +var decapitalize = require('underscore.string/decapitalize'); + var grpc = require('bindings')('grpc.node'); -var common = require('./common'); +var common = require('./common.js'); + +var EventEmitter = require('events').EventEmitter; -var Duplex = require('stream').Duplex; +var stream = require('stream'); + +var Readable = stream.Readable; +var Writable = stream.Writable; +var Duplex = stream.Duplex; var util = require('util'); -util.inherits(GrpcClientStream, Duplex); +util.inherits(ClientWritableStream, Writable); /** - * Class for representing a gRPC client side stream as a Node stream. Extends - * from stream.Duplex. + * A stream that the client can write to. Used for calls that are streaming from + * the client side. * @constructor - * @param {grpc.Call} call Call object to proxy - * @param {function(*):Buffer=} serialize Serialization function for requests - * @param {function(Buffer):*=} deserialize Deserialization function for - * responses + * @param {grpc.Call} call The call object to send data with + * @param {function(*):Buffer=} serialize Serialization function for writes. */ -function GrpcClientStream(call, serialize, deserialize) { - Duplex.call(this, {objectMode: true}); - if (!serialize) { - serialize = function(value) { - return value; - }; - } - if (!deserialize) { - deserialize = function(value) { - return value; - }; - } - var self = this; - var finished = false; - // Indicates that a read is currently pending - var reading = false; - // Indicates that a write is currently pending - var writing = false; - this._call = call; +function ClientWritableStream(call, serialize) { + Writable.call(this, {objectMode: true}); + this.call = call; + this.serialize = common.wrapIgnoreNull(serialize); + this.on('finish', function() { + var batch = {}; + batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + call.startBatch(batch, function() {}); + }); +} - /** - * Serialize a request value to a buffer. Always maps null to null. Otherwise - * uses the provided serialize function - * @param {*} value The value to serialize - * @return {Buffer} The serialized value - */ - this.serialize = function(value) { - if (value === null || value === undefined) { - return null; +/** + * Attempt to write the given chunk. Calls the callback when done. This is an + * implementation of a method needed for implementing stream.Writable. + * @param {Buffer} chunk The chunk to write + * @param {string} encoding Ignored + * @param {function(Error=)} callback Called when the write is complete + */ +function _write(chunk, encoding, callback) { + var batch = {}; + batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); + this.call.startBatch(batch, function(err, event) { + if (err) { + throw err; } - return serialize(value); - }; + callback(); + }); +}; - /** - * Deserialize a response buffer to a value. Always maps null to null. - * Otherwise uses the provided deserialize function. - * @param {Buffer} buffer The buffer to deserialize - * @return {*} The deserialized value - */ - this.deserialize = function(buffer) { - if (buffer === null) { - return null; - } - return deserialize(buffer); - }; +ClientWritableStream.prototype._write = _write; + +util.inherits(ClientReadableStream, Readable); + +/** + * A stream that the client can read from. Used for calls that are streaming + * from the server side. + * @constructor + * @param {grpc.Call} call The call object to read data with + * @param {function(Buffer):*=} deserialize Deserialization function for reads + */ +function ClientReadableStream(call, deserialize) { + Readable.call(this, {objectMode: true}); + this.call = call; + this.finished = false; + this.reading = false; + this.deserialize = common.wrapIgnoreNull(deserialize); +} + +/** + * Read the next object from the stream. + * @param {*} size Ignored because we use objectMode=true + */ +function _read(size) { + var self = this; /** * Callback to be called when a READ event is received. Pushes the data onto * the read queue and starts reading again if applicable * @param {grpc.Event} event READ event object */ - function readCallback(event) { - if (finished) { + function readCallback(err, event) { + if (err) { + throw err; + } + if (self.finished) { self.push(null); return; } - var data = event.data; + var data = event.read; if (self.push(self.deserialize(data)) && data != null) { - self._call.startRead(readCallback); + var read_batch = {}; + read_batch[grpc.opType.RECV_MESSAGE] = true; + self.call.startBatch(read_batch, readCallback); } else { - reading = false; + self.reading = false; + } + } + if (self.finished) { + self.push(null); + } else { + if (!self.reading) { + self.reading = true; + var read_batch = {}; + read_batch[grpc.opType.RECV_MESSAGE] = true; + self.call.startBatch(read_batch, readCallback); } } - call.invoke(function(event) { - self.emit('metadata', event.data); - }, function(event) { - finished = true; - self.emit('status', event.data); - }, 0); +}; + +ClientReadableStream.prototype._read = _read; + +util.inherits(ClientDuplexStream, Duplex); + +/** + * A stream that the client can read from or write to. Used for calls with + * duplex streaming. + * @constructor + * @param {grpc.Call} call Call object to proxy + * @param {function(*):Buffer=} serialize Serialization function for requests + * @param {function(Buffer):*=} deserialize Deserialization function for + * responses + */ +function ClientDuplexStream(call, serialize, deserialize) { + Duplex.call(this, {objectMode: true}); + this.serialize = common.wrapIgnoreNull(serialize); + this.deserialize = common.wrapIgnoreNull(deserialize); + var self = this; + var finished = false; + // Indicates that a read is currently pending + var reading = false; + this.call = call; this.on('finish', function() { - call.writesDone(function() {}); + var batch = {}; + batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + call.startBatch(batch, function() {}); }); - /** - * Start reading if there is not already a pending read. Reading will - * continue until self.push returns false (indicating reads should slow - * down) or the read data is null (indicating that there is no more data). - */ - this.startReading = function() { - if (finished) { - self.push(null); - } else { - if (!reading) { - reading = true; - self._call.startRead(readCallback); - } - } - }; } +ClientDuplexStream.prototype._read = _read; +ClientDuplexStream.prototype._write = _write; + /** - * Start reading. This is an implementation of a method needed for implementing - * stream.Readable. - * @param {number} size Ignored + * Cancel the ongoing call */ -GrpcClientStream.prototype._read = function(size) { - this.startReading(); -}; +function cancel() { + this.call.cancel(); +} + +ClientReadableStream.prototype.cancel = cancel; +ClientWritableStream.prototype.cancel = cancel; +ClientDuplexStream.prototype.cancel = cancel; /** - * Attempt to write the given chunk. Calls the callback when done. This is an - * implementation of a method needed for implementing stream.Writable. - * @param {Buffer} chunk The chunk to write - * @param {string} encoding Ignored - * @param {function(Error=)} callback Ignored + * Get a function that can make unary requests to the specified method. + * @param {string} method The name of the method to request + * @param {function(*):Buffer} serialize The serialization function for inputs + * @param {function(Buffer)} deserialize The deserialization function for + * outputs + * @return {Function} makeUnaryRequest */ -GrpcClientStream.prototype._write = function(chunk, encoding, callback) { - var self = this; - self._call.startWrite(self.serialize(chunk), function(event) { - callback(); - }, 0); -}; +function makeUnaryRequestFunction(method, serialize, deserialize) { + /** + * Make a unary request with this method on the given channel with the given + * argument, callback, etc. + * @this {Client} Client object. Must have a channel member. + * @param {*} argument The argument to the call. Should be serializable with + * serialize + * @param {function(?Error, value=)} callback The callback to for when the + * response is received + * @param {array=} metadata Array of metadata key/value pairs to add to the + * call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ + function makeUnaryRequest(argument, callback, metadata, deadline) { + if (deadline === undefined) { + deadline = Infinity; + } + var emitter = new EventEmitter(); + var call = new grpc.Call(this.channel, method, deadline); + if (metadata === null || metadata === undefined) { + metadata = {}; + } + emitter.cancel = function cancel() { + call.cancel(); + }; + var client_batch = {}; + client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument); + client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + client_batch[grpc.opType.RECV_MESSAGE] = true; + client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(client_batch, function(err, response) { + if (err) { + callback(err); + return; + } + if (response.status.code != grpc.status.OK) { + callback(response.status); + return; + } + emitter.emit('status', response.status); + emitter.emit('metadata', response.metadata); + callback(null, deserialize(response.read)); + }); + return emitter; + } + return makeUnaryRequest; +} /** - * Cancel the ongoing call. If the call has not already finished, it will finish - * with status CANCELLED. + * Get a function that can make client stream requests to the specified method. + * @param {string} method The name of the method to request + * @param {function(*):Buffer} serialize The serialization function for inputs + * @param {function(Buffer)} deserialize The deserialization function for + * outputs + * @return {Function} makeClientStreamRequest */ -GrpcClientStream.prototype.cancel = function() { - this._call.cancel(); -}; +function makeClientStreamRequestFunction(method, serialize, deserialize) { + /** + * Make a client stream request with this method on the given channel with the + * given callback, etc. + * @this {Client} Client object. Must have a channel member. + * @param {function(?Error, value=)} callback The callback to for when the + * response is received + * @param {array=} metadata Array of metadata key/value pairs to add to the + * call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ + function makeClientStreamRequest(callback, metadata, deadline) { + if (deadline === undefined) { + deadline = Infinity; + } + var call = new grpc.Call(this.channel, method, deadline); + if (metadata === null || metadata === undefined) { + metadata = {}; + } + var stream = new ClientWritableStream(call, serialize); + var metadata_batch = {}; + metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + call.startBatch(metadata_batch, function(err, response) { + if (err) { + callback(err); + return; + } + stream.emit('metadata', response.metadata); + }); + var client_batch = {}; + client_batch[grpc.opType.RECV_MESSAGE] = true; + client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(client_batch, function(err, response) { + if (err) { + callback(err); + return; + } + if (response.status.code != grpc.status.OK) { + callback(response.status); + return; + } + stream.emit('status', response.status); + callback(null, deserialize(response.read)); + }); + return stream; + } + return makeClientStreamRequest; +} /** - * Make a request on the channel to the given method with the given arguments - * @param {grpc.Channel} channel The channel on which to make the request - * @param {string} method The method to request - * @param {function(*):Buffer} serialize Serialization function for requests - * @param {function(Buffer):*} deserialize Deserialization function for - * responses - * @param {array=} metadata Array of metadata key/value pairs to add to the call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future. - * @return {stream=} The stream of responses + * Get a function that can make server stream requests to the specified method. + * @param {string} method The name of the method to request + * @param {function(*):Buffer} serialize The serialization function for inputs + * @param {function(Buffer)} deserialize The deserialization function for + * outputs + * @return {Function} makeServerStreamRequest */ -function makeRequest(channel, - method, - serialize, - deserialize, - metadata, - deadline) { - if (deadline === undefined) { - deadline = Infinity; +function makeServerStreamRequestFunction(method, serialize, deserialize) { + /** + * Make a server stream request with this method on the given channel with the + * given argument, etc. + * @this {SurfaceClient} Client object. Must have a channel member. + * @param {*} argument The argument to the call. Should be serializable with + * serialize + * @param {array=} metadata Array of metadata key/value pairs to add to the + * call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ + function makeServerStreamRequest(argument, metadata, deadline) { + if (deadline === undefined) { + deadline = Infinity; + } + var call = new grpc.Call(this.channel, method, deadline); + if (metadata === null || metadata === undefined) { + metadata = {}; + } + var stream = new ClientReadableStream(call, deserialize); + var start_batch = {}; + start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument); + start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + call.startBatch(start_batch, function(err, response) { + if (err) { + throw err; + } + stream.emit('metadata', response.metadata); + }); + var status_batch = {}; + status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(status_batch, function(err, response) { + if (err) { + throw err; + } + stream.emit('status', response.status); + }); + return stream; } - var call = new grpc.Call(channel, method, deadline); - if (metadata) { - call.addMetadata(metadata); + return makeServerStreamRequest; +} + +/** + * Get a function that can make bidirectional stream requests to the specified + * method. + * @param {string} method The name of the method to request + * @param {function(*):Buffer} serialize The serialization function for inputs + * @param {function(Buffer)} deserialize The deserialization function for + * outputs + * @return {Function} makeBidiStreamRequest + */ +function makeBidiStreamRequestFunction(method, serialize, deserialize) { + /** + * Make a bidirectional stream request with this method on the given channel. + * @this {SurfaceClient} Client object. Must have a channel member. + * @param {array=} metadata Array of metadata key/value pairs to add to the + * call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ + function makeBidiStreamRequest(metadata, deadline) { + if (deadline === undefined) { + deadline = Infinity; + } + var call = new grpc.Call(this.channel, method, deadline); + if (metadata === null || metadata === undefined) { + metadata = {}; + } + var stream = new ClientDuplexStream(call, serialize, deserialize); + var start_batch = {}; + start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + call.startBatch(start_batch, function(err, response) { + if (err) { + throw err; + } + stream.emit('metadata', response.metadata); + }); + var status_batch = {}; + status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(status_batch, function(err, response) { + if (err) { + throw err; + } + stream.emit('status', response.status); + }); + return stream; } - return new GrpcClientStream(call, serialize, deserialize); + return makeBidiStreamRequest; } + /** - * See documentation for makeRequest above + * Map with short names for each of the requester maker functions. Used in + * makeClientConstructor */ -exports.makeRequest = makeRequest; +var requester_makers = { + unary: makeUnaryRequestFunction, + server_stream: makeServerStreamRequestFunction, + client_stream: makeClientStreamRequestFunction, + bidi: makeBidiStreamRequestFunction +}; /** - * Represents a client side gRPC channel associated with a single host. + * Creates a constructor for clients for the given service + * @param {ProtoBuf.Reflect.Service} service The service to generate a client + * for + * @return {function(string, Object)} New client constructor */ -exports.Channel = grpc.Channel; +function makeClientConstructor(service) { + var prefix = '/' + common.fullyQualifiedName(service) + '/'; + /** + * Create a client with the given methods + * @constructor + * @param {string} address The address of the server to connect to + * @param {Object} options Options to pass to the underlying channel + */ + function Client(address, options) { + this.channel = new grpc.Channel(address, options); + } + + _.each(service.children, function(method) { + var method_type; + if (method.requestStream) { + if (method.responseStream) { + method_type = 'bidi'; + } else { + method_type = 'client_stream'; + } + } else { + if (method.responseStream) { + method_type = 'server_stream'; + } else { + method_type = 'unary'; + } + } + Client.prototype[decapitalize(method.name)] = + requester_makers[method_type]( + prefix + capitalize(method.name), + common.serializeCls(method.resolvedRequestType.build()), + common.deserializeCls(method.resolvedResponseType.build())); + }); + + Client.service = service; + + return Client; +} + +exports.makeClientConstructor = makeClientConstructor; + /** - * Status name to code number mapping + * See docs for client.status */ exports.status = grpc.status; /** - * Call error name to code number mapping + * See docs for client.callError */ exports.callError = grpc.callError; diff --git a/src/node/src/common.js b/src/node/src/common.js index 54247e3fa1..7560cf1bdd 100644 --- a/src/node/src/common.js +++ b/src/node/src/common.js @@ -31,6 +31,8 @@ * */ +var _ = require('underscore'); + var capitalize = require('underscore.string/capitalize'); /** @@ -88,6 +90,24 @@ function fullyQualifiedName(value) { } /** + * Wrap a function to pass null-like values through without calling it. If no + * function is given, just uses the identity; + * @param {?function} func The function to wrap + * @return {function} The wrapped function + */ +function wrapIgnoreNull(func) { + if (!func) { + return _.identity; + } + return function(arg) { + if (arg === null || arg === undefined) { + return null; + } + return func(arg); + }; +} + +/** * See docs for deserializeCls */ exports.deserializeCls = deserializeCls; @@ -101,3 +121,8 @@ exports.serializeCls = serializeCls; * See docs for fullyQualifiedName */ exports.fullyQualifiedName = fullyQualifiedName; + +/** + * See docs for wrapIgnoreNull + */ +exports.wrapIgnoreNull = wrapIgnoreNull; diff --git a/src/node/src/server.js b/src/node/src/server.js index e4f71ff05f..48c349ef99 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -1,6 +1,6 @@ /* * - * Copyright 2014, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -33,80 +33,108 @@ var _ = require('underscore'); +var capitalize = require('underscore.string/capitalize'); +var decapitalize = require('underscore.string/decapitalize'); + var grpc = require('bindings')('grpc.node'); var common = require('./common'); -var Duplex = require('stream').Duplex; +var stream = require('stream'); + +var Readable = stream.Readable; +var Writable = stream.Writable; +var Duplex = stream.Duplex; var util = require('util'); -util.inherits(GrpcServerStream, Duplex); +var EventEmitter = require('events').EventEmitter; + +var common = require('./common.js'); /** - * Class for representing a gRPC server side stream as a Node stream. Extends - * from stream.Duplex. - * @constructor - * @param {grpc.Call} call Call object to proxy - * @param {function(*):Buffer=} serialize Serialization function for responses - * @param {function(Buffer):*=} deserialize Deserialization function for - * requests + * Handle an error on a call by sending it as a status + * @param {grpc.Call} call The call to send the error on + * @param {Object} error The error object */ -function GrpcServerStream(call, serialize, deserialize) { - Duplex.call(this, {objectMode: true}); - if (!serialize) { - serialize = function(value) { - return value; - }; - } - if (!deserialize) { - deserialize = function(value) { - return value; - }; - } - this._call = call; - // Indicate that a status has been sent - var finished = false; - var self = this; +function handleError(call, error) { var status = { - 'code' : grpc.status.OK, - 'details' : 'OK' + code: grpc.status.INTERNAL, + details: 'Unknown Error', + metadata: {} }; - - /** - * Serialize a response value to a buffer. Always maps null to null. Otherwise - * uses the provided serialize function - * @param {*} value The value to serialize - * @return {Buffer} The serialized value - */ - this.serialize = function(value) { - if (value === null || value === undefined) { - return null; + if (error.hasOwnProperty('message')) { + status.details = error.message; + } + if (error.hasOwnProperty('code')) { + status.code = error.code; + if (error.hasOwnProperty('details')) { + status.details = error.details; } - return serialize(value); - }; + } + var error_batch = {}; + error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; + call.startBatch(error_batch, function(){}); +} - /** - * Deserialize a request buffer to a value. Always maps null to null. - * Otherwise uses the provided deserialize function. - * @param {Buffer} buffer The buffer to deserialize - * @return {*} The deserialized value - */ - this.deserialize = function(buffer) { - if (buffer === null) { - return null; +/** + * Wait for the client to close, then emit a cancelled event if the client + * cancelled. + * @param {grpc.Call} call The call object to wait on + * @param {EventEmitter} emitter The event emitter to emit the cancelled event + * on + */ +function waitForCancel(call, emitter) { + var cancel_batch = {}; + cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + call.startBatch(cancel_batch, function(err, result) { + if (err) { + emitter.emit('error', err); } - return deserialize(buffer); + if (result.cancelled) { + emitter.cancelled = true; + emitter.emit('cancelled'); + } + }); +} + +/** + * Send a response to a unary or client streaming call. + * @param {grpc.Call} call The call to respond on + * @param {*} value The value to respond with + * @param {function(*):Buffer=} serialize Serialization function for the + * response + */ +function sendUnaryResponse(call, value, serialize) { + var end_batch = {}; + end_batch[grpc.opType.SEND_MESSAGE] = serialize(value); + end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + code: grpc.status.OK, + details: 'OK', + metadata: {} }; + call.startBatch(end_batch, function (){}); +} - /** - * Send the pending status - */ +/** + * Initialize a writable stream. This is used for both the writable and duplex + * stream constructors. + * @param {Writable} stream The stream to set up + * @param {function(*):Buffer=} Serialization function for responses + */ +function setUpWritable(stream, serialize) { + stream.finished = false; + stream.status = { + code : grpc.status.OK, + details : 'OK', + metadata : {} + }; + stream.serialize = common.wrapIgnoreNull(serialize); function sendStatus() { - call.startWriteStatus(status.code, status.details, function() { - }); - finished = true; + var batch = {}; + batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status; + stream.call.startBatch(batch, function(){}); } - this.on('finish', sendStatus); + stream.on('finish', sendStatus); /** * Set the pending status to a given error status. If the error does not have * code or details properties, the code will be set to grpc.status.INTERNAL @@ -116,14 +144,16 @@ function GrpcServerStream(call, serialize, deserialize) { function setStatus(err) { var code = grpc.status.INTERNAL; var details = 'Unknown Error'; - + if (err.hasOwnProperty('message')) { + details = err.message; + } if (err.hasOwnProperty('code')) { code = err.code; if (err.hasOwnProperty('details')) { details = err.details; } } - status = {'code': code, 'details': details}; + stream.status = {code: code, details: details, metadata: {}}; } /** * Terminate the call. This includes indicating that reads are done, draining @@ -133,69 +163,250 @@ function GrpcServerStream(call, serialize, deserialize) { */ function terminateCall(err) { // Drain readable data - this.on('data', function() {}); setStatus(err); - this.end(); + stream.end(); } - this.on('error', terminateCall); - // Indicates that a read is pending - var reading = false; + stream.on('error', terminateCall); +} + +/** + * Initialize a readable stream. This is used for both the readable and duplex + * stream constructors. + * @param {Readable} stream The stream to initialize + * @param {function(Buffer):*=} deserialize Deserialization function for + * incoming data. + */ +function setUpReadable(stream, deserialize) { + stream.deserialize = common.wrapIgnoreNull(deserialize); + stream.finished = false; + stream.reading = false; + + stream.terminate = function() { + stream.finished = true; + stream.on('data', function() {}); + }; + + stream.on('cancelled', function() { + stream.terminate(); + }); +} + +util.inherits(ServerWritableStream, Writable); + +/** + * A stream that the server can write to. Used for calls that are streaming from + * the server side. + * @constructor + * @param {grpc.Call} call The call object to send data with + * @param {function(*):Buffer=} serialize Serialization function for writes + */ +function ServerWritableStream(call, serialize) { + Writable.call(this, {objectMode: true}); + this.call = call; + + this.finished = false; + setUpWritable(this, serialize); +} + +/** + * Start writing a chunk of data. This is an implementation of a method required + * for implementing stream.Writable. + * @param {Buffer} chunk The chunk of data to write + * @param {string} encoding Ignored + * @param {function(Error=)} callback Callback to indicate that the write is + * complete + */ +function _write(chunk, encoding, callback) { + var batch = {}; + batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); + this.call.startBatch(batch, function(err, value) { + if (err) { + this.emit('error', err); + return; + } + callback(); + }); +} + +ServerWritableStream.prototype._write = _write; + +util.inherits(ServerReadableStream, Readable); + +/** + * A stream that the server can read from. Used for calls that are streaming + * from the client side. + * @constructor + * @param {grpc.Call} call The call object to read data with + * @param {function(Buffer):*=} deserialize Deserialization function for reads + */ +function ServerReadableStream(call, deserialize) { + Readable.call(this, {objectMode: true}); + this.call = call; + setUpReadable(this, deserialize); +} + +/** + * Start reading from the gRPC data source. This is an implementation of a + * method required for implementing stream.Readable + * @param {number} size Ignored + */ +function _read(size) { + var self = this; /** * Callback to be called when a READ event is received. Pushes the data onto * the read queue and starts reading again if applicable * @param {grpc.Event} event READ event object */ - function readCallback(event) { - if (finished) { + function readCallback(err, event) { + if (err) { + self.terminate(); + return; + } + if (self.finished) { self.push(null); return; } - var data = event.data; + var data = event.read; if (self.push(self.deserialize(data)) && data != null) { - self._call.startRead(readCallback); + var read_batch = {}; + read_batch[grpc.opType.RECV_MESSAGE] = true; + self.call.startBatch(read_batch, readCallback); } else { - reading = false; + self.reading = false; } } - /** - * Start reading if there is not already a pending read. Reading will - * continue until self.push returns false (indicating reads should slow - * down) or the read data is null (indicating that there is no more data). - */ - this.startReading = function() { - if (finished) { - self.push(null); - } else { - if (!reading) { - reading = true; - self._call.startRead(readCallback); + if (self.finished) { + self.push(null); + } else { + if (!self.reading) { + self.reading = true; + var batch = {}; + batch[grpc.opType.RECV_MESSAGE] = true; + self.call.startBatch(batch, readCallback); + } + } +} + +ServerReadableStream.prototype._read = _read; + +util.inherits(ServerDuplexStream, Duplex); + +/** + * A stream that the server can read from or write to. Used for calls with + * duplex streaming. + * @constructor + * @param {grpc.Call} call Call object to proxy + * @param {function(*):Buffer=} serialize Serialization function for requests + * @param {function(Buffer):*=} deserialize Deserialization function for + * responses + */ +function ServerDuplexStream(call, serialize, deserialize) { + Duplex.call(this, {objectMode: true}); + this.call = call; + setUpWritable(this, serialize); + setUpReadable(this, deserialize); +} + +ServerDuplexStream.prototype._read = _read; +ServerDuplexStream.prototype._write = _write; + +/** + * Fully handle a unary call + * @param {grpc.Call} call The call to handle + * @param {Object} handler Request handler object for the method that was called + * @param {Object} metadata Metadata from the client + */ +function handleUnary(call, handler, metadata) { + var emitter = new EventEmitter(); + emitter.on('error', function(error) { + handleError(call, error); + }); + waitForCancel(call, emitter); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + batch[grpc.opType.RECV_MESSAGE] = true; + call.startBatch(batch, function(err, result) { + if (err) { + handleError(call, err); + return; + } + emitter.request = handler.deserialize(result.read); + if (emitter.cancelled) { + return; + } + handler.func(emitter, function sendUnaryData(err, value) { + if (err) { + handleError(call, err); } + sendUnaryResponse(call, value, handler.serialize); + }); + }); +} + +/** + * Fully handle a server streaming call + * @param {grpc.Call} call The call to handle + * @param {Object} handler Request handler object for the method that was called + * @param {Object} metadata Metadata from the client + */ +function handleServerStreaming(call, handler, metadata) { + var stream = new ServerWritableStream(call, handler.serialize); + waitForCancel(call, stream); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + batch[grpc.opType.RECV_MESSAGE] = true; + call.startBatch(batch, function(err, result) { + if (err) { + stream.emit('error', err); + return; } - }; + stream.request = handler.deserialize(result.read); + handler.func(stream); + }); } /** - * Start reading from the gRPC data source. This is an implementation of a - * method required for implementing stream.Readable - * @param {number} size Ignored + * Fully handle a client streaming call + * @param {grpc.Call} call The call to handle + * @param {Object} handler Request handler object for the method that was called + * @param {Object} metadata Metadata from the client */ -GrpcServerStream.prototype._read = function(size) { - this.startReading(); -}; +function handleClientStreaming(call, handler, metadata) { + var stream = new ServerReadableStream(call, handler.deserialize); + waitForCancel(call, stream); + var metadata_batch = {}; + metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + call.startBatch(metadata_batch, function() {}); + handler.func(stream, function(err, value) { + stream.terminate(); + if (err) { + handleError(call, err); + } + sendUnaryResponse(call, value, handler.serialize); + }); +} /** - * Start writing a chunk of data. This is an implementation of a method required - * for implementing stream.Writable. - * @param {Buffer} chunk The chunk of data to write - * @param {string} encoding Ignored - * @param {function(Error=)} callback Callback to indicate that the write is - * complete + * Fully handle a bidirectional streaming call + * @param {grpc.Call} call The call to handle + * @param {Object} handler Request handler object for the method that was called + * @param {Object} metadata Metadata from the client */ -GrpcServerStream.prototype._write = function(chunk, encoding, callback) { - var self = this; - self._call.startWrite(self.serialize(chunk), function(event) { - callback(); - }, 0); +function handleBidiStreaming(call, handler, metadata) { + var stream = new ServerDuplexStream(call, handler.serialize, + handler.deserialize); + waitForCancel(call, stream); + var metadata_batch = {}; + metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + call.startBatch(metadata_batch, function() {}); + handler.func(stream); +} + +var streamHandlers = { + unary: handleUnary, + server_stream: handleServerStreaming, + client_stream: handleClientStreaming, + bidi: handleBidiStreaming }; /** @@ -218,7 +429,7 @@ function Server(getMetadata, options) { * Start the server and begin handling requests * @this Server */ - this.start = function() { + this.listen = function() { console.log('Server starting'); _.each(handlers, function(handler, handler_name) { console.log('Serving', handler_name); @@ -233,48 +444,39 @@ function Server(getMetadata, options) { * wait for the next request * @param {grpc.Event} event The event to handle with tag SERVER_RPC_NEW */ - function handleNewCall(event) { - var call = event.call; - var data = event.data; - if (data === null) { + function handleNewCall(err, event) { + if (err) { + return; + } + var details = event['new call']; + var call = details.call; + var method = details.method; + var metadata = details.metadata; + if (method === null) { return; } server.requestCall(handleNewCall); var handler = undefined; - var deadline = data.absolute_deadline; - var cancelled = false; - call.serverAccept(function(event) { - if (event.data.code === grpc.status.CANCELLED) { - cancelled = true; - if (stream) { - stream.emit('cancelled'); - } - } - }, 0); - if (handlers.hasOwnProperty(data.method)) { - handler = handlers[data.method]; + var deadline = details.deadline; + if (handlers.hasOwnProperty(method)) { + handler = handlers[method]; } else { - call.serverEndInitialMetadata(0); - call.startWriteStatus( - grpc.status.UNIMPLEMENTED, - "This method is not available on this server.", - function() {}); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + code: grpc.status.UNIMPLEMENTED, + details: "This method is not available on this server.", + metadata: {} + }; + batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + call.startBatch(batch, function() {}); return; } + var response_metadata = {}; if (getMetadata) { - call.addMetadata(getMetadata(data.method, data.metadata)); - } - call.serverEndInitialMetadata(0); - var stream = new GrpcServerStream(call, handler.serialize, - handler.deserialize); - Object.defineProperty(stream, 'cancelled', { - get: function() { return cancelled;} - }); - try { - handler.func(stream, data.metadata); - } catch (e) { - stream.emit('error', e); + response_metadata = getMetadata(method, metadata); } + streamHandlers[handler.type](call, handler, response_metadata); } server.requestCall(handleNewCall); }; @@ -294,17 +496,20 @@ function Server(getMetadata, options) { * returns a stream of response values * @param {function(*):Buffer} serialize Serialization function for responses * @param {function(Buffer):*} deserialize Deserialization function for requests + * @param {string} type The streaming type of method that this handles * @return {boolean} True if the handler was set. False if a handler was already * set for that name. */ -Server.prototype.register = function(name, handler, serialize, deserialize) { +Server.prototype.register = function(name, handler, serialize, deserialize, + type) { if (this.handlers.hasOwnProperty(name)) { return false; } this.handlers[name] = { func: handler, serialize: serialize, - deserialize: deserialize + deserialize: deserialize, + type: type }; return true; }; @@ -324,6 +529,110 @@ Server.prototype.bind = function(port, secure) { }; /** - * See documentation for Server + * Creates a constructor for servers with a service defined by the methods + * object. The methods object has string keys and values of this form: + * {serialize: function, deserialize: function, client_stream: bool, + * server_stream: bool} + * @param {Object} methods Method descriptor for each method the server should + * expose + * @param {string} prefix The prefex to prepend to each method name + * @return {function(Object, Object)} New server constructor + */ +function makeServerConstructor(services) { + var qual_names = []; + _.each(services, function(service) { + _.each(service.children, function(method) { + var name = common.fullyQualifiedName(method); + if (_.indexOf(qual_names, name) !== -1) { + throw new Error('Method ' + name + ' exposed by more than one service'); + } + qual_names.push(name); + }); + }); + /** + * Create a server with the given handlers for all of the methods. + * @constructor + * @param {Object} service_handlers Map from service names to map from method + * names to handlers + * @param {function(string, Object<string, Array<Buffer>>): + Object<string, Array<Buffer|string>>=} getMetadata Callback that + * gets metatada for a given method + * @param {Object=} options Options to pass to the underlying server + */ + function SurfaceServer(service_handlers, getMetadata, options) { + var server = new Server(getMetadata, options); + this.inner_server = server; + _.each(services, function(service) { + var service_name = common.fullyQualifiedName(service); + if (service_handlers[service_name] === undefined) { + throw new Error('Handlers for service ' + + service_name + ' not provided.'); + } + var prefix = '/' + common.fullyQualifiedName(service) + '/'; + _.each(service.children, function(method) { + var method_type; + if (method.requestStream) { + if (method.responseStream) { + method_type = 'bidi'; + } else { + method_type = 'client_stream'; + } + } else { + if (method.responseStream) { + method_type = 'server_stream'; + } else { + method_type = 'unary'; + } + } + if (service_handlers[service_name][decapitalize(method.name)] === + undefined) { + throw new Error('Method handler for ' + + common.fullyQualifiedName(method) + ' not provided.'); + } + var serialize = common.serializeCls( + method.resolvedResponseType.build()); + var deserialize = common.deserializeCls( + method.resolvedRequestType.build()); + server.register( + prefix + capitalize(method.name), + service_handlers[service_name][decapitalize(method.name)], + serialize, deserialize, method_type); + }); + }, this); + } + + /** + * Binds the server to the given port, with SSL enabled if secure is specified + * @param {string} port The port that the server should bind on, in the format + * "address:port" + * @param {boolean=} secure Whether the server should open a secure port + * @return {SurfaceServer} this + */ + SurfaceServer.prototype.bind = function(port, secure) { + return this.inner_server.bind(port, secure); + }; + + /** + * Starts the server listening on any bound ports + * @return {SurfaceServer} this + */ + SurfaceServer.prototype.listen = function() { + this.inner_server.listen(); + return this; + }; + + /** + * Shuts the server down; tells it to stop listening for new requests and to + * kill old requests. + */ + SurfaceServer.prototype.shutdown = function() { + this.inner_server.shutdown(); + }; + + return SurfaceServer; +} + +/** + * See documentation for makeServerConstructor */ -module.exports = Server; +exports.makeServerConstructor = makeServerConstructor; diff --git a/src/node/src/surface_client.js b/src/node/src/surface_client.js deleted file mode 100644 index 16c31809f4..0000000000 --- a/src/node/src/surface_client.js +++ /dev/null @@ -1,357 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -var _ = require('underscore'); - -var capitalize = require('underscore.string/capitalize'); -var decapitalize = require('underscore.string/decapitalize'); - -var client = require('./client.js'); - -var common = require('./common.js'); - -var EventEmitter = require('events').EventEmitter; - -var stream = require('stream'); - -var Readable = stream.Readable; -var Writable = stream.Writable; -var Duplex = stream.Duplex; -var util = require('util'); - - -function forwardEvent(fromEmitter, toEmitter, event) { - fromEmitter.on(event, function forward() { - _.partial(toEmitter.emit, event).apply(toEmitter, arguments); - }); -} - -util.inherits(ClientReadableObjectStream, Readable); - -/** - * Class for representing a gRPC server streaming call as a Node stream on the - * client side. Extends from stream.Readable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ClientReadableObjectStream(stream) { - var options = {objectMode: true}; - Readable.call(this, options); - this._stream = stream; - var self = this; - forwardEvent(stream, this, 'status'); - forwardEvent(stream, this, 'metadata'); - this._stream.on('data', function forwardData(chunk) { - if (!self.push(chunk)) { - self._stream.pause(); - } - }); - this._stream.pause(); -} - -/** - * _read implementation for both types of streams that allow reading. - * @this {ClientReadableObjectStream} - * @param {number} size Ignored - */ -function _read(size) { - this._stream.resume(); -} - -/** - * See docs for _read - */ -ClientReadableObjectStream.prototype._read = _read; - -util.inherits(ClientWritableObjectStream, Writable); - -/** - * Class for representing a gRPC client streaming call as a Node stream on the - * client side. Extends from stream.Writable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ClientWritableObjectStream(stream) { - var options = {objectMode: true}; - Writable.call(this, options); - this._stream = stream; - forwardEvent(stream, this, 'status'); - forwardEvent(stream, this, 'metadata'); - this.on('finish', function() { - this._stream.end(); - }); -} - -/** - * _write implementation for both types of streams that allow writing - * @this {ClientWritableObjectStream} - * @param {*} chunk The value to write to the stream - * @param {string} encoding Ignored - * @param {function(Error)} callback Callback to call when finished writing - */ -function _write(chunk, encoding, callback) { - this._stream.write(chunk, encoding, callback); -} - -/** - * See docs for _write - */ -ClientWritableObjectStream.prototype._write = _write; - -/** - * Cancel the underlying call - */ -function cancel() { - this._stream.cancel(); -} - -ClientReadableObjectStream.prototype.cancel = cancel; -ClientWritableObjectStream.prototype.cancel = cancel; - -/** - * Get a function that can make unary requests to the specified method. - * @param {string} method The name of the method to request - * @param {function(*):Buffer} serialize The serialization function for inputs - * @param {function(Buffer)} deserialize The deserialization function for - * outputs - * @return {Function} makeUnaryRequest - */ -function makeUnaryRequestFunction(method, serialize, deserialize) { - /** - * Make a unary request with this method on the given channel with the given - * argument, callback, etc. - * @this {SurfaceClient} Client object. Must have a channel member. - * @param {*} argument The argument to the call. Should be serializable with - * serialize - * @param {function(?Error, value=)} callback The callback to for when the - * response is received - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future - * @return {EventEmitter} An event emitter for stream related events - */ - function makeUnaryRequest(argument, callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, serialize, - deserialize, metadata, deadline); - var emitter = new EventEmitter(); - emitter.cancel = function cancel() { - stream.cancel(); - }; - forwardEvent(stream, emitter, 'status'); - forwardEvent(stream, emitter, 'metadata'); - stream.write(argument); - stream.end(); - stream.on('data', function forwardData(chunk) { - try { - callback(null, chunk); - } catch (e) { - callback(e); - } - }); - stream.on('status', function forwardStatus(status) { - if (status.code !== client.status.OK) { - callback(status); - } - }); - return emitter; - } - return makeUnaryRequest; -} - -/** - * Get a function that can make client stream requests to the specified method. - * @param {string} method The name of the method to request - * @param {function(*):Buffer} serialize The serialization function for inputs - * @param {function(Buffer)} deserialize The deserialization function for - * outputs - * @return {Function} makeClientStreamRequest - */ -function makeClientStreamRequestFunction(method, serialize, deserialize) { - /** - * Make a client stream request with this method on the given channel with the - * given callback, etc. - * @this {SurfaceClient} Client object. Must have a channel member. - * @param {function(?Error, value=)} callback The callback to for when the - * response is received - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future - * @return {EventEmitter} An event emitter for stream related events - */ - function makeClientStreamRequest(callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, serialize, - deserialize, metadata, deadline); - var obj_stream = new ClientWritableObjectStream(stream); - stream.on('data', function forwardData(chunk) { - try { - callback(null, chunk); - } catch (e) { - callback(e); - } - }); - stream.on('status', function forwardStatus(status) { - if (status.code !== client.status.OK) { - callback(status); - } - }); - return obj_stream; - } - return makeClientStreamRequest; -} - -/** - * Get a function that can make server stream requests to the specified method. - * @param {string} method The name of the method to request - * @param {function(*):Buffer} serialize The serialization function for inputs - * @param {function(Buffer)} deserialize The deserialization function for - * outputs - * @return {Function} makeServerStreamRequest - */ -function makeServerStreamRequestFunction(method, serialize, deserialize) { - /** - * Make a server stream request with this method on the given channel with the - * given argument, etc. - * @this {SurfaceClient} Client object. Must have a channel member. - * @param {*} argument The argument to the call. Should be serializable with - * serialize - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future - * @return {EventEmitter} An event emitter for stream related events - */ - function makeServerStreamRequest(argument, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, serialize, - deserialize, metadata, deadline); - var obj_stream = new ClientReadableObjectStream(stream); - stream.write(argument); - stream.end(); - return obj_stream; - } - return makeServerStreamRequest; -} - -/** - * Get a function that can make bidirectional stream requests to the specified - * method. - * @param {string} method The name of the method to request - * @param {function(*):Buffer} serialize The serialization function for inputs - * @param {function(Buffer)} deserialize The deserialization function for - * outputs - * @return {Function} makeBidiStreamRequest - */ -function makeBidiStreamRequestFunction(method, serialize, deserialize) { - /** - * Make a bidirectional stream request with this method on the given channel. - * @this {SurfaceClient} Client object. Must have a channel member. - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future - * @return {EventEmitter} An event emitter for stream related events - */ - function makeBidiStreamRequest(metadata, deadline) { - return client.makeRequest(this.channel, method, serialize, - deserialize, metadata, deadline); - } - return makeBidiStreamRequest; -} - -/** - * Map with short names for each of the requester maker functions. Used in - * makeClientConstructor - */ -var requester_makers = { - unary: makeUnaryRequestFunction, - server_stream: makeServerStreamRequestFunction, - client_stream: makeClientStreamRequestFunction, - bidi: makeBidiStreamRequestFunction -} - -/** - * Creates a constructor for clients for the given service - * @param {ProtoBuf.Reflect.Service} service The service to generate a client - * for - * @return {function(string, Object)} New client constructor - */ -function makeClientConstructor(service) { - var prefix = '/' + common.fullyQualifiedName(service) + '/'; - /** - * Create a client with the given methods - * @constructor - * @param {string} address The address of the server to connect to - * @param {Object} options Options to pass to the underlying channel - */ - function SurfaceClient(address, options) { - this.channel = new client.Channel(address, options); - } - - _.each(service.children, function(method) { - var method_type; - if (method.requestStream) { - if (method.responseStream) { - method_type = 'bidi'; - } else { - method_type = 'client_stream'; - } - } else { - if (method.responseStream) { - method_type = 'server_stream'; - } else { - method_type = 'unary'; - } - } - SurfaceClient.prototype[decapitalize(method.name)] = - requester_makers[method_type]( - prefix + capitalize(method.name), - common.serializeCls(method.resolvedRequestType.build()), - common.deserializeCls(method.resolvedResponseType.build())); - }); - - SurfaceClient.service = service; - - return SurfaceClient; -} - -exports.makeClientConstructor = makeClientConstructor; - -/** - * See docs for client.status - */ -exports.status = client.status; -/** - * See docs for client.callError - */ -exports.callError = client.callError; diff --git a/src/node/src/surface_server.js b/src/node/src/surface_server.js deleted file mode 100644 index a47d1fa23d..0000000000 --- a/src/node/src/surface_server.js +++ /dev/null @@ -1,340 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -var _ = require('underscore'); - -var capitalize = require('underscore.string/capitalize'); -var decapitalize = require('underscore.string/decapitalize'); - -var Server = require('./server.js'); - -var stream = require('stream'); - -var Readable = stream.Readable; -var Writable = stream.Writable; -var Duplex = stream.Duplex; -var util = require('util'); - -var common = require('./common.js'); - -util.inherits(ServerReadableObjectStream, Readable); - -/** - * Class for representing a gRPC client streaming call as a Node stream on the - * server side. Extends from stream.Readable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ServerReadableObjectStream(stream) { - var options = {objectMode: true}; - Readable.call(this, options); - this._stream = stream; - Object.defineProperty(this, 'cancelled', { - get: function() { return stream.cancelled; } - }); - var self = this; - this._stream.on('cancelled', function() { - self.emit('cancelled'); - }); - this._stream.on('data', function forwardData(chunk) { - if (!self.push(chunk)) { - self._stream.pause(); - } - }); - this._stream.on('end', function forwardEnd() { - self.push(null); - }); - this._stream.pause(); -} - -/** - * _read implementation for both types of streams that allow reading. - * @this {ServerReadableObjectStream|ServerBidiObjectStream} - * @param {number} size Ignored - */ -function _read(size) { - this._stream.resume(); -} - -/** - * See docs for _read - */ -ServerReadableObjectStream.prototype._read = _read; - -util.inherits(ServerWritableObjectStream, Writable); - -/** - * Class for representing a gRPC server streaming call as a Node stream on the - * server side. Extends from stream.Writable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ServerWritableObjectStream(stream) { - var options = {objectMode: true}; - Writable.call(this, options); - this._stream = stream; - this._stream.on('cancelled', function() { - self.emit('cancelled'); - }); - this.on('finish', function() { - this._stream.end(); - }); -} - -/** - * _write implementation for both types of streams that allow writing - * @this {ServerWritableObjectStream} - * @param {*} chunk The value to write to the stream - * @param {string} encoding Ignored - * @param {function(Error)} callback Callback to call when finished writing - */ -function _write(chunk, encoding, callback) { - this._stream.write(chunk, encoding, callback); -} - -/** - * See docs for _write - */ -ServerWritableObjectStream.prototype._write = _write; - -/** - * Creates a binary stream handler function from a unary handler function - * @param {function(Object, function(Error, *), metadata=)} handler Unary call - * handler - * @return {function(stream, metadata=)} Binary stream handler - */ -function makeUnaryHandler(handler) { - /** - * Handles a stream by reading a single data value, passing it to the handler, - * and writing the response back to the stream. - * @param {stream} stream Binary data stream - * @param {metadata=} metadata Incoming metadata array - */ - return function handleUnaryCall(stream, metadata) { - stream.on('data', function handleUnaryData(value) { - var call = {request: value}; - Object.defineProperty(call, 'cancelled', { - get: function() { return stream.cancelled;} - }); - stream.on('cancelled', function() { - call.emit('cancelled'); - }); - handler(call, function sendUnaryData(err, value) { - if (err) { - stream.emit('error', err); - } else { - stream.write(value); - stream.end(); - } - }, metadata); - }); - }; -} - -/** - * Creates a binary stream handler function from a client stream handler - * function - * @param {function(Readable, function(Error, *), metadata=)} handler Client - * stream call handler - * @return {function(stream, metadata=)} Binary stream handler - */ -function makeClientStreamHandler(handler) { - /** - * Handles a stream by passing a deserializing stream to the handler and - * writing the response back to the stream. - * @param {stream} stream Binary data stream - * @param {metadata=} metadata Incoming metadata array - */ - return function handleClientStreamCall(stream, metadata) { - var object_stream = new ServerReadableObjectStream(stream); - handler(object_stream, function sendClientStreamData(err, value) { - if (err) { - stream.emit('error', err); - } else { - stream.write(value); - stream.end(); - } - }, metadata); - }; -} - -/** - * Creates a binary stream handler function from a server stream handler - * function - * @param {function(Writable, metadata=)} handler Server stream call handler - * @return {function(stream, metadata=)} Binary stream handler - */ -function makeServerStreamHandler(handler) { - /** - * Handles a stream by attaching it to a serializing stream, and passing it to - * the handler. - * @param {stream} stream Binary data stream - * @param {metadata=} metadata Incoming metadata array - */ - return function handleServerStreamCall(stream, metadata) { - stream.on('data', function handleClientData(value) { - var object_stream = new ServerWritableObjectStream(stream); - object_stream.request = value; - handler(object_stream, metadata); - }); - }; -} - -/** - * Creates a binary stream handler function from a bidi stream handler function - * @param {function(Duplex, metadata=)} handler Unary call handler - * @return {function(stream, metadata=)} Binary stream handler - */ -function makeBidiStreamHandler(handler) { - return handler; -} - -/** - * Map with short names for each of the handler maker functions. Used in - * makeServerConstructor - */ -var handler_makers = { - unary: makeUnaryHandler, - server_stream: makeServerStreamHandler, - client_stream: makeClientStreamHandler, - bidi: makeBidiStreamHandler -}; - -/** - * Creates a constructor for servers with a service defined by the methods - * object. The methods object has string keys and values of this form: - * {serialize: function, deserialize: function, client_stream: bool, - * server_stream: bool} - * @param {Object} methods Method descriptor for each method the server should - * expose - * @param {string} prefix The prefex to prepend to each method name - * @return {function(Object, Object)} New server constructor - */ -function makeServerConstructor(services) { - var qual_names = []; - _.each(services, function(service) { - _.each(service.children, function(method) { - var name = common.fullyQualifiedName(method); - if (_.indexOf(qual_names, name) !== -1) { - throw new Error('Method ' + name + ' exposed by more than one service'); - } - qual_names.push(name); - }); - }); - /** - * Create a server with the given handlers for all of the methods. - * @constructor - * @param {Object} service_handlers Map from service names to map from method - * names to handlers - * @param {function(string, Object<string, Array<Buffer>>): - Object<string, Array<Buffer|string>>=} getMetadata Callback that - * gets metatada for a given method - * @param {Object=} options Options to pass to the underlying server - */ - function SurfaceServer(service_handlers, getMetadata, options) { - var server = new Server(getMetadata, options); - this.inner_server = server; - _.each(services, function(service) { - var service_name = common.fullyQualifiedName(service); - if (service_handlers[service_name] === undefined) { - throw new Error('Handlers for service ' + - service_name + ' not provided.'); - } - var prefix = '/' + common.fullyQualifiedName(service) + '/'; - _.each(service.children, function(method) { - var method_type; - if (method.requestStream) { - if (method.responseStream) { - method_type = 'bidi'; - } else { - method_type = 'client_stream'; - } - } else { - if (method.responseStream) { - method_type = 'server_stream'; - } else { - method_type = 'unary'; - } - } - if (service_handlers[service_name][decapitalize(method.name)] === - undefined) { - throw new Error('Method handler for ' + - common.fullyQualifiedName(method) + ' not provided.'); - } - var binary_handler = handler_makers[method_type]( - service_handlers[service_name][decapitalize(method.name)]); - var serialize = common.serializeCls( - method.resolvedResponseType.build()); - var deserialize = common.deserializeCls( - method.resolvedRequestType.build()); - server.register(prefix + capitalize(method.name), binary_handler, - serialize, deserialize); - }); - }, this); - } - - /** - * Binds the server to the given port, with SSL enabled if secure is specified - * @param {string} port The port that the server should bind on, in the format - * "address:port" - * @param {boolean=} secure Whether the server should open a secure port - * @return {SurfaceServer} this - */ - SurfaceServer.prototype.bind = function(port, secure) { - return this.inner_server.bind(port, secure); - }; - - /** - * Starts the server listening on any bound ports - * @return {SurfaceServer} this - */ - SurfaceServer.prototype.listen = function() { - this.inner_server.start(); - return this; - }; - - /** - * Shuts the server down; tells it to stop listening for new requests and to - * kill old requests. - */ - SurfaceServer.prototype.shutdown = function() { - this.inner_server.shutdown(); - }; - - return SurfaceServer; -} - -/** - * See documentation for makeServerConstructor - */ -exports.makeServerConstructor = makeServerConstructor; diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js index 48db245498..c1a7e95fa0 100644 --- a/src/node/test/call_test.js +++ b/src/node/test/call_test.js @@ -1,6 +1,6 @@ /* * - * Copyright 2014, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -98,100 +98,80 @@ describe('call', function() { }, TypeError); }); }); - describe('addMetadata', function() { - it('should succeed with a map from strings to string arrays', function() { + describe('startBatch', function() { + it('should fail without an object and a function', function() { var call = new grpc.Call(channel, 'method', getDeadline(1)); - assert.doesNotThrow(function() { - call.addMetadata({'key': ['value']}); + assert.throws(function() { + call.startBatch(); }); - assert.doesNotThrow(function() { - call.addMetadata({'key1': ['value1'], 'key2': ['value2']}); + assert.throws(function() { + call.startBatch({}); + }); + assert.throws(function() { + call.startBatch(null, function(){}); }); }); - it('should succeed with a map from strings to buffer arrays', function() { + it('should succeed with an empty object', function(done) { var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.doesNotThrow(function() { - call.addMetadata({'key': [new Buffer('value')]}); - }); - assert.doesNotThrow(function() { - call.addMetadata({'key1': [new Buffer('value1')], - 'key2': [new Buffer('value2')]}); + call.startBatch({}, function(err) { + assert.ifError(err); + done(); + }); }); }); - it('should fail with other parameter types', function() { + }); + describe('startBatch with metadata', function() { + it('should succeed with a map of strings to string arrays', function(done) { var call = new grpc.Call(channel, 'method', getDeadline(1)); - assert.throws(function() { - call.addMetadata(); + assert.doesNotThrow(function() { + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = {'key1': ['value1'], + 'key2': ['value2']}; + call.startBatch(batch, function(err, resp) { + assert.ifError(err); + assert.deepEqual(resp, {'send metadata': true}); + done(); + }); }); - assert.throws(function() { - call.addMetadata(null); - }, TypeError); - assert.throws(function() { - call.addMetadata('value'); - }, TypeError); - assert.throws(function() { - call.addMetadata(5); - }, TypeError); }); - it.skip('should fail if invoke was already called', function(done) { + it('should succeed with a map of strings to buffer arrays', function(done) { var call = new grpc.Call(channel, 'method', getDeadline(1)); - call.invoke(function() {}, - function() {done();}, - 0); - assert.throws(function() { - call.addMetadata({'key': ['value']}); + assert.doesNotThrow(function() { + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = { + 'key1': [new Buffer('value1')], + 'key2': [new Buffer('value2')] + }; + call.startBatch(batch, function(err, resp) { + assert.ifError(err); + assert.deepEqual(resp, {'send metadata': true}); + done(); + }); }); - // Cancel to speed up the test - call.cancel(); }); - }); - describe('invoke', function() { - it('should fail with fewer than 3 arguments', function() { + it('should fail with other parameter types', function() { var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.throws(function() { - call.invoke(); - }, TypeError); - assert.throws(function() { - call.invoke(function() {}); - }, TypeError); - assert.throws(function() { - call.invoke(function() {}, - function() {}); - }, TypeError); - }); - it('should work with 2 args and an int', function(done) { - assert.doesNotThrow(function() { - var call = new grpc.Call(channel, 'method', getDeadline(1)); - call.invoke(function() {}, - function() {done();}, - 0); - // Cancel to speed up the test - call.cancel(); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = undefined; + call.startBatch(batch, function(){}); }); - }); - it('should reject incorrectly typed arguments', function() { - var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.throws(function() { - call.invoke(0, 0, 0); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = null; + call.startBatch(batch, function(){}); }, TypeError); assert.throws(function() { - call.invoke(function() {}, - function() {}, 'test'); - }); - }); - }); - describe('serverAccept', function() { - it('should fail with fewer than 1 argument1', function() { - var call = new grpc.Call(channel, 'method', getDeadline(1)); - assert.throws(function() { - call.serverAccept(); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = 'value'; + call.startBatch(batch, function(){}); }, TypeError); - }); - it.skip('should return an error when called on a client Call', function() { - var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.throws(function() { - call.serverAccept(function() {}); - }); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = 5; + call.startBatch(batch, function(){}); + }, TypeError); }); }); describe('cancel', function() { diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js deleted file mode 100644 index 1db9f69467..0000000000 --- a/src/node/test/client_server_test.js +++ /dev/null @@ -1,255 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -var assert = require('assert'); -var fs = require('fs'); -var path = require('path'); -var grpc = require('bindings')('grpc.node'); -var Server = require('../src/server'); -var client = require('../src/client'); -var common = require('../src/common'); - -var ca_path = path.join(__dirname, 'data/ca.pem'); - -var key_path = path.join(__dirname, 'data/server1.key'); - -var pem_path = path.join(__dirname, 'data/server1.pem'); - -/** - * Helper function to return an absolute deadline given a relative timeout in - * seconds. - * @param {number} timeout_secs The number of seconds to wait before timing out - * @return {Date} A date timeout_secs in the future - */ -function getDeadline(timeout_secs) { - var deadline = new Date(); - deadline.setSeconds(deadline.getSeconds() + timeout_secs); - return deadline; -} - -/** - * Responds to every request with the same data as a response - * @param {Stream} stream - */ -function echoHandler(stream) { - stream.pipe(stream); -} - -/** - * Responds to every request with an error status - * @param {Stream} stream - */ -function errorHandler(stream) { - throw { - 'code' : grpc.status.UNIMPLEMENTED, - 'details' : 'error details' - }; -} - -/** - * Wait for a cancellation instead of responding - * @param {Stream} stream - */ -function cancelHandler(stream) { - // do nothing -} - -function metadataHandler(stream, metadata) { - stream.end(); -} - -/** - * Serialize a string to a Buffer - * @param {string} value The string to serialize - * @return {Buffer} The serialized value - */ -function stringSerialize(value) { - return new Buffer(value); -} - -/** - * Deserialize a Buffer to a string - * @param {Buffer} buffer The buffer to deserialize - * @return {string} The string value of the buffer - */ -function stringDeserialize(buffer) { - return buffer.toString(); -} - -describe('echo client', function() { - var server; - var channel; - before(function() { - server = new Server(function getMetadata(method, metadata) { - return {method: [method]}; - }); - var port_num = server.bind('0.0.0.0:0'); - server.register('echo', echoHandler); - server.register('error', errorHandler); - server.register('cancellation', cancelHandler); - server.register('metadata', metadataHandler); - server.start(); - - channel = new grpc.Channel('localhost:' + port_num); - }); - after(function() { - server.shutdown(); - }); - it('should receive echo responses', function(done) { - var messages = ['echo1', 'echo2', 'echo3', 'echo4']; - var stream = client.makeRequest( - channel, - 'echo', - stringSerialize, - stringDeserialize); - for (var i = 0; i < messages.length; i++) { - stream.write(messages[i]); - } - stream.end(); - var index = 0; - stream.on('data', function(chunk) { - assert.equal(messages[index], chunk); - index += 1; - }); - stream.on('status', function(status) { - assert.equal(status.code, client.status.OK); - }); - stream.on('end', function() { - assert.equal(index, messages.length); - done(); - }); - }); - it('should recieve metadata set by the server', function(done) { - var stream = client.makeRequest(channel, 'metadata'); - stream.on('metadata', function(metadata) { - assert.strictEqual(metadata.method[0].toString(), 'metadata'); - }); - stream.on('status', function(status) { - assert.equal(status.code, client.status.OK); - done(); - }); - stream.end(); - }); - it('should get an error status that the server throws', function(done) { - var stream = client.makeRequest(channel, 'error'); - - stream.on('data', function() {}); - stream.write(new Buffer('test')); - stream.end(); - stream.on('status', function(status) { - assert.equal(status.code, grpc.status.UNIMPLEMENTED); - assert.equal(status.details, 'error details'); - done(); - }); - }); - it('should be able to cancel a call', function(done) { - var stream = client.makeRequest( - channel, - 'cancellation', - null, - getDeadline(1)); - - stream.cancel(); - stream.on('status', function(status) { - assert.equal(status.code, grpc.status.CANCELLED); - done(); - }); - }); - it('should get correct status for unimplemented method', function(done) { - var stream = client.makeRequest(channel, 'unimplemented_method'); - stream.end(); - stream.on('status', function(status) { - assert.equal(status.code, grpc.status.UNIMPLEMENTED); - done(); - }); - }); -}); -/* TODO(mlumish): explore options for reducing duplication between this test - * and the insecure echo client test */ -describe('secure echo client', function() { - var server; - var channel; - before(function(done) { - fs.readFile(ca_path, function(err, ca_data) { - assert.ifError(err); - fs.readFile(key_path, function(err, key_data) { - assert.ifError(err); - fs.readFile(pem_path, function(err, pem_data) { - assert.ifError(err); - var creds = grpc.Credentials.createSsl(ca_data); - var server_creds = grpc.ServerCredentials.createSsl(null, - key_data, - pem_data); - - server = new Server(null, {'credentials' : server_creds}); - var port_num = server.bind('0.0.0.0:0', true); - server.register('echo', echoHandler); - server.start(); - - channel = new grpc.Channel('localhost:' + port_num, { - 'grpc.ssl_target_name_override' : 'foo.test.google.com', - 'credentials' : creds - }); - done(); - }); - }); - }); - }); - after(function() { - server.shutdown(); - }); - it('should recieve echo responses', function(done) { - var messages = ['echo1', 'echo2', 'echo3', 'echo4']; - var stream = client.makeRequest( - channel, - 'echo', - stringSerialize, - stringDeserialize); - for (var i = 0; i < messages.length; i++) { - stream.write(messages[i]); - } - stream.end(); - var index = 0; - stream.on('data', function(chunk) { - assert.equal(messages[index], chunk); - index += 1; - }); - stream.on('status', function(status) { - assert.equal(status.code, client.status.OK); - }); - stream.on('end', function() { - assert.equal(index, messages.length); - done(); - }); - }); -}); diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js index 0138a55226..4d11e6f527 100644 --- a/src/node/test/constant_test.js +++ b/src/node/test/constant_test.js @@ -76,31 +76,6 @@ var callErrorNames = [ 'INVALID_FLAGS' ]; -/** - * List of all op error names - * @const - * @type {Array.<string>} - */ -var opErrorNames = [ - 'OK', - 'ERROR' -]; - -/** - * List of all completion type names - * @const - * @type {Array.<string>} - */ -var completionTypeNames = [ - 'QUEUE_SHUTDOWN', - 'READ', - 'WRITE_ACCEPTED', - 'FINISH_ACCEPTED', - 'CLIENT_METADATA_READ', - 'FINISHED', - 'SERVER_RPC_NEW' -]; - describe('constants', function() { it('should have all of the status constants', function() { for (var i = 0; i < statusNames.length; i++) { @@ -114,16 +89,4 @@ describe('constants', function() { 'call error missing: ' + callErrorNames[i]); } }); - it('should have all of the op errors', function() { - for (var i = 0; i < opErrorNames.length; i++) { - assert(grpc.opError.hasOwnProperty(opErrorNames[i]), - 'op error missing: ' + opErrorNames[i]); - } - }); - it('should have all of the completion types', function() { - for (var i = 0; i < completionTypeNames.length; i++) { - assert(grpc.completionType.hasOwnProperty(completionTypeNames[i]), - 'completion type missing: ' + completionTypeNames[i]); - } - }); }); diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js index 1f53df23f3..f8899beae8 100644 --- a/src/node/test/end_to_end_test.js +++ b/src/node/test/end_to_end_test.js @@ -74,40 +74,49 @@ describe('end-to-end', function() { var status_text = 'xyz'; var call = new grpc.Call(channel, 'dummy_method', - deadline); - call.invoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.CLIENT_METADATA_READ); - },function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - var status = event.data; - assert.strictEqual(status.code, grpc.status.OK); - assert.strictEqual(status.details, status_text); + Infinity); + var client_batch = {}; + client_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(client_batch, function(err, response) { + assert.ifError(err); + assert.deepEqual(response, { + 'send metadata': true, + 'client close': true, + 'metadata': {}, + 'status': { + 'code': grpc.status.OK, + 'details': status_text, + 'metadata': {} + } + }); done(); - }, 0); + }); - server.requestCall(function(event) { - assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW); - var server_call = event.call; + server.requestCall(function(err, call_details) { + var new_call = call_details['new call']; + assert.notEqual(new_call, null); + var server_call = new_call.call; assert.notEqual(server_call, null); - server_call.serverAccept(function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - }, 0); - server_call.serverEndInitialMetadata(0); - server_call.startWriteStatus( - grpc.status.OK, - status_text, - function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); + var server_batch = {}; + server_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + server_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + 'metadata': {}, + 'code': grpc.status.OK, + 'details': status_text + }; + server_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + server_call.startBatch(server_batch, function(err, response) { + assert.ifError(err); + assert.deepEqual(response, { + 'send metadata': true, + 'send status': true, + 'cancelled': false + }); + done(); + }); }); }); it('should successfully send and receive metadata', function(complete) { @@ -117,115 +126,110 @@ describe('end-to-end', function() { var status_text = 'xyz'; var call = new grpc.Call(channel, 'dummy_method', - deadline); - call.addMetadata({'client_key': ['client_value']}); - call.invoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.CLIENT_METADATA_READ); - assert.strictEqual(event.data.server_key[0].toString(), 'server_value'); - },function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - var status = event.data; - assert.strictEqual(status.code, grpc.status.OK); - assert.strictEqual(status.details, status_text); + Infinity); + var client_batch = {}; + client_batch[grpc.opType.SEND_INITIAL_METADATA] = { + 'client_key': ['client_value'] + }; + client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(client_batch, function(err, response) { + assert.ifError(err); + assert(response['send metadata']); + assert(response['client close']); + assert(response.hasOwnProperty('metadata')); + assert.strictEqual(response.metadata.server_key[0].toString(), + 'server_value'); + assert.deepEqual(response.status, {'code': grpc.status.OK, + 'details': status_text, + 'metadata': {}}); done(); - }, 0); + }); - server.requestCall(function(event) { - assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW); - assert.strictEqual(event.data.metadata.client_key[0].toString(), + server.requestCall(function(err, call_details) { + var new_call = call_details['new call']; + assert.notEqual(new_call, null); + assert.strictEqual(new_call.metadata.client_key[0].toString(), 'client_value'); - var server_call = event.call; + var server_call = new_call.call; assert.notEqual(server_call, null); - server_call.serverAccept(function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - }, 0); - server_call.addMetadata({'server_key': ['server_value']}); - server_call.serverEndInitialMetadata(0); - server_call.startWriteStatus( - grpc.status.OK, - status_text, - function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); + var server_batch = {}; + server_batch[grpc.opType.SEND_INITIAL_METADATA] = { + 'server_key': ['server_value'] + }; + server_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + 'metadata': {}, + 'code': grpc.status.OK, + 'details': status_text + }; + server_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + server_call.startBatch(server_batch, function(err, response) { + assert.ifError(err); + assert.deepEqual(response, { + 'send metadata': true, + 'send status': true, + 'cancelled': false + }); + done(); + }); }); }); it('should send and receive data without error', function(complete) { var req_text = 'client_request'; var reply_text = 'server_response'; - var done = multiDone(complete, 6); + var done = multiDone(complete, 2); var deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + 3); var status_text = 'success'; var call = new grpc.Call(channel, 'dummy_method', - deadline); - call.invoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.CLIENT_METADATA_READ); - done(); - },function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - var status = event.data; - assert.strictEqual(status.code, grpc.status.OK); - assert.strictEqual(status.details, status_text); - done(); - }, 0); - call.startWrite( - new Buffer(req_text), - function(event) { - assert.strictEqual(event.type, - grpc.completionType.WRITE_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }, 0); - call.startRead(function(event) { - assert.strictEqual(event.type, grpc.completionType.READ); - assert.strictEqual(event.data.toString(), reply_text); + Infinity); + var client_batch = {}; + client_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + client_batch[grpc.opType.SEND_MESSAGE] = new Buffer(req_text); + client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + client_batch[grpc.opType.RECV_MESSAGE] = true; + client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(client_batch, function(err, response) { + assert.ifError(err); + assert(response['send metadata']); + assert(response['client close']); + assert.deepEqual(response.metadata, {}); + assert(response['send message']); + assert.strictEqual(response.read.toString(), reply_text); + assert.deepEqual(response.status, {'code': grpc.status.OK, + 'details': status_text, + 'metadata': {}}); done(); }); - server.requestCall(function(event) { - assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW); - var server_call = event.call; + + server.requestCall(function(err, call_details) { + var new_call = call_details['new call']; + assert.notEqual(new_call, null); + var server_call = new_call.call; assert.notEqual(server_call, null); - server_call.serverAccept(function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - done(); - }); - server_call.serverEndInitialMetadata(0); - server_call.startRead(function(event) { - assert.strictEqual(event.type, grpc.completionType.READ); - assert.strictEqual(event.data.toString(), req_text); - server_call.startWrite( - new Buffer(reply_text), - function(event) { - assert.strictEqual(event.type, - grpc.completionType.WRITE_ACCEPTED); - assert.strictEqual(event.data, - grpc.opError.OK); - server_call.startWriteStatus( - grpc.status.OK, - status_text, - function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }, 0); + var server_batch = {}; + server_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + server_batch[grpc.opType.RECV_MESSAGE] = true; + server_call.startBatch(server_batch, function(err, response) { + assert.ifError(err); + assert(response['send metadata']); + assert.strictEqual(response.read.toString(), req_text); + var response_batch = {}; + response_batch[grpc.opType.SEND_MESSAGE] = new Buffer(reply_text); + response_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + 'metadata': {}, + 'code': grpc.status.OK, + 'details': status_text + }; + response_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + server_call.startBatch(response_batch, function(err, response) { + assert(response['send status']); + assert(!response['cancelled']); + done(); + }); }); }); }); diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js index 7ecaad833d..81cd9fa5b9 100644 --- a/src/node/test/interop_sanity_test.js +++ b/src/node/test/interop_sanity_test.js @@ -56,7 +56,7 @@ describe('Interop tests', function() { interop_client.runTest(port, name_override, 'empty_unary', true, done); }); // This fails due to an unknown bug - it.skip('should pass large_unary', function(done) { + it('should pass large_unary', function(done) { interop_client.runTest(port, name_override, 'large_unary', true, done); }); it('should pass client_streaming', function(done) { diff --git a/src/node/test/math_client_test.js b/src/node/test/math_client_test.js index 0e365bf870..61b4a2fa2f 100644 --- a/src/node/test/math_client_test.js +++ b/src/node/test/math_client_test.js @@ -63,9 +63,6 @@ describe('Math client', function() { assert.ifError(err); assert.equal(value.quotient, 1); assert.equal(value.remainder, 3); - }); - call.on('status', function checkStatus(status) { - assert.strictEqual(status.code, grpc.status.OK); done(); }); }); diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js deleted file mode 100644 index a3e1edf50f..0000000000 --- a/src/node/test/server_test.js +++ /dev/null @@ -1,122 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -var assert = require('assert'); -var grpc = require('bindings')('grpc.node'); -var Server = require('../src/server'); - -/** - * This is used for testing functions with multiple asynchronous calls that - * can happen in different orders. This should be passed the number of async - * function invocations that can occur last, and each of those should call this - * function's return value - * @param {function()} done The function that should be called when a test is - * complete. - * @param {number} count The number of calls to the resulting function if the - * test passes. - * @return {function()} The function that should be called at the end of each - * sequence of asynchronous functions. - */ -function multiDone(done, count) { - return function() { - count -= 1; - if (count <= 0) { - done(); - } - }; -} - -/** - * Responds to every request with the same data as a response - * @param {Stream} stream - */ -function echoHandler(stream) { - stream.pipe(stream); -} - -describe('echo server', function() { - var server; - var channel; - before(function() { - server = new Server(); - var port_num = server.bind('[::]:0'); - server.register('echo', echoHandler); - server.start(); - - channel = new grpc.Channel('localhost:' + port_num); - }); - after(function() { - server.shutdown(); - }); - it('should echo inputs as responses', function(done) { - done = multiDone(done, 4); - - var req_text = 'echo test string'; - var status_text = 'OK'; - - var deadline = new Date(); - deadline.setSeconds(deadline.getSeconds() + 3); - var call = new grpc.Call(channel, - 'echo', - deadline); - call.invoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.CLIENT_METADATA_READ); - done(); - },function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - var status = event.data; - assert.strictEqual(status.code, grpc.status.OK); - assert.strictEqual(status.details, status_text); - done(); - }, 0); - call.startWrite( - new Buffer(req_text), - function(event) { - assert.strictEqual(event.type, - grpc.completionType.WRITE_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }, 0); - call.startRead(function(event) { - assert.strictEqual(event.type, grpc.completionType.READ); - assert.strictEqual(event.data.toString(), req_text); - done(); - }); - }); -}); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 1038f9ab33..34e4ab4013 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -33,9 +33,9 @@ var assert = require('assert'); -var surface_server = require('../src/surface_server.js'); +var surface_server = require('../src/server.js'); -var surface_client = require('../src/surface_client.js'); +var surface_client = require('../src/client.js'); var ProtoBuf = require('protobufjs'); |