From a9b99c93e3ac7557eb0ad52c83fde013518eee92 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 5 Feb 2015 17:42:01 -0800 Subject: Part of the update to the new API --- src/node/ext/call.cc | 155 ++++++++++++++++++++++++++- src/node/ext/node_grpc.cc | 31 ++++++ src/node/ext/tag.cc | 263 ++++++++++++++++++++++++++++++++++++++++------ src/node/ext/tag.h | 28 +++-- 4 files changed, 434 insertions(+), 43 deletions(-) (limited to 'src/node') diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 23aead07b2..d2e930bc0f 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -31,6 +31,8 @@ * */ +#include + #include #include "grpc/support/log.h" @@ -68,6 +70,50 @@ using v8::Value; Persistent Call::constructor; Persistent Call::fun_tpl; +bool CreateMetadataArray(Handle metadata, grpc_metadata_array *array + vector *string_handles, + vector*> *handles) { + NanScope(); + Handle keys(metadata->GetOwnPropertyNames()); + for (unsigned int i = 0; i < keys->Length(); i++) { + Handle current_key(keys->Get(i)->ToString()); + if (!metadata->Get(current_key)->IsArray()) { + return false; + } + array->capacity += Local::Cast(metadata->Get(current_key))->Length(); + } + array->metadata = calloc(array->capacity, sizeof(grpc_metadata)); + for (unsigned int i = 0; i < keys->Length(); i++) { + Handle current_key(keys->Get(i)->ToString()); + NanUtf8String *utf8_key = new NanUtf8String(current_key); + string_handles->push_back(utf8_key); + Handle values = Local::Cast(metadata->Get(current_key)); + for (unsigned int j = 0; j < values->Length(); j++) { + Handle value = values->Get(j); + grpc_metadata *current = &array[array->count]; + grpc_call_error error; + current->key = **utf8_key; + if (Buffer::HasInstance(value)) { + current->value = Buffer::Data(value); + current->value_length = Buffer::Length(value); + Persistent *handle = new Persistent(); + NanAssignPersistent(handle, object); + handles->push_back(handle); + } else if (value->IsString()) { + Handle string_value = value->ToString(); + NanUtf8String *utf8_value = new NanUtf8String(string_value); + string_handles->push_back(utf8_value); + current->value = **utf8_value; + current->value_length = string_value->Length(); + } else { + return false; + } + array->count += 1; + } + } + return true; +} + Call::Call(grpc_call *call) : wrapped_call(call) {} Call::~Call() { grpc_call_destroy(wrapped_call); } @@ -152,9 +198,9 @@ NAN_METHOD(Call::New) { NanUtf8String method(args[1]); double deadline = args[2]->NumberValue(); grpc_channel *wrapped_channel = channel->GetWrappedChannel(); - grpc_call *wrapped_call = grpc_channel_create_call_old( - wrapped_channel, *method, channel->GetHost(), - MillisecondsToTimespec(deadline)); + grpc_call *wrapped_call = grpc_channel_create_call( + wrapped_channel, CompletionQueueAsyncWorker::GetQueue(), *method, + channel->GetHost(), MillisecondsToTimespec(deadline)); call = new Call(wrapped_call); args.This()->SetHiddenValue(String::NewSymbol("channel_"), channel_object); @@ -168,6 +214,109 @@ NAN_METHOD(Call::New) { } } +NAN_METHOD(Call::StartBatch) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError("startBatch can only be called on Call objects"); + } + if (!args[0]->IsObject()) { + return NanThrowError("startBatch's first argument must be an object"); + } + if (!args[1]->IsFunction()) { + return NanThrowError("startBatch's second argument must be a callback"); + } + vector *> *handles = new vector>(); + vector *strings = new vector(); + Persistent *handle; + Handle keys = args[0]->GetOwnPropertyNames(); + size_t nops = keys->Length(); + grpc_op *ops = calloc(nops, sizeof(grpc_op)); + grpc_metadata_array array; + Handle server_status; + NanUtf8String *str; + for (unsigned int i = 0; i < nops; i++) { + if (!keys->Get(i)->IsUInt32()) { + return NanThrowError( + "startBatch's first argument's keys must be integers"); + } + uint32_t type = keys->Get(i)->UInt32Value(); + ops[i].op = type; + switch (type) { + case GRPC_OP_SEND_INITIAL_METADATA: + if (!args[0]->Get(type)->IsObject()) { + return NanThrowError("metadata must be an object"); + } + if (!CreateMetadataArray(args[0]->Get(type)->ToObject(), &array, + strings, handles)) { + return NanThrowError("failed to parse metadata"); + } + ops[i].data.send_initial_metadata.count = array.count; + ops[i].data.send_initial_metadata.metadata = array.metadata; + break + case GRPC_OP_SEND_MESSAGE: + if (!Buffer::HasInstance(args[0]->Get(type))) { + return NanThrowError("message must be a Buffer"); + } + ops[i].data.send_message = BufferToByteBuffer(args[0]->Get(type)); + handle = new Persistent(); + NanAssignPersistent(*handle, args[0]->Get(type)); + handles->push_back(handle); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + if (!args[0]->Get(type)->IsObject()) { + return NanThrowError("server status must be an object"); + } + server_status = args[0]->Get(type)->ToObject(); + if (!server_status->Get("metadata")->IsObject()) { + return NanThrowError("status metadata must be an object"); + } + if (!server_status->Get("code")->IsUInt32()) { + return NanThrowError("status code must be a positive integer"); + } + if (!server_status->Get("details")->IsString()) { + return NanThrowError("status details must be a string"); + } + if (!CreateMetadataArray(server_status->Get("metadata")->ToObject(), + &array, strings, handles)) { + return NanThrowError("Failed to parse status metadata"); + } + ops[i].data.send_status_from_server.trailing_metadata_count = + array.count; + ops[i].data.send_status_from_server.trailing_metadata = array.metadata; + ops[i].data.send_status_from_server.status = + server_status->Get("code")->UInt32Value(); + str = new NanUtf8String(server_status->Get("details")); + strings->push_back(str); + ops[i].data.send_status_from_server.status_details = **str; + break; + case GRPC_OP_RECV_INITIAL_METADATA: + ops[i].data.recv_initial_metadata = malloc(sizeof(grpc_metadata_array)); + grpc_metadata_array_init(ops[i].data.recv_initial_metadata); + break; + case GRPC_OP_RECV_MESSAGE: + ops[i].data.recv_message = malloc(sizeof(grpc_byte_buffer*)); + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + ops[i].data.recv_status_on_client.trailing_metadata = + malloc(sizeof(grpc_metadata_array)); + grpc_metadata_array_init(ops[i].data.recv_status_on_client); + ops[i].data.recv_status_on_client.status = + malloc(sizeof(grpc_status_code)); + ops[i].data.recv_status_on_client.status_details = + malloc(sizeof(char *)); + ops[i].data.recv_status_on_client.status_details_capacity = + malloc(sizeof(size_t)); + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + ops[i].data.recv_close_on_server = malloc(sizeof(int)); + break; + + } + } +} + NAN_METHOD(Call::AddMetadata) { NanScope(); if (!HasInstance(args.This())) { diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc index bc1dfaf899..c9388940ad 100644 --- a/src/node/ext/node_grpc.cc +++ b/src/node/ext/node_grpc.cc @@ -161,6 +161,36 @@ void InitCompletionTypeConstants(Handle exports) { completion_type->Set(NanNew("SERVER_RPC_NEW"), SERVER_RPC_NEW); } +void InitOpTypeConstants(Handle exports) { + NanScope(); + Handle op_type = Object::New(); + exports->Set(NanNew("opType"), op_type); + Handle SEND_INITIAL_METADATA( + NanNew(GRPC_OP_SEND_INITIAL_METADATA)); + op_type->Set(NanNew("SEND_INITIAL_METADATA"), SEND_INITIAL_METADATA); + Handle SEND_MESSAGE( + NanNew(GRPC_OP_SEND_MESSAGE)); + op_type->Set(NanNew("SEND_MESSAGE"), SEND_MESSAGE); + Handle SEND_CLOSE_FROM_CLIENT( + NanNew(GRPC_OP_SEND_CLOSE_FROM_CLIENT)); + op_type->Set(NanNew("SEND_CLOSE_FROM_CLIENT"), SEND_CLOSE_FROM_CLIENT); + Handle SEND_STATUS_FROM_SERVER( + NanNew(GRPC_OP_SEND_STATUS_FROM_SERVER)); + op_type->Set(NanNew("SEND_STATUS_FROM_SERVER"), SEND_STATUS_FROM_SERVER); + Handle RECV_INITIAL_METADATA( + NanNew(GRPC_OP_RECV_INITIAL_METADATA)); + op_type->Set(NanNew("RECV_INITIAL_METADATA"), RECV_INITIAL_METADATA); + Handle RECV_MESSAGE( + NanNew(GRPC_OP_RECV_MESSAGE)); + op_type->Set(NanNew("RECV_MESSAGE"), RECV_MESSAGE); + Handle RECV_STATUS_ON_CLIENT( + NanNew(GRPC_OP_RECV_STATUS_ON_CLIENT)); + op_type->Set(NanNew("RECV_STATUS_ON_CLIENT"), RECV_STATUS_ON_CLIENT); + Handle RECV_CLOSE_ON_SERVER( + NanNew(GRPC_OP_RECV_CLOSE_ON_SERVER)); + op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER); +} + void init(Handle exports) { NanScope(); grpc_init(); @@ -168,6 +198,7 @@ void init(Handle exports) { InitCallErrorConstants(exports); InitOpErrorConstants(exports); InitCompletionTypeConstants(exports); + InitOpTypeConstants(exports); grpc::node::Call::Init(exports); grpc::node::Channel::Init(exports); diff --git a/src/node/ext/tag.cc b/src/node/ext/tag.cc index dc8e523e12..4c41c3d20c 100644 --- a/src/node/ext/tag.cc +++ b/src/node/ext/tag.cc @@ -31,68 +31,271 @@ * */ +#include +#include + +#include #include #include #include #include "tag.h" +#include "call.h" namespace grpc { namespace node { +using v8::Boolean; +using v8::Function; using v8::Handle; using v8::HandleScope; using v8::Persistent; using v8::Value; -struct tag { - tag(Persistent *tag, Persistent *call) - : persist_tag(tag), persist_call(call) {} +Handle ParseMetadata(grpc_metadata_array *metadata_array) { + NanEscapableScope(); + grpc_metadata *metadata_elements = metadata_array->metadata; + size_t length = metadata_array->count; + std::map size_map; + std::map index_map; + + for (unsigned int i = 0; i < length; i++) { + char *key = metadata_elements[i].key; + if (size_map.count(key)) { + size_map[key] += 1; + } + index_map[key] = 0; + } + Handle metadata_object = NanNew(); + for (unsigned int i = 0; i < length; i++) { + grpc_metadata* elem = &metadata_elements[i]; + Handle key_string = String::New(elem->key); + Handle array; + if (metadata_object->Has(key_string)) { + array = Handle::Cast(metadata_object->Get(key_string)); + } else { + array = NanNew(size_map[elem->key]); + metadata_object->Set(key_string, array); + } + array->Set(index_map[elem->key], + MakeFastBuffer( + NanNewBufferHandle(elem->value, elem->value_length))); + index_map[elem->key] += 1; + } + return NanEscapeScope(metadata_object); +} + +class OpResponse { + public: + explicit OpResponse(char *name): name(name) { + } + virtual Handle GetNodeValue() const = 0; + Handle GetOpType() const { + NanEscapableScope(); + return NanEscapeScope(NanNew(name)); + } + + private: + char *name; +}; + +class SendResponse : public OpResponse { + public: + explicit SendResponse(char *name): OpResponse(name) { + } + + Handle GetNodeValue() { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } +} + +class MetadataResponse : public OpResponse { + public: + explicit MetadataResponse(grpc_metadata_array *recv_metadata): + recv_metadata(recv_metadata), OpResponse("metadata") { + } + + Handle GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ParseMetadata(recv_metadata)); + } + + private: + grpc_metadata_array *recv_metadata; +}; + +class MessageResponse : public OpResponse { + public: + explicit MessageResponse(grpc_byte_buffer **recv_message): + recv_message(recv_message), OpResponse("read") { + } + + Handle GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ByteBufferToBuffer(*recv_message)); + } + + private: + grpc_byte_buffer **recv_message +}; + +class ClientStatusResponse : public OpResponse { + public: + explicit ClientStatusResponse(grpc_metadata_array *metadata_array, + grpc_status_code *status, + char **status_details): + metadata_array(metadata_array), status(status), + status_details(status_details), OpResponse("status") { + } + + Handle GetNodeValue() const { + NanEscapableScope(); + Handle status_obj = NanNew(); + status_obj->Set(NanNew("code"), NanNew(*status)); + if (event->data.finished.details != NULL) { + status_obj->Set(NanNew("details"), String::New(*status_details)); + } + status_obj->Set(NanNew("metadata"), ParseMetadata(metadata_array)); + return NanEscapeScope(status_obj); + } + private: + grpc_metadata_array *metadata_array; + grpc_status_code *status; + char **status_details; +}; + +class ServerCloseResponse : public OpResponse { + public: + explicit ServerCloseResponse(int *cancelled): cancelled(cancelled), + OpResponse("cancelled") { + } + + Handle GetNodeValue() const { + NanEscapableScope(); + NanEscapeScope(NanNew(*cancelled)); + } + + private: + int *cancelled; +}; + +class NewCallResponse : public OpResponse { + public: + explicit NewCallResponse(grpc_call **call, grpc_call_details *details, + grpc_metadata_array *request_metadata) : + call(call), details(details), request_metadata(request_metadata), + OpResponse("call"){ + } + + Handle GetNodeValue() const { + NanEscapableScope(); + if (*call == NULL) { + return NanEscapeScope(NanNull()); + } + Handle obj = NanNew(); + obj->Set(NanNew("call"), Call::WrapStruct(*call)); + obj->Set(NanNew("method"), NanNew(details->method)); + obj->Set(NanNew("host"), NanNew(details->host)); + obj->Set(NanNew("deadline"), + NanNew(TimespecToMilliseconds(details->deadline))); + obj->Set(NanNew("metadata"), ParseMetadata(request_metadata)); + return NanEscapeScope(obj); + } + private: + grpc_call **call; + grpc_call_details *details; + grpc_metadata_array *request_metadata; +} +struct tag { + tag(NanCallback *callback, std::vector *responses) : + callback(callback), repsonses(responses) { + } ~tag() { - persist_tag->Dispose(); - if (persist_call != NULL) { - persist_call->Dispose(); + for (std::vector::iterator it = responses->begin(); + it != responses->end(); ++it) { + delete *it; } + delete callback; + delete responses; } - Persistent *persist_tag; - Persistent *persist_call; + NanCallback *callback; + std::vector *responses; }; -void *CreateTag(Handle tag, Handle call) { +void *CreateTag(Handle callback, grpc_op *ops, size_t nops) { NanScope(); - Persistent *persist_tag = new Persistent(); - NanAssignPersistent(*persist_tag, tag); - Persistent *persist_call; - if (call->IsNull() || call->IsUndefined()) { - persist_call = NULL; - } else { - persist_call = new Persistent(); - NanAssignPersistent(*persist_call, call); - } - struct tag *tag_struct = new struct tag(persist_tag, persist_call); + NanCallback *cb = new NanCallback(callback); + vector *responses = new vector(); + for (size_t i = 0; i < nops; i++) { + grpc_op *op = &ops[i]; + OpResponse *resp; + // Switching on the TYPE of the op + switch (op->op) { + case GRPC_OP_SEND_INITIAL_METADATA: + resp = new SendResponse("send metadata"); + break; + case GRPC_OP_SEND_MESSAGE: + resp = new SendResponse("write"); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + resp = new SendResponse("client close"); + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + resp = new SendResponse("server close"); + break; + case GRPC_OP_RECV_INITIAL_METADATA: + resp = new MetadataResponse(op->data.recv_initial_metadata); + break; + case GRPC_OP_RECV_MESSAGE: + resp = new MessageResponse(op->data.recv_message); + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + resp = new ClientStatusResponse( + op->data.recv_status_on_client.trailing_metadata, + op->data.recv_status_on_client.status, + op->data.recv_status_on_client.status_details); + break; + case GRPC_RECV_CLOSE_ON_SERVER: + resp = new ServerCloseResponse(op->data.recv_close_on_server.cancelled); + break; + default: + continue; + } + responses->push_back(resp); + } + struct tag *tag_struct = new struct tag(cb, responses); return reinterpret_cast(tag_struct); } -Handle GetTagHandle(void *tag) { +void *CreateTag(Handle callback, grpc_call **call, + grpc_call_details *details, + grpc_metadata_array *request_metadata) { NanEscapableScope(); - struct tag *tag_struct = reinterpret_cast(tag); - Handle tag_value = NanNew(*tag_struct->persist_tag); - return NanEscapeScope(tag_value); + NanCallback *cb = new NanCallback(callback); + vector *responses = new vector(); + OpResponse *resp = new NewCallResponse(call, details, request_metadata); + responses->push_back(resp); + struct tag *tag_struct = new struct tag(cb, responses); + return reinterpret_cast(tag_struct); } -bool TagHasCall(void *tag) { +NanCallback GetCallback(void *tag) { + NanEscapableScope(); struct tag *tag_struct = reinterpret_cast(tag); - return tag_struct->persist_call != NULL; + return NanEscapeScope(*tag_struct->callback); } -Handle TagGetCall(void *tag) { +Handle GetNodeValue(void *tag) { NanEscapableScope(); struct tag *tag_struct = reinterpret_cast(tag); - if (tag_struct->persist_call == NULL) { - return NanEscapeScope(NanNull()); + Handle obj = NanNew(); + for (std::vector::iterator it = tag_struct->responses->begin(); + it != tag_struct->responses->end(); ++it) { + OpResponse *resp = *it; + obj->Set(resp->GetOpType(), resp->GetNodeValue()); } - Handle call_value = NanNew(*tag_struct->persist_call); - return NanEscapeScope(call_value); + return NanEscapeScope(obj); } void DestroyTag(void *tag) { delete reinterpret_cast(tag); } diff --git a/src/node/ext/tag.h b/src/node/ext/tag.h index bdb09252d9..5c70974323 100644 --- a/src/node/ext/tag.h +++ b/src/node/ext/tag.h @@ -34,21 +34,29 @@ #ifndef NET_GRPC_NODE_TAG_H_ #define NET_GRPC_NODE_TAG_H_ +#include #include +#include namespace grpc { namespace node { -/* Create a void* tag that can be passed to various grpc_call functions from - a javascript value and the javascript wrapper for the call. The call can be - null. */ -void *CreateTag(v8::Handle tag, v8::Handle call); -/* Return the javascript value stored in the tag */ -v8::Handle GetTagHandle(void *tag); -/* Returns true if the call was set (non-null) when the tag was created */ -bool TagHasCall(void *tag); -/* Returns the javascript wrapper for the call associated with this tag */ -v8::Handle TagGetCall(void *call); +/* Create a void* tag that can be passed to grpc_call_start_batch from a callback + function and an ops array */ +void *CreateTag(v8::Handle callback, grpc_op *ops, size_t nops); + +/* Create a void* tag that can be passed to grpc_server_request_call from a + callback and the various out parameters to that function */ +void *CreateTag(v8::Handle callback, grpc_call **call, + grpc_call_details *details, + grpc_metadata_array *request_metadata); + +/* Get the callback from the tag */ +NanCallback GetCallback(void *tag); + +/* Get the combined output value from the tag */ +v8::Handle GetNodevalue(void *tag); + /* Destroy the tag and all resources it is holding. It is illegal to call any of these other functions on a tag after it has been destroyed. */ void DestroyTag(void *tag); -- cgit v1.2.3 From d4d67ade9b2183cfec08ecc241e2540f3494fe48 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 9 Feb 2015 10:43:21 -0800 Subject: More progress towards new API compatibility --- src/node/ext/call.cc | 648 ++++++++++++++------------ src/node/ext/call.h | 49 +- src/node/ext/completion_queue_async_worker.cc | 22 +- src/node/ext/completion_queue_async_worker.h | 2 + src/node/ext/server.cc | 55 ++- src/node/ext/tag.cc | 49 +- src/node/ext/tag.h | 13 +- 7 files changed, 494 insertions(+), 344 deletions(-) (limited to 'src/node') diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index d2e930bc0f..85dcb3cd07 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -31,24 +31,26 @@ * */ +#include #include #include #include "grpc/support/log.h" #include "grpc/grpc.h" +#include "grpc/support/alloc.h" #include "grpc/support/time.h" #include "byte_buffer.h" #include "call.h" #include "channel.h" #include "completion_queue_async_worker.h" #include "timeval.h" -#include "tag.h" namespace grpc { namespace node { using ::node::Buffer; +using std::unique_ptr; using v8::Arguments; using v8::Array; using v8::Exception; @@ -70,9 +72,11 @@ using v8::Value; Persistent Call::constructor; Persistent Call::fun_tpl; -bool CreateMetadataArray(Handle metadata, grpc_metadata_array *array - vector *string_handles, - vector*> *handles) { + +bool CreateMetadataArray( + Handle metadata, grpc_metadata_array *array, + std::vector > *string_handles, + std::vector > *handles) { NanScope(); Handle keys(metadata->GetOwnPropertyNames()); for (unsigned int i = 0; i < keys->Length(); i++) { @@ -82,27 +86,27 @@ bool CreateMetadataArray(Handle metadata, grpc_metadata_array *array } array->capacity += Local::Cast(metadata->Get(current_key))->Length(); } - array->metadata = calloc(array->capacity, sizeof(grpc_metadata)); + array->metadata = reinterpret_cast( + gpr_malloc(array->capacity * sizeof(grpc_metadata))); for (unsigned int i = 0; i < keys->Length(); i++) { Handle current_key(keys->Get(i)->ToString()); NanUtf8String *utf8_key = new NanUtf8String(current_key); - string_handles->push_back(utf8_key); + string_handles->push_back(unique_ptr values = Local::Cast(metadata->Get(current_key)); for (unsigned int j = 0; j < values->Length(); j++) { Handle value = values->Get(j); - grpc_metadata *current = &array[array->count]; - grpc_call_error error; + grpc_metadata *current = &array->metadata[array->count]; current->key = **utf8_key; if (Buffer::HasInstance(value)) { current->value = Buffer::Data(value); current->value_length = Buffer::Length(value); - Persistent *handle = new Persistent(); - NanAssignPersistent(handle, object); - handles->push_back(handle); + Persistent handle; + NanAssignPersistent(handle, value); + handles->push_back(PersistentHolder(handle)); } else if (value->IsString()) { Handle string_value = value->ToString(); NanUtf8String *utf8_value = new NanUtf8String(string_value); - string_handles->push_back(utf8_value); + string_handles->push_back(unique_ptr(utf8_value)); current->value = **utf8_value; current->value_length = string_value->Length(); } else { @@ -114,6 +118,294 @@ bool CreateMetadataArray(Handle metadata, grpc_metadata_array *array return true; } +Handle ParseMetadata(grpc_metadata_array *metadata_array) { + NanEscapableScope(); + grpc_metadata *metadata_elements = metadata_array->metadata; + size_t length = metadata_array->count; + std::map size_map; + std::map index_map; + + for (unsigned int i = 0; i < length; i++) { + char *key = metadata_elements[i].key; + if (size_map.count(key)) { + size_map[key] += 1; + } + index_map[key] = 0; + } + Handle metadata_object = NanNew(); + for (unsigned int i = 0; i < length; i++) { + grpc_metadata* elem = &metadata_elements[i]; + Handle key_string = String::New(elem->key); + Handle array; + if (metadata_object->Has(key_string)) { + array = Handle::Cast(metadata_object->Get(key_string)); + } else { + array = NanNew(size_map[elem->key]); + metadata_object->Set(key_string, array); + } + array->Set(index_map[elem->key], + MakeFastBuffer( + NanNewBufferHandle(elem->value, elem->value_length))); + index_map[elem->key] += 1; + } + return NanEscapeScope(metadata_object); +} + +class Op { + public: + Handle GetOpType() const { + NanEscapableScope(); + return NanEscapeScope(NanNew(GetTypeString())); + } +}; + +class SendMetadataOp : public Op { + public: + Handle GetNodeValue() { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + if (!value->IsObject()) { + return false; + } + grpc_metadata_array array; + if (!CreateMetadataArray(value->ToObject(), &array, strings, handles)) { + return false; + } + out->data.send_initial_metadata.count = array.count; + out->data.send_initial_metadata.metadata = array.metadata; + return true; + } + protected: + char *GetTypeString() { + return "send metadata"; + } +}; + +class SendMessageOp : public Op { + public: + Handle GetNodeValue() { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + if (!Buffer::HasInstance(value)) { + return false; + } + out->.data.send_message = BufferToByteBuffer(obj->Get(type)); + NanAssignPersistent(handle, value); + handles->push_back(PersistentHolder(handle)); + } + protected: + char *GetTypeString() { + return "send message"; + } +}; + +class SendClientCloseOp : public Op { + public: + Handle GetNodeValue() { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + return true; + } + protected: + char *GetTypeString() { + return "client close"; + } +}; + +class SendServerStatusOp : public Op { + public: + Handle GetNodeValue() { + NanEscapableScope(); + return NanEscapeScope(NanTrue()); + } + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + if (value->IsObject()) { + return false; + } + Handle server_status = value->ToObject(); + if (!server_status->Get(NanNew("metadata"))->IsObject()) { + return false; + } + if (!server_status->Get(NanNew("code"))->IsUint32()) { + return false; + } + if (!server_status->Get(NanNew("details"))->IsString()) { + return false; + } + grpc_metadata_array array; + if (!CreateMetadataArray(server_status->Get(NanNew("metadata"))-> + ToObject(), + &array, strings, handles)) { + return false; + } + out->data.send_status_from_server.trailing_metadata_count = array.count; + out->data.send_status_from_server.trailing_metadata = array.metadata; + out->data.send_status_from_server.status = + static_cast( + server_status->Get(NanNew("code"))->Uint32Value()); + NanUtf8String *str = new NanUtf8String( + server_status->Get(NanNew("details"))); + strings->push_back(unique_ptr(str)); + out->data.send_status_from_server.status_details = **str; + return true; + } + protected: + char *GetTypeString() { + return "send status"; + } +} + +class GetMetadataOp : public Op { + public: + GetMetadataOp() { + grpc_metadata_array_init(&recv_metadata); + } + + ~GetMetadataOp() { + grpc_metadata_array_destroy(&recv_metadata); + } + + Handle GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ParseMetadata(&recv_metadata)); + } + + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + out->data.recv_initial_metadata = &recv_metadata; + } + + protected: + char *GetTypeString() { + return "metadata"; + } + + private: + grpc_metadata_array recv_metadata; +}; + +class ReadMessageOp : public Op { + public: + ReadMessageOp() { + recv_message = NULL; + } + ~ReadMessageOp() { + if (recv_message != NULL) { + gpr_free(recv_message); + } + } + Handle GetNodeValue() const { + NanEscapableScope(); + return NanEscapeScope(ByteBufferToBuffer(*recv_message)); + } + + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + out->data.recv_message = &recv_message; + } + + protected: + char *GetTypeString() { + return "read"; + } + + private: + grpc_byte_buffer *recv_message; +}; + +class ClientStatusOp : public Op { + public: + ClientStatusOp() { + grpc_metadata_array_init(&metadata); + status_details = NULL; + } + + ~ClientStatusOp() { + gprc_metadata_array_destroy(&metadata_array); + gpr_free(status_details); + } + + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + out->data.recv_status_on_client.trailing_metadata = &metadata_array; + out->data.recv_status_on_client.status = &status; + out->data.recv_status_on_client.status_details = &status_details; + out->data.recv_status_on_client.status_details_capacity = &details_capacity; + } + + Handle GetNodeValue() const { + NanEscapableScope(); + Handle status_obj = NanNew(); + status_obj->Set(NanNew("code"), NanNew(status)); + if (event->data.finished.details != NULL) { + status_obj->Set(NanNew("details"), String::New(status_details)); + } + status_obj->Set(NanNew("metadata"), ParseMetadata(&metadata_array)); + return NanEscapeScope(status_obj); + } + private: + grpc_metadata_array metadata_array; + grpc_status_code status; + char *status_details; + size_t details_capacity; +}; + +class ServerCloseResponseOp : public Op { + public: + Handle GetNodeValue() const { + NanEscapableScope(); + NanEscapeScope(NanNew(cancelled)); + } + + bool ParseOp(Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) { + out->data.recv_close_on_server.cancelled = &cancelled; + } + + private: + int cancelled; +}; + +struct tag { + tag(NanCallback *callback, std::vector > *ops, + std::vector > *handles, + std::vector > *strings) : + callback(callback), ops(ops), handles(handles), strings(strings){ + } + ~tag() { + if (strings != null) { + for (std::vector::iterator it = strings.begin(); + it != strings.end(); ++it) { + delete *it; + } + delete strings; + } + delete callback; + delete ops; + if (handles != null) { + delete handles; + } + } +}; + Call::Call(grpc_call *call) : wrapped_call(call) {} Call::~Call() { grpc_call_destroy(wrapped_call); } @@ -123,28 +415,10 @@ void Call::Init(Handle exports) { Local tpl = FunctionTemplate::New(New); tpl->SetClassName(NanNew("Call")); tpl->InstanceTemplate()->SetInternalFieldCount(1); - NanSetPrototypeTemplate(tpl, "addMetadata", - FunctionTemplate::New(AddMetadata)->GetFunction()); - NanSetPrototypeTemplate(tpl, "invoke", - FunctionTemplate::New(Invoke)->GetFunction()); - NanSetPrototypeTemplate(tpl, "serverAccept", - FunctionTemplate::New(ServerAccept)->GetFunction()); - NanSetPrototypeTemplate( - tpl, "serverEndInitialMetadata", - FunctionTemplate::New(ServerEndInitialMetadata)->GetFunction()); + NanSetPrototypeTemplate(tpl, "startBatch", + FunctionTemplate::New(StartBatch)->GetFunction()); NanSetPrototypeTemplate(tpl, "cancel", FunctionTemplate::New(Cancel)->GetFunction()); - NanSetPrototypeTemplate(tpl, "startWrite", - FunctionTemplate::New(StartWrite)->GetFunction()); - NanSetPrototypeTemplate( - tpl, "startWriteStatus", - FunctionTemplate::New(StartWriteStatus)->GetFunction()); - NanSetPrototypeTemplate(tpl, "writesDone", - FunctionTemplate::New(WritesDone)->GetFunction()); - NanSetPrototypeTemplate(tpl, "startReadMetadata", - FunctionTemplate::New(WritesDone)->GetFunction()); - NanSetPrototypeTemplate(tpl, "startRead", - FunctionTemplate::New(StartRead)->GetFunction()); NanAssignPersistent(fun_tpl, tpl); NanAssignPersistent(constructor, tpl->GetFunction()); constructor->Set(NanNew("WRITE_BUFFER_HINT"), @@ -225,211 +499,64 @@ NAN_METHOD(Call::StartBatch) { if (!args[1]->IsFunction()) { return NanThrowError("startBatch's second argument must be a callback"); } - vector *> *handles = new vector>(); - vector *strings = new vector(); - Persistent *handle; - Handle keys = args[0]->GetOwnPropertyNames(); + Call *call = ObjectWrap::Unwrap(args.This()); + std::vector > *handles = + new std::vector >(); + std::vector > *strings = + new std::vector >(); + Persistent handle; + Handle obj = args[0]->ToObject(); + Handle keys = obj->GetOwnPropertyNames(); size_t nops = keys->Length(); - grpc_op *ops = calloc(nops, sizeof(grpc_op)); - grpc_metadata_array array; - Handle server_status; - NanUtf8String *str; + grpc_op *ops = new grpc_op[nops]; + std::vector > *op_vector = new std::vector >(); for (unsigned int i = 0; i < nops; i++) { - if (!keys->Get(i)->IsUInt32()) { + Op *op; + if (!keys->Get(i)->IsUint32()) { return NanThrowError( "startBatch's first argument's keys must be integers"); } - uint32_t type = keys->Get(i)->UInt32Value(); - ops[i].op = type; + uint32_t type = keys->Get(i)->Uint32Value(); + ops[i].op = static_cast(type); switch (type) { case GRPC_OP_SEND_INITIAL_METADATA: - if (!args[0]->Get(type)->IsObject()) { - return NanThrowError("metadata must be an object"); - } - if (!CreateMetadataArray(args[0]->Get(type)->ToObject(), &array, - strings, handles)) { - return NanThrowError("failed to parse metadata"); - } - ops[i].data.send_initial_metadata.count = array.count; - ops[i].data.send_initial_metadata.metadata = array.metadata; - break + op = new SendMetadataOp(); + break; case GRPC_OP_SEND_MESSAGE: - if (!Buffer::HasInstance(args[0]->Get(type))) { - return NanThrowError("message must be a Buffer"); - } - ops[i].data.send_message = BufferToByteBuffer(args[0]->Get(type)); - handle = new Persistent(); - NanAssignPersistent(*handle, args[0]->Get(type)); - handles->push_back(handle); + op = new SendMessageOp(); break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + op = new SendClientCloseOp(); break; case GRPC_OP_SEND_STATUS_FROM_SERVER: - if (!args[0]->Get(type)->IsObject()) { - return NanThrowError("server status must be an object"); - } - server_status = args[0]->Get(type)->ToObject(); - if (!server_status->Get("metadata")->IsObject()) { - return NanThrowError("status metadata must be an object"); - } - if (!server_status->Get("code")->IsUInt32()) { - return NanThrowError("status code must be a positive integer"); - } - if (!server_status->Get("details")->IsString()) { - return NanThrowError("status details must be a string"); - } - if (!CreateMetadataArray(server_status->Get("metadata")->ToObject(), - &array, strings, handles)) { - return NanThrowError("Failed to parse status metadata"); - } - ops[i].data.send_status_from_server.trailing_metadata_count = - array.count; - ops[i].data.send_status_from_server.trailing_metadata = array.metadata; - ops[i].data.send_status_from_server.status = - server_status->Get("code")->UInt32Value(); - str = new NanUtf8String(server_status->Get("details")); - strings->push_back(str); - ops[i].data.send_status_from_server.status_details = **str; + op = new SendServerStatusOp(); break; case GRPC_OP_RECV_INITIAL_METADATA: - ops[i].data.recv_initial_metadata = malloc(sizeof(grpc_metadata_array)); - grpc_metadata_array_init(ops[i].data.recv_initial_metadata); + op = new GetMetadataOp(); break; case GRPC_OP_RECV_MESSAGE: - ops[i].data.recv_message = malloc(sizeof(grpc_byte_buffer*)); + op = new ReadMessageOp(); break; case GRPC_OP_RECV_STATUS_ON_CLIENT: - ops[i].data.recv_status_on_client.trailing_metadata = - malloc(sizeof(grpc_metadata_array)); - grpc_metadata_array_init(ops[i].data.recv_status_on_client); - ops[i].data.recv_status_on_client.status = - malloc(sizeof(grpc_status_code)); - ops[i].data.recv_status_on_client.status_details = - malloc(sizeof(char *)); - ops[i].data.recv_status_on_client.status_details_capacity = - malloc(sizeof(size_t)); + op = new ClientStatusOp(); break; case GRPC_OP_RECV_CLOSE_ON_SERVER: - ops[i].data.recv_close_on_server = malloc(sizeof(int)); + op = new ServerCloseResponseOp(); break; - + default: + return NanThrowError("Argument object had an unrecognized key"); } + op.ParseOp(obj.get(type), &ops[i], strings, handles); + op_vector.push_back(unique_ptr(op)); } -} - -NAN_METHOD(Call::AddMetadata) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("addMetadata can only be called on Call objects"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - if (!args[0]->IsObject()) { - return NanThrowTypeError("addMetadata's first argument must be an object"); - } - Handle metadata = args[0]->ToObject(); - Handle keys(metadata->GetOwnPropertyNames()); - for (unsigned int i = 0; i < keys->Length(); i++) { - Handle current_key(keys->Get(i)->ToString()); - if (!metadata->Get(current_key)->IsArray()) { - return NanThrowTypeError( - "addMetadata's first argument's values must be arrays"); - } - NanUtf8String utf8_key(current_key); - Handle values = Local::Cast(metadata->Get(current_key)); - for (unsigned int j = 0; j < values->Length(); j++) { - Handle value = values->Get(j); - grpc_metadata metadata; - grpc_call_error error; - metadata.key = *utf8_key; - if (Buffer::HasInstance(value)) { - metadata.value = Buffer::Data(value); - metadata.value_length = Buffer::Length(value); - error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0); - } else if (value->IsString()) { - Handle string_value = value->ToString(); - NanUtf8String utf8_value(string_value); - metadata.value = *utf8_value; - metadata.value_length = string_value->Length(); - gpr_log(GPR_DEBUG, "adding metadata: %s, %s, %d", metadata.key, - metadata.value, metadata.value_length); - error = grpc_call_add_metadata_old(call->wrapped_call, &metadata, 0); - } else { - return NanThrowTypeError( - "addMetadata values must be strings or buffers"); - } - if (error != GRPC_CALL_OK) { - return NanThrowError("addMetadata failed", error); - } - } - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::Invoke) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("invoke can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("invoke's first argument must be a function"); - } - if (!args[1]->IsFunction()) { - return NanThrowTypeError("invoke's second argument must be a function"); - } - if (!args[2]->IsUint32()) { - return NanThrowTypeError("invoke's third argument must be integer flags"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - unsigned int flags = args[3]->Uint32Value(); - grpc_call_error error = grpc_call_invoke_old( - call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(), - CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("invoke failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::ServerAccept) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("accept can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("accept's first argument must be a function"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - grpc_call_error error = grpc_call_server_accept_old( - call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(), - CreateTag(args[0], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("serverAccept failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::ServerEndInitialMetadata) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError( - "serverEndInitialMetadata can only be called on Call objects"); - } - if (!args[0]->IsUint32()) { - return NanThrowTypeError( - "serverEndInitialMetadata's second argument must be integer flags"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - unsigned int flags = args[1]->Uint32Value(); - grpc_call_error error = - grpc_call_server_end_initial_metadata_old(call->wrapped_call, flags); + grpc_call_error error = grpc_call_start_batch( + call->wrapped_call, ops, nops, new struct tag(args[1].As(), + op_vector, nops, handles, + strings)); if (error != GRPC_CALL_OK) { - return NanThrowError("serverEndInitialMetadata failed", error); + return NanThrowError("startBatch failed", error); } + CompletionQueueAsyncWorker::Next(); NanReturnUndefined(); } @@ -446,102 +573,5 @@ NAN_METHOD(Call::Cancel) { NanReturnUndefined(); } -NAN_METHOD(Call::StartWrite) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("startWrite can only be called on Call objects"); - } - if (!Buffer::HasInstance(args[0])) { - return NanThrowTypeError("startWrite's first argument must be a Buffer"); - } - if (!args[1]->IsFunction()) { - return NanThrowTypeError("startWrite's second argument must be a function"); - } - if (!args[2]->IsUint32()) { - return NanThrowTypeError( - "startWrite's third argument must be integer flags"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - grpc_byte_buffer *buffer = BufferToByteBuffer(args[0]); - unsigned int flags = args[2]->Uint32Value(); - grpc_call_error error = grpc_call_start_write_old( - call->wrapped_call, buffer, CreateTag(args[1], args.This()), flags); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("startWrite failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::StartWriteStatus) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError( - "startWriteStatus can only be called on Call objects"); - } - if (!args[0]->IsUint32()) { - return NanThrowTypeError( - "startWriteStatus's first argument must be a status code"); - } - if (!args[1]->IsString()) { - return NanThrowTypeError( - "startWriteStatus's second argument must be a string"); - } - if (!args[2]->IsFunction()) { - return NanThrowTypeError( - "startWriteStatus's third argument must be a function"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - NanUtf8String details(args[1]); - grpc_call_error error = grpc_call_start_write_status_old( - call->wrapped_call, (grpc_status_code)args[0]->Uint32Value(), *details, - CreateTag(args[2], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("startWriteStatus failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::WritesDone) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("writesDone can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("writesDone's first argument must be a function"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - grpc_call_error error = grpc_call_writes_done_old( - call->wrapped_call, CreateTag(args[0], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("writesDone failed", error); - } - NanReturnUndefined(); -} - -NAN_METHOD(Call::StartRead) { - NanScope(); - if (!HasInstance(args.This())) { - return NanThrowTypeError("startRead can only be called on Call objects"); - } - if (!args[0]->IsFunction()) { - return NanThrowTypeError("startRead's first argument must be a function"); - } - Call *call = ObjectWrap::Unwrap(args.This()); - grpc_call_error error = grpc_call_start_read_old( - call->wrapped_call, CreateTag(args[0], args.This())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { - return NanThrowError("startRead failed", error); - } - NanReturnUndefined(); -} - } // namespace node } // namespace grpc diff --git a/src/node/ext/call.h b/src/node/ext/call.h index 1924a1bf42..6ae370d02f 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -34,6 +34,8 @@ #ifndef NET_GRPC_NODE_CALL_H_ #define NET_GRPC_NODE_CALL_H_ +#include + #include #include #include "grpc/grpc.h" @@ -43,6 +45,44 @@ namespace grpc { namespace node { +using std::unique_ptr; + +class PersistentHolder { + public: + explicit PersistentHolder(v8::Persistent persist) : persist(persist) { + } + + ~PersistentHolder() { + persist.Dispose(); + } + + private: + v8::Persistent persist; +}; + +class Op { + public: + virtual Handle GetNodeValue() const = 0; + virtual bool ParseOp(v8::Handle value, grpc_op *out, + std::vector > *strings, + std::vector > *handles) = 0; + Handle GetOpType(); + + protected: + virtual char *GetTypeString(); +}; + +struct tag { + tag(NanCallback *callback, std::vector > *ops, + std::vector > *handles, + std::vector > *strings); + ~tag(); + NanCallback *callback; + std::vector > *ops; + std::vector > *handles; + std::vector > *strings; +}; + /* Wrapper class for grpc_call structs. */ class Call : public ::node::ObjectWrap { public: @@ -60,15 +100,8 @@ class Call : public ::node::ObjectWrap { Call &operator=(const Call &); static NAN_METHOD(New); - static NAN_METHOD(AddMetadata); - static NAN_METHOD(Invoke); - static NAN_METHOD(ServerAccept); - static NAN_METHOD(ServerEndInitialMetadata); + static NAN_METHOD(StartBatch); static NAN_METHOD(Cancel); - static NAN_METHOD(StartWrite); - static NAN_METHOD(StartWriteStatus); - static NAN_METHOD(WritesDone); - static NAN_METHOD(StartRead); static v8::Persistent constructor; // Used for typechecking instances of this javascript class static v8::Persistent fun_tpl; diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index 8de7db66d5..bb0e39180e 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -37,7 +37,6 @@ #include "grpc/grpc.h" #include "grpc/support/time.h" #include "completion_queue_async_worker.h" -#include "event.h" #include "tag.h" namespace grpc { @@ -58,6 +57,9 @@ CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {} void CompletionQueueAsyncWorker::Execute() { result = grpc_completion_queue_next(queue, gpr_inf_future); + if (result->data.op_complete != GRPC_OP_OK) { + SetErrorMessage("The batch encountered an error"); + } } grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } @@ -75,14 +77,26 @@ void CompletionQueueAsyncWorker::Init(Handle exports) { void CompletionQueueAsyncWorker::HandleOKCallback() { NanScope(); - NanCallback event_callback(GetTagHandle(result->tag).As()); - Handle argv[] = {CreateEventObject(result)}; + NanCallback callback = GetTagCallback(result->tag); + Handle argv[] = {NanNull(), GetNodeValue(result->tag)}; DestroyTag(result->tag); grpc_event_finish(result); result = NULL; - event_callback.Call(1, argv); + callback.Call(2, argv); +} + +void CompletionQueueAsyncWorker::HandleErrorCallback() { + NanScope(); + NanCallback callback = GetTagCallback(result->tag); + Handle argv[] = {NanError(ErrorMessage())}; + + DestroyTag(result->tag); + grpc_event_finish(result); + result = NULL; + + callback.Call(1, argv); } } // namespace node diff --git a/src/node/ext/completion_queue_async_worker.h b/src/node/ext/completion_queue_async_worker.h index 2c928b7024..c04a303283 100644 --- a/src/node/ext/completion_queue_async_worker.h +++ b/src/node/ext/completion_queue_async_worker.h @@ -67,6 +67,8 @@ class CompletionQueueAsyncWorker : public NanAsyncWorker { completion_queue_next */ void HandleOKCallback(); + void HandleErrorCallback(); + private: grpc_event *result; diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 6b8ccef9b1..c0ccf1f381 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -31,6 +31,8 @@ * */ +#include + #include "server.h" #include @@ -49,6 +51,7 @@ namespace grpc { namespace node { +using std::unique_ptr; using v8::Arguments; using v8::Array; using v8::Boolean; @@ -67,6 +70,45 @@ using v8::Value; Persistent Server::constructor; Persistent Server::fun_tpl; +class NewCallOp : public Op { + public: + NewCallOp() { + call = NULL; + grpc_call_details_init(&details); + grpc_metadata_array_init(&request_metadata); + } + + ~NewCallOp() { + grpc_call_details_destroy(&details); + grpc_metadata_array_destroy(&details); + } + + Handle GetNodeValue() const { + NanEscapableScope(); + if (*call == NULL) { + return NanEscapeScope(NanNull()); + } + Handle obj = NanNew(); + obj->Set(NanNew("call"), Call::WrapStruct(call)); + obj->Set(NanNew("method"), NanNew(details.method)); + obj->Set(NanNew("host"), NanNew(details.host)); + obj->Set(NanNew("deadline"), + NanNew(TimespecToMilliseconds(details.deadline))); + obj->Set(NanNew("metadata"), ParseMetadata(&request_metadata)); + return NanEscapeScope(obj); + } + + bool ParseOp(Handle value, grpc_op *out, + std::vector > strings, + std::vector > handles) { + return true; + } + + grpc_call *call; + grpc_call_details details; + grpc_metadata_array request_metadata; +} + Server::Server(grpc_server *server) : wrapped_server(server) {} Server::~Server() { grpc_server_destroy(wrapped_server); } @@ -175,13 +217,16 @@ NAN_METHOD(Server::RequestCall) { return NanThrowTypeError("requestCall can only be called on a Server"); } Server *server = ObjectWrap::Unwrap(args.This()); - grpc_call_error error = grpc_server_request_call_old( - server->wrapped_server, CreateTag(args[0], NanNull())); - if (error == GRPC_CALL_OK) { - CompletionQueueAsyncWorker::Next(); - } else { + Op *op = new NewCallOp(); + std::vector > *ops = { unique_ptr(op) }; + grpc_call_error error = grpc_server_request_call( + server->wrapped_server, &op->call, &op->details, &op->metadata, + CompletionQueueAsyncWorker::GetQueue(), + new struct tag(args[0].As(), ops, NULL, NULL)); + if (error != GRPC_CALL_OK) { return NanThrowError("requestCall failed", error); } + CompletionQueueAsyncWorker::Next(); NanReturnUndefined(); } diff --git a/src/node/ext/tag.cc b/src/node/ext/tag.cc index 4c41c3d20c..27baa94a8e 100644 --- a/src/node/ext/tag.cc +++ b/src/node/ext/tag.cc @@ -89,6 +89,7 @@ class OpResponse { explicit OpResponse(char *name): name(name) { } virtual Handle GetNodeValue() const = 0; + virtual bool ParseOp() = 0; Handle GetOpType() const { NanEscapableScope(); return NanEscapeScope(NanNew(name)); @@ -136,16 +137,23 @@ class MessageResponse : public OpResponse { } private: - grpc_byte_buffer **recv_message + grpc_byte_buffer **recv_message; }; +switch () { +case GRPC_RECV_CLIENT_STATUS: + op = new ClientStatusResponse; + break; +} + + class ClientStatusResponse : public OpResponse { public: - explicit ClientStatusResponse(grpc_metadata_array *metadata_array, - grpc_status_code *status, - char **status_details): - metadata_array(metadata_array), status(status), - status_details(status_details), OpResponse("status") { + explicit ClientStatusResponse(): + OpResponse("status") { + } + + bool ParseOp(Handle obj, grpc_op *out) { } Handle GetNodeValue() const { @@ -159,9 +167,9 @@ class ClientStatusResponse : public OpResponse { return NanEscapeScope(status_obj); } private: - grpc_metadata_array *metadata_array; - grpc_status_code *status; - char **status_details; + grpc_metadata_array metadata_array; + grpc_status_code status; + char *status_details; }; class ServerCloseResponse : public OpResponse { @@ -208,22 +216,35 @@ class NewCallResponse : public OpResponse { } struct tag { - tag(NanCallback *callback, std::vector *responses) : - callback(callback), repsonses(responses) { + tag(NanCallback *callback, std::vector *responses, + std::vector> *handles, + std::vector *strings) : + callback(callback), repsonses(responses), handles(handles), + strings(strings){ } ~tag() { for (std::vector::iterator it = responses->begin(); it != responses->end(); ++it) { delete *it; } + for (std::vector::iterator it = responses->begin(); + it != responses->end(); ++it) { + delete *it; + } delete callback; delete responses; + delete handles; + delete strings; } NanCallback *callback; std::vector *responses; + std::vector> *handles; + std::vector *strings; }; -void *CreateTag(Handle callback, grpc_op *ops, size_t nops) { +void *CreateTag(Handle callback, grpc_op *ops, size_t nops, + std::vector> *handles, + std::vector *strings) { NanScope(); NanCallback *cb = new NanCallback(callback); vector *responses = new vector(); @@ -264,7 +285,7 @@ void *CreateTag(Handle callback, grpc_op *ops, size_t nops) { } responses->push_back(resp); } - struct tag *tag_struct = new struct tag(cb, responses); + struct tag *tag_struct = new struct tag(cb, responses, handles, strings); return reinterpret_cast(tag_struct); } @@ -280,7 +301,7 @@ void *CreateTag(Handle callback, grpc_call **call, return reinterpret_cast(tag_struct); } -NanCallback GetCallback(void *tag) { +NanCallback GetTagCallback(void *tag) { NanEscapableScope(); struct tag *tag_struct = reinterpret_cast(tag); return NanEscapeScope(*tag_struct->callback); diff --git a/src/node/ext/tag.h b/src/node/ext/tag.h index 5c70974323..9ff8703b95 100644 --- a/src/node/ext/tag.h +++ b/src/node/ext/tag.h @@ -34,6 +34,9 @@ #ifndef NET_GRPC_NODE_TAG_H_ #define NET_GRPC_NODE_TAG_H_ +#include + + #include #include #include @@ -41,9 +44,11 @@ namespace grpc { namespace node { -/* Create a void* tag that can be passed to grpc_call_start_batch from a callback - function and an ops array */ -void *CreateTag(v8::Handle callback, grpc_op *ops, size_t nops); +/* Create a void* tag that can be passed to grpc_call_start_batch from a + callback function and an ops array */ +void *CreateTag(v8::Handle callback, grpc_op *ops, size_t nops, + std::vector > *handles, + std::vector *strings); /* Create a void* tag that can be passed to grpc_server_request_call from a callback and the various out parameters to that function */ @@ -55,7 +60,7 @@ void *CreateTag(v8::Handle callback, grpc_call **call, NanCallback GetCallback(void *tag); /* Get the combined output value from the tag */ -v8::Handle GetNodevalue(void *tag); +v8::Handle GetNodeValue(void *tag); /* Destroy the tag and all resources it is holding. It is illegal to call any of these other functions on a tag after it has been destroyed. */ -- cgit v1.2.3 From ff43c093b646cc3b9b4166a75decc0fa0e436ce9 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 9 Feb 2015 11:41:23 -0800 Subject: Fixed some compiler errors in call.cc --- src/node/binding.gyp | 7 +++--- src/node/ext/call.cc | 61 +++++++++++++++++++++++++++++++--------------------- src/node/ext/call.h | 12 +++++------ 3 files changed, 47 insertions(+), 33 deletions(-) (limited to 'src/node') diff --git a/src/node/binding.gyp b/src/node/binding.gyp index cf2a6acb04..a289b9b9e8 100644 --- a/src/node/binding.gyp +++ b/src/node/binding.gyp @@ -9,14 +9,15 @@ 'include_dirs': [ " #include +#include #include @@ -46,11 +47,12 @@ #include "completion_queue_async_worker.h" #include "timeval.h" +using std::unique_ptr; + namespace grpc { namespace node { using ::node::Buffer; -using std::unique_ptr; using v8::Arguments; using v8::Array; using v8::Exception; @@ -91,7 +93,7 @@ bool CreateMetadataArray( for (unsigned int i = 0; i < keys->Length(); i++) { Handle current_key(keys->Get(i)->ToString()); NanUtf8String *utf8_key = new NanUtf8String(current_key); - string_handles->push_back(unique_ptrpush_back(unique_ptr(utf8_key)); Handle values = Local::Cast(metadata->Get(current_key)); for (unsigned int j = 0; j < values->Length(); j++) { Handle value = values->Get(j); @@ -102,7 +104,8 @@ bool CreateMetadataArray( current->value_length = Buffer::Length(value); Persistent handle; NanAssignPersistent(handle, value); - handles->push_back(PersistentHolder(handle)); + handles->push_back(unique_ptr( + new PersistentHolder(handle))); } else if (value->IsString()) { Handle string_value = value->ToString(); NanUtf8String *utf8_value = new NanUtf8String(string_value); @@ -118,15 +121,15 @@ bool CreateMetadataArray( return true; } -Handle ParseMetadata(grpc_metadata_array *metadata_array) { +Handle ParseMetadata(const grpc_metadata_array *metadata_array) { NanEscapableScope(); grpc_metadata *metadata_elements = metadata_array->metadata; size_t length = metadata_array->count; - std::map size_map; - std::map index_map; + std::map size_map; + std::map index_map; for (unsigned int i = 0; i < length; i++) { - char *key = metadata_elements[i].key; + const char *key = metadata_elements[i].key; if (size_map.count(key)) { size_map[key] += 1; } @@ -151,13 +154,10 @@ Handle ParseMetadata(grpc_metadata_array *metadata_array) { return NanEscapeScope(metadata_object); } -class Op { - public: - Handle GetOpType() const { - NanEscapableScope(); - return NanEscapeScope(NanNew(GetTypeString())); - } -}; +Handle Op::GetOpType() const { + NanEscapableScope(); + return NanEscapeScope(NanNew(GetTypeString())); +} class SendMetadataOp : public Op { public: @@ -180,7 +180,7 @@ class SendMetadataOp : public Op { return true; } protected: - char *GetTypeString() { + std::string GetTypeString() { return "send metadata"; } }; @@ -197,12 +197,13 @@ class SendMessageOp : public Op { if (!Buffer::HasInstance(value)) { return false; } - out->.data.send_message = BufferToByteBuffer(obj->Get(type)); + out->data.send_message = BufferToByteBuffer(obj->Get(type)); NanAssignPersistent(handle, value); - handles->push_back(PersistentHolder(handle)); + handles->push_back(unique_ptr( + new PersistentHolder(handle))); } protected: - char *GetTypeString() { + std::string GetTypeString() { return "send message"; } }; @@ -219,7 +220,7 @@ class SendClientCloseOp : public Op { return true; } protected: - char *GetTypeString() { + std::string GetTypeString() { return "client close"; } }; @@ -264,7 +265,7 @@ class SendServerStatusOp : public Op { return true; } protected: - char *GetTypeString() { + std::string GetTypeString() { return "send status"; } } @@ -291,7 +292,7 @@ class GetMetadataOp : public Op { } protected: - char *GetTypeString() { + std::string GetTypeString() { return "metadata"; } @@ -311,17 +312,18 @@ class ReadMessageOp : public Op { } Handle GetNodeValue() const { NanEscapableScope(); - return NanEscapeScope(ByteBufferToBuffer(*recv_message)); + return NanEscapeScope(ByteBufferToBuffer(recv_message)); } bool ParseOp(Handle value, grpc_op *out, std::vector > *strings, std::vector > *handles) { out->data.recv_message = &recv_message; + return true; } protected: - char *GetTypeString() { + std::string GetTypeString() { return "read"; } @@ -348,6 +350,7 @@ class ClientStatusOp : public Op { out->data.recv_status_on_client.status = &status; out->data.recv_status_on_client.status_details = &status_details; out->data.recv_status_on_client.status_details_capacity = &details_capacity; + return true; } Handle GetNodeValue() const { @@ -360,6 +363,10 @@ class ClientStatusOp : public Op { status_obj->Set(NanNew("metadata"), ParseMetadata(&metadata_array)); return NanEscapeScope(status_obj); } + protected: + std::string GetTypeString() const { + return "status"; + } private: grpc_metadata_array metadata_array; grpc_status_code status; @@ -378,6 +385,12 @@ class ServerCloseResponseOp : public Op { std::vector > *strings, std::vector > *handles) { out->data.recv_close_on_server.cancelled = &cancelled; + return true; + } + + protected: + std::string GetTypeString() const { + return "cancelled"; } private: diff --git a/src/node/ext/call.h b/src/node/ext/call.h index 6ae370d02f..6c38877d3a 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -49,27 +49,27 @@ using std::unique_ptr; class PersistentHolder { public: - explicit PersistentHolder(v8::Persistent persist) : persist(persist) { + explicit PersistentHolder(v8::Persistent persist) : persist(persist) { } ~PersistentHolder() { persist.Dispose(); - } +} private: - v8::Persistent persist; + v8::Persistent persist; }; class Op { public: - virtual Handle GetNodeValue() const = 0; + virtual v8::Handle GetNodeValue() const = 0; virtual bool ParseOp(v8::Handle value, grpc_op *out, std::vector > *strings, std::vector > *handles) = 0; - Handle GetOpType(); + v8::Handle GetOpType() const; protected: - virtual char *GetTypeString(); + virtual char *GetTypeString() const; }; struct tag { -- cgit v1.2.3 From 5efd50fd985cf90d8cd82c8537e26eafe7855200 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 9 Feb 2015 11:43:34 -0800 Subject: Fixed another compiler error --- src/node/ext/call.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/node') diff --git a/src/node/ext/call.h b/src/node/ext/call.h index 6c38877d3a..880ce7c451 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -69,7 +69,7 @@ class Op { v8::Handle GetOpType() const; protected: - virtual char *GetTypeString() const; + virtual std::string GetTypeString() const; }; struct tag { -- cgit v1.2.3 From 016bb50e763978b27ddf73612f65fdd84b89f478 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 9 Feb 2015 15:55:10 -0800 Subject: Extension module now compiles and some tests pass --- src/node/binding.gyp | 2 - src/node/ext/call.cc | 109 +++++++++++++--------- src/node/ext/call.h | 14 ++- src/node/ext/completion_queue_async_worker.cc | 4 +- src/node/ext/node_grpc.cc | 33 ------- src/node/ext/server.cc | 27 ++++-- src/node/test/call_test.js | 128 +++++++++++--------------- src/node/test/constant_test.js | 37 -------- src/node/test/end_to_end_test.js | 81 ++++++++-------- 9 files changed, 195 insertions(+), 240 deletions(-) (limited to 'src/node') diff --git a/src/node/binding.gyp b/src/node/binding.gyp index a289b9b9e8..fb4c779f8e 100644 --- a/src/node/binding.gyp +++ b/src/node/binding.gyp @@ -34,11 +34,9 @@ "ext/channel.cc", "ext/completion_queue_async_worker.cc", "ext/credentials.cc", - "ext/event.cc", "ext/node_grpc.cc", "ext/server.cc", "ext/server_credentials.cc", - "ext/tag.cc", "ext/timeval.cc" ], 'conditions' : [ diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 4751621c9f..785cee8d3e 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -55,6 +55,7 @@ namespace node { using ::node::Buffer; using v8::Arguments; using v8::Array; +using v8::Boolean; using v8::Exception; using v8::External; using v8::Function; @@ -80,6 +81,7 @@ bool CreateMetadataArray( std::vector > *string_handles, std::vector > *handles) { NanScope(); + grpc_metadata_array_init(array); Handle keys(metadata->GetOwnPropertyNames()); for (unsigned int i = 0; i < keys->Length(); i++) { Handle current_key(keys->Get(i)->ToString()); @@ -156,12 +158,12 @@ Handle ParseMetadata(const grpc_metadata_array *metadata_array) { Handle Op::GetOpType() const { NanEscapableScope(); - return NanEscapeScope(NanNew(GetTypeString())); + return NanEscapeScope(NanNew(GetTypeString())); } class SendMetadataOp : public Op { public: - Handle GetNodeValue() { + Handle GetNodeValue() const { NanEscapableScope(); return NanEscapeScope(NanTrue()); } @@ -180,14 +182,14 @@ class SendMetadataOp : public Op { return true; } protected: - std::string GetTypeString() { + std::string GetTypeString() const { return "send metadata"; } }; class SendMessageOp : public Op { public: - Handle GetNodeValue() { + Handle GetNodeValue() const { NanEscapableScope(); return NanEscapeScope(NanTrue()); } @@ -197,20 +199,22 @@ class SendMessageOp : public Op { if (!Buffer::HasInstance(value)) { return false; } - out->data.send_message = BufferToByteBuffer(obj->Get(type)); + out->data.send_message = BufferToByteBuffer(value); + Persistent handle; NanAssignPersistent(handle, value); handles->push_back(unique_ptr( new PersistentHolder(handle))); + return true; } protected: - std::string GetTypeString() { + std::string GetTypeString() const { return "send message"; } }; class SendClientCloseOp : public Op { public: - Handle GetNodeValue() { + Handle GetNodeValue() const { NanEscapableScope(); return NanEscapeScope(NanTrue()); } @@ -220,14 +224,14 @@ class SendClientCloseOp : public Op { return true; } protected: - std::string GetTypeString() { + std::string GetTypeString() const { return "client close"; } }; class SendServerStatusOp : public Op { public: - Handle GetNodeValue() { + Handle GetNodeValue() const { NanEscapableScope(); return NanEscapeScope(NanTrue()); } @@ -265,10 +269,10 @@ class SendServerStatusOp : public Op { return true; } protected: - std::string GetTypeString() { + std::string GetTypeString() const { return "send status"; } -} +}; class GetMetadataOp : public Op { public: @@ -289,10 +293,11 @@ class GetMetadataOp : public Op { std::vector > *strings, std::vector > *handles) { out->data.recv_initial_metadata = &recv_metadata; + return true; } protected: - std::string GetTypeString() { + std::string GetTypeString() const { return "metadata"; } @@ -323,7 +328,7 @@ class ReadMessageOp : public Op { } protected: - std::string GetTypeString() { + std::string GetTypeString() const { return "read"; } @@ -334,12 +339,13 @@ class ReadMessageOp : public Op { class ClientStatusOp : public Op { public: ClientStatusOp() { - grpc_metadata_array_init(&metadata); + grpc_metadata_array_init(&metadata_array); status_details = NULL; + details_capacity = 0; } ~ClientStatusOp() { - gprc_metadata_array_destroy(&metadata_array); + grpc_metadata_array_destroy(&metadata_array); gpr_free(status_details); } @@ -357,7 +363,7 @@ class ClientStatusOp : public Op { NanEscapableScope(); Handle status_obj = NanNew(); status_obj->Set(NanNew("code"), NanNew(status)); - if (event->data.finished.details != NULL) { + if (status_details != NULL) { status_obj->Set(NanNew("details"), String::New(status_details)); } status_obj->Set(NanNew("metadata"), ParseMetadata(&metadata_array)); @@ -378,7 +384,7 @@ class ServerCloseResponseOp : public Op { public: Handle GetNodeValue() const { NanEscapableScope(); - NanEscapeScope(NanNew(cancelled)); + return NanEscapeScope(NanNew(cancelled)); } bool ParseOp(Handle value, grpc_op *out, @@ -397,27 +403,43 @@ class ServerCloseResponseOp : public Op { int cancelled; }; -struct tag { - tag(NanCallback *callback, std::vector > *ops, - std::vector > *handles, - std::vector > *strings) : - callback(callback), ops(ops), handles(handles), strings(strings){ - } - ~tag() { - if (strings != null) { - for (std::vector::iterator it = strings.begin(); - it != strings.end(); ++it) { - delete *it; - } - delete strings; - } - delete callback; - delete ops; - if (handles != null) { - delete handles; - } +tag::tag(NanCallback *callback, std::vector > *ops, + std::vector > *handles, + std::vector > *strings) : + callback(callback), ops(ops), handles(handles), strings(strings){ +} +tag::~tag() { + delete callback; + delete ops; + if (handles != NULL) { + delete handles; } -}; + if (strings != NULL) { + delete strings; + } +} + +Handle GetTagNodeValue(void *tag) { + NanEscapableScope(); + struct tag *tag_struct = reinterpret_cast(tag); + Handle tag_obj = NanNew(); + for (std::vector >::iterator it = tag_struct->ops->begin(); + it != tag_struct->ops->end(); ++it) { + Op *op_ptr = it->get(); + tag_obj->Set(op_ptr->GetOpType(), op_ptr->GetNodeValue()); + } + return NanEscapeScope(tag_obj); +} + +NanCallback GetTagCallback(void *tag) { + struct tag *tag_struct = reinterpret_cast(tag); + return *tag_struct->callback; +} + +void DestroyTag(void *tag) { + struct tag *tag_struct = reinterpret_cast(tag); + delete tag_struct; +} Call::Call(grpc_call *call) : wrapped_call(call) {} @@ -559,13 +581,16 @@ NAN_METHOD(Call::StartBatch) { default: return NanThrowError("Argument object had an unrecognized key"); } - op.ParseOp(obj.get(type), &ops[i], strings, handles); - op_vector.push_back(unique_ptr(op)); + if (!op->ParseOp(obj->Get(type), &ops[i], strings, handles)) { + return NanThrowTypeError("Incorrectly typed arguments to startBatch"); + } + op_vector->push_back(unique_ptr(op)); } grpc_call_error error = grpc_call_start_batch( - call->wrapped_call, ops, nops, new struct tag(args[1].As(), - op_vector, nops, handles, - strings)); + call->wrapped_call, ops, nops, new struct tag( + new NanCallback(args[1].As()), + op_vector, handles, + strings)); if (error != GRPC_CALL_OK) { return NanThrowError("startBatch failed", error); } diff --git a/src/node/ext/call.h b/src/node/ext/call.h index 880ce7c451..434bcf8a63 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -35,6 +35,7 @@ #define NET_GRPC_NODE_CALL_H_ #include +#include #include #include @@ -47,9 +48,12 @@ namespace node { using std::unique_ptr; +v8::Handle ParseMetadata(const grpc_metadata_array *metadata_array); + class PersistentHolder { public: - explicit PersistentHolder(v8::Persistent persist) : persist(persist) { + explicit PersistentHolder(v8::Persistent persist) : + persist(persist) { } ~PersistentHolder() { @@ -69,7 +73,7 @@ class Op { v8::Handle GetOpType() const; protected: - virtual std::string GetTypeString() const; + virtual std::string GetTypeString() const = 0; }; struct tag { @@ -83,6 +87,12 @@ struct tag { std::vector > *strings; }; +v8::Handle GetTagNodeValue(void *tag); + +NanCallback GetTagCallback(void *tag); + +void DestroyTag(void *tag); + /* Wrapper class for grpc_call structs. */ class Call : public ::node::ObjectWrap { public: diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index bb0e39180e..5c0e27e6a7 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -37,7 +37,7 @@ #include "grpc/grpc.h" #include "grpc/support/time.h" #include "completion_queue_async_worker.h" -#include "tag.h" +#include "call.h" namespace grpc { namespace node { @@ -78,7 +78,7 @@ void CompletionQueueAsyncWorker::Init(Handle exports) { void CompletionQueueAsyncWorker::HandleOKCallback() { NanScope(); NanCallback callback = GetTagCallback(result->tag); - Handle argv[] = {NanNull(), GetNodeValue(result->tag)}; + Handle argv[] = {NanNull(), GetTagNodeValue(result->tag)}; DestroyTag(result->tag); grpc_event_finish(result); diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc index c9388940ad..9b0fe82976 100644 --- a/src/node/ext/node_grpc.cc +++ b/src/node/ext/node_grpc.cc @@ -130,37 +130,6 @@ void InitCallErrorConstants(Handle exports) { call_error->Set(NanNew("INVALID_FLAGS"), INVALID_FLAGS); } -void InitOpErrorConstants(Handle exports) { - NanScope(); - Handle op_error = Object::New(); - exports->Set(NanNew("opError"), op_error); - Handle OK(NanNew(GRPC_OP_OK)); - op_error->Set(NanNew("OK"), OK); - Handle ERROR(NanNew(GRPC_OP_ERROR)); - op_error->Set(NanNew("ERROR"), ERROR); -} - -void InitCompletionTypeConstants(Handle exports) { - NanScope(); - Handle completion_type = Object::New(); - exports->Set(NanNew("completionType"), completion_type); - Handle QUEUE_SHUTDOWN(NanNew(GRPC_QUEUE_SHUTDOWN)); - completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN); - Handle READ(NanNew(GRPC_READ)); - completion_type->Set(NanNew("READ"), READ); - Handle WRITE_ACCEPTED(NanNew(GRPC_WRITE_ACCEPTED)); - completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED); - Handle FINISH_ACCEPTED(NanNew(GRPC_FINISH_ACCEPTED)); - completion_type->Set(NanNew("FINISH_ACCEPTED"), FINISH_ACCEPTED); - Handle CLIENT_METADATA_READ( - NanNew(GRPC_CLIENT_METADATA_READ)); - completion_type->Set(NanNew("CLIENT_METADATA_READ"), CLIENT_METADATA_READ); - Handle FINISHED(NanNew(GRPC_FINISHED)); - completion_type->Set(NanNew("FINISHED"), FINISHED); - Handle SERVER_RPC_NEW(NanNew(GRPC_SERVER_RPC_NEW)); - completion_type->Set(NanNew("SERVER_RPC_NEW"), SERVER_RPC_NEW); -} - void InitOpTypeConstants(Handle exports) { NanScope(); Handle op_type = Object::New(); @@ -196,8 +165,6 @@ void init(Handle exports) { grpc_init(); InitStatusConstants(exports); InitCallErrorConstants(exports); - InitOpErrorConstants(exports); - InitCompletionTypeConstants(exports); InitOpTypeConstants(exports); grpc::node::Call::Init(exports); diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index c0ccf1f381..75ea681fa7 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -45,8 +45,8 @@ #include "grpc/grpc_security.h" #include "call.h" #include "completion_queue_async_worker.h" -#include "tag.h" #include "server_credentials.h" +#include "timeval.h" namespace grpc { namespace node { @@ -55,6 +55,7 @@ using std::unique_ptr; using v8::Arguments; using v8::Array; using v8::Boolean; +using v8::Date; using v8::Exception; using v8::Function; using v8::FunctionTemplate; @@ -80,12 +81,12 @@ class NewCallOp : public Op { ~NewCallOp() { grpc_call_details_destroy(&details); - grpc_metadata_array_destroy(&details); + grpc_metadata_array_destroy(&request_metadata); } Handle GetNodeValue() const { NanEscapableScope(); - if (*call == NULL) { + if (call == NULL) { return NanEscapeScope(NanNull()); } Handle obj = NanNew(); @@ -99,15 +100,20 @@ class NewCallOp : public Op { } bool ParseOp(Handle value, grpc_op *out, - std::vector > strings, - std::vector > handles) { + std::vector > *strings, + std::vector > *handles) { return true; } grpc_call *call; grpc_call_details details; grpc_metadata_array request_metadata; -} + + protected: + std::string GetTypeString() const { + return "new call"; + } +}; Server::Server(grpc_server *server) : wrapped_server(server) {} @@ -217,12 +223,13 @@ NAN_METHOD(Server::RequestCall) { return NanThrowTypeError("requestCall can only be called on a Server"); } Server *server = ObjectWrap::Unwrap(args.This()); - Op *op = new NewCallOp(); - std::vector > *ops = { unique_ptr(op) }; + NewCallOp *op = new NewCallOp(); + std::vector > *ops = new std::vector >(); + ops->push_back(unique_ptr(op)); grpc_call_error error = grpc_server_request_call( - server->wrapped_server, &op->call, &op->details, &op->metadata, + server->wrapped_server, &op->call, &op->details, &op->request_metadata, CompletionQueueAsyncWorker::GetQueue(), - new struct tag(args[0].As(), ops, NULL, NULL)); + new struct tag(new NanCallback(args[0].As()), ops, NULL, NULL)); if (error != GRPC_CALL_OK) { return NanThrowError("requestCall failed", error); } diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js index dfa9aaa1a7..e341092ff8 100644 --- a/src/node/test/call_test.js +++ b/src/node/test/call_test.js @@ -98,104 +98,80 @@ describe('call', function() { }, TypeError); }); }); - describe('addMetadata', function() { - it('should succeed with a map from strings to string arrays', function() { + describe('startBatch', function() { + it('should fail without an object and a function', function() { var call = new grpc.Call(channel, 'method', getDeadline(1)); - assert.doesNotThrow(function() { - call.addMetadata({'key': ['value']}); + assert.throws(function() { + call.startBatch(); }); - assert.doesNotThrow(function() { - call.addMetadata({'key1': ['value1'], 'key2': ['value2']}); + assert.throws(function() { + call.startBatch({}); + }); + assert.throws(function() { + call.startBatch(null, function(){}); }); }); - it('should succeed with a map from strings to buffer arrays', function() { + it.skip('should succeed with an empty object', function(done) { var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.doesNotThrow(function() { - call.addMetadata({'key': [new Buffer('value')]}); - }); - assert.doesNotThrow(function() { - call.addMetadata({'key1': [new Buffer('value1')], - 'key2': [new Buffer('value2')]}); + call.startBatch({}, function(err) { + assert.ifError(err); + done(); + }); }); }); - it('should fail with other parameter types', function() { + }); + describe('startBatch with metadata', function() { + it('should succeed with a map of strings to string arrays', function(done) { var call = new grpc.Call(channel, 'method', getDeadline(1)); - assert.throws(function() { - call.addMetadata(); + assert.doesNotThrow(function() { + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = {'key1': ['value1'], + 'key2': ['value2']}; + call.startBatch(batch, function(err, resp) { + assert.ifError(err); + assert.deepEqual(resp, {'send metadata': true}); + done(); + }); }); - assert.throws(function() { - call.addMetadata(null); - }, TypeError); - assert.throws(function() { - call.addMetadata('value'); - }, TypeError); - assert.throws(function() { - call.addMetadata(5); - }, TypeError); }); - it('should fail if invoke was already called', function(done) { + it('should succeed with a map of strings to buffer arrays', function(done) { var call = new grpc.Call(channel, 'method', getDeadline(1)); - call.invoke(function() {}, - function() {done();}, - 0); - assert.throws(function() { - call.addMetadata({'key': ['value']}); - }, function(err) { - return err.code === grpc.callError.ALREADY_INVOKED; + assert.doesNotThrow(function() { + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = { + 'key1': [new Buffer('value1')], + 'key2': [new Buffer('value2')] + }; + call.startBatch(batch, function(err, resp) { + assert.ifError(err); + assert.deepEqual(resp, {'send metadata': true}); + done(); + }); }); - // Cancel to speed up the test - call.cancel(); }); - }); - describe('invoke', function() { - it('should fail with fewer than 3 arguments', function() { + it('should fail with other parameter types', function() { var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.throws(function() { - call.invoke(); - }, TypeError); - assert.throws(function() { - call.invoke(function() {}); - }, TypeError); - assert.throws(function() { - call.invoke(function() {}, - function() {}); - }, TypeError); - }); - it('should work with 2 args and an int', function(done) { - assert.doesNotThrow(function() { - var call = new grpc.Call(channel, 'method', getDeadline(1)); - call.invoke(function() {}, - function() {done();}, - 0); - // Cancel to speed up the test - call.cancel(); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = undefined; + call.startBatch(batch, function(){}); }); - }); - it('should reject incorrectly typed arguments', function() { - var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.throws(function() { - call.invoke(0, 0, 0); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = null; + call.startBatch(batch, function(){}); }, TypeError); assert.throws(function() { - call.invoke(function() {}, - function() {}, 'test'); - }); - }); - }); - describe('serverAccept', function() { - it('should fail with fewer than 1 argument1', function() { - var call = new grpc.Call(channel, 'method', getDeadline(1)); - assert.throws(function() { - call.serverAccept(); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = 'value'; + call.startBatch(batch, function(){}); }, TypeError); - }); - it('should return an error when called on a client Call', function() { - var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.throws(function() { - call.serverAccept(function() {}); - }, function(err) { - return err.code === grpc.callError.NOT_ON_CLIENT; - }); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = 5; + call.startBatch(batch, function(){}); + }, TypeError); }); }); describe('cancel', function() { diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js index 0138a55226..4d11e6f527 100644 --- a/src/node/test/constant_test.js +++ b/src/node/test/constant_test.js @@ -76,31 +76,6 @@ var callErrorNames = [ 'INVALID_FLAGS' ]; -/** - * List of all op error names - * @const - * @type {Array.} - */ -var opErrorNames = [ - 'OK', - 'ERROR' -]; - -/** - * List of all completion type names - * @const - * @type {Array.} - */ -var completionTypeNames = [ - 'QUEUE_SHUTDOWN', - 'READ', - 'WRITE_ACCEPTED', - 'FINISH_ACCEPTED', - 'CLIENT_METADATA_READ', - 'FINISHED', - 'SERVER_RPC_NEW' -]; - describe('constants', function() { it('should have all of the status constants', function() { for (var i = 0; i < statusNames.length; i++) { @@ -114,16 +89,4 @@ describe('constants', function() { 'call error missing: ' + callErrorNames[i]); } }); - it('should have all of the op errors', function() { - for (var i = 0; i < opErrorNames.length; i++) { - assert(grpc.opError.hasOwnProperty(opErrorNames[i]), - 'op error missing: ' + opErrorNames[i]); - } - }); - it('should have all of the completion types', function() { - for (var i = 0; i < completionTypeNames.length; i++) { - assert(grpc.completionType.hasOwnProperty(completionTypeNames[i]), - 'completion type missing: ' + completionTypeNames[i]); - } - }); }); diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js index 1f53df23f3..e0ad9a8874 100644 --- a/src/node/test/end_to_end_test.js +++ b/src/node/test/end_to_end_test.js @@ -110,52 +110,61 @@ describe('end-to-end', function() { assert.strictEqual(event.data, grpc.opError.OK); }); }); - it('should successfully send and receive metadata', function(complete) { - var done = multiDone(complete, 2); + it.only('should successfully send and receive metadata', function(done) { + debugger; var deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + 3); var status_text = 'xyz'; var call = new grpc.Call(channel, 'dummy_method', deadline); - call.addMetadata({'client_key': ['client_value']}); - call.invoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.CLIENT_METADATA_READ); - assert.strictEqual(event.data.server_key[0].toString(), 'server_value'); - },function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - var status = event.data; - assert.strictEqual(status.code, grpc.status.OK); - assert.strictEqual(status.details, status_text); + var client_batch = {}; + client_batch[grpc.opType.SEND_INITIAL_METADATA] = { + 'client_key': ['client_value'] + }; + client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(client_batch, function(err, response) { + assert.ifError(err); + assert.deepEqual(response, { + 'send metadata': true, + 'client close': true, + 'metadata': {'server_key': [new Buffer('server_value')]}, + 'status': { + 'code': grpc.status.OK, + 'details': status_text + } + }); done(); - }, 0); + }); - server.requestCall(function(event) { - assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW); - assert.strictEqual(event.data.metadata.client_key[0].toString(), + server.requestCall(function(err, call_details) { + var new_call = call_details['new call']; + assert.notEqual(new_call, null); + assert.strictEqual(new_call.metadata.client_key[0].toString(), 'client_value'); - var server_call = event.call; + var server_call = new_call.call; assert.notEqual(server_call, null); - server_call.serverAccept(function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - }, 0); - server_call.addMetadata({'server_key': ['server_value']}); - server_call.serverEndInitialMetadata(0); - server_call.startWriteStatus( - grpc.status.OK, - status_text, - function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); + var server_batch = {}; + server_batch[grpc.opType.SEND_INITIAL_METADATA] = { + 'server_key': ['server_value'] + }; + server_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + 'metadata': {}, + 'code': grpc.status.OK, + 'details': status_text + }; + server_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + console.log(server_batch); + server_call.startBatch(server_batch, function(err, response) { + assert.ifError(err); + assert.deepEqual(response, { + 'send metadata': true, + 'send status': true, + 'cancelled': false + }); + }); }); }); it('should send and receive data without error', function(complete) { -- cgit v1.2.3 From cd0b90621dd2b404d93b169836f02b1694b1f45c Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 10 Feb 2015 09:16:49 -0800 Subject: Fixed end-to-end tests for new changes --- src/node/ext/call.cc | 2 +- src/node/test/end_to_end_test.js | 193 +++++++++++++++++++-------------------- 2 files changed, 95 insertions(+), 100 deletions(-) (limited to 'src/node') diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 785cee8d3e..9a6359fe44 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -238,7 +238,7 @@ class SendServerStatusOp : public Op { bool ParseOp(Handle value, grpc_op *out, std::vector > *strings, std::vector > *handles) { - if (value->IsObject()) { + if (!value->IsObject()) { return false; } Handle server_status = value->ToObject(); diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js index e0ad9a8874..f1277ff207 100644 --- a/src/node/test/end_to_end_test.js +++ b/src/node/test/end_to_end_test.js @@ -75,43 +75,51 @@ describe('end-to-end', function() { var call = new grpc.Call(channel, 'dummy_method', deadline); - call.invoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.CLIENT_METADATA_READ); - },function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - var status = event.data; - assert.strictEqual(status.code, grpc.status.OK); - assert.strictEqual(status.details, status_text); + var client_batch = {}; + client_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(client_batch, function(err, response) { + assert.ifError(err); + assert.deepEqual(response, { + 'send metadata': true, + 'client close': true, + 'status': { + 'code': grpc.status.OK, + 'details': status_text, + 'metadata': {} + } + }); done(); - }, 0); + }); - server.requestCall(function(event) { - assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW); - var server_call = event.call; + server.requestCall(function(err, call_details) { + var new_call = call_details['new call']; + assert.notEqual(new_call, null); + var server_call = new_call.call; assert.notEqual(server_call, null); - server_call.serverAccept(function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - }, 0); - server_call.serverEndInitialMetadata(0); - server_call.startWriteStatus( - grpc.status.OK, - status_text, - function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); + var server_batch = {}; + server_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + server_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + 'metadata': {}, + 'code': grpc.status.OK, + 'details': status_text + }; + server_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + server_call.startBatch(server_batch, function(err, response) { + assert.ifError(err); + assert.deepEqual(response, { + 'send metadata': true, + 'send status': true, + 'cancelled': false + }); + done(); + }); }); }); - it.only('should successfully send and receive metadata', function(done) { - debugger; + it('should successfully send and receive metadata', function(complete) { + var done = multiDone(complete, 2); var deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + 3); var status_text = 'xyz'; @@ -127,15 +135,14 @@ describe('end-to-end', function() { client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(client_batch, function(err, response) { assert.ifError(err); - assert.deepEqual(response, { - 'send metadata': true, - 'client close': true, - 'metadata': {'server_key': [new Buffer('server_value')]}, - 'status': { - 'code': grpc.status.OK, - 'details': status_text - } - }); + assert(response['send metadata']); + assert(response['client close']); + assert(response.hasOwnProperty('metadata')); + assert.strictEqual(response.metadata.server_key.toString(), + 'server_value'); + assert.deepEqual(response.status, {'code': grpc.status.OK, + 'details': status_text, + 'metadata': {}}); done(); }); @@ -156,7 +163,6 @@ describe('end-to-end', function() { 'details': status_text }; server_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; - console.log(server_batch); server_call.startBatch(server_batch, function(err, response) { assert.ifError(err); assert.deepEqual(response, { @@ -164,77 +170,66 @@ describe('end-to-end', function() { 'send status': true, 'cancelled': false }); + done(); }); }); }); - it('should send and receive data without error', function(complete) { + it.only('should send and receive data without error', function(complete) { var req_text = 'client_request'; var reply_text = 'server_response'; - var done = multiDone(complete, 6); + var done = multiDone(complete, 2); var deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + 3); var status_text = 'success'; var call = new grpc.Call(channel, 'dummy_method', deadline); - call.invoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.CLIENT_METADATA_READ); - done(); - },function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - var status = event.data; - assert.strictEqual(status.code, grpc.status.OK); - assert.strictEqual(status.details, status_text); - done(); - }, 0); - call.startWrite( - new Buffer(req_text), - function(event) { - assert.strictEqual(event.type, - grpc.completionType.WRITE_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }, 0); - call.startRead(function(event) { - assert.strictEqual(event.type, grpc.completionType.READ); - assert.strictEqual(event.data.toString(), reply_text); + var client_batch = {}; + client_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + client_batch[grpc.opType.SEND_MESSAGE] = new Buffer(req_text); + client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + client_batch[grpc.opType.RECV_MESSAGE] = true; + client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(client_batch, function(err, response) { + assert.ifError(err); + assert(response['send metadata']); + assert(response['client close']); + assert.deepEqual(response.metadata, {}); + assert(response['send message']); + assert.strictEqual(response.read.toString(), reply_text); + assert.deepEqual(response.status, {'code': grpc.status.OK, + 'details': status_text, + 'metadata': {}}); + console.log("OK status"); done(); }); - server.requestCall(function(event) { - assert.strictEqual(event.type, grpc.completionType.SERVER_RPC_NEW); - var server_call = event.call; + + server.requestCall(function(err, call_details) { + var new_call = call_details['new call']; + assert.notEqual(new_call, null); + var server_call = new_call.call; assert.notEqual(server_call, null); - server_call.serverAccept(function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - done(); - }); - server_call.serverEndInitialMetadata(0); - server_call.startRead(function(event) { - assert.strictEqual(event.type, grpc.completionType.READ); - assert.strictEqual(event.data.toString(), req_text); - server_call.startWrite( - new Buffer(reply_text), - function(event) { - assert.strictEqual(event.type, - grpc.completionType.WRITE_ACCEPTED); - assert.strictEqual(event.data, - grpc.opError.OK); - server_call.startWriteStatus( - grpc.status.OK, - status_text, - function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }, 0); + var server_batch = {}; + server_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + server_batch[grpc.opType.RECV_MESSAGE] = true; + server_call.startBatch(server_batch, function(err, response) { + assert.ifError(err); + assert(response['send metadata']); + assert.strictEqual(response.read.toString(), req_text); + var response_batch = {}; + response_batch[grpc.opType.SEND_MESSAGE] = new Buffer(reply_text); + response_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + 'metadata': {}, + 'code': grpc.status.OK, + 'details': status_text + }; + response_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + server_call.startBatch(response_batch, function(err, response) { + assert(response['send status']); + //assert(!response['cancelled']); + done(); + }); }); }); }); -- cgit v1.2.3 From 77659065216cb3756711dd6c490a202faa133c83 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 11 Feb 2015 09:26:25 -0800 Subject: More end to end test debugging --- src/node/ext/call.cc | 19 +++++++++++++------ src/node/ext/call.h | 4 ++-- src/node/ext/completion_queue_async_worker.cc | 11 +++++++---- src/node/ext/server.cc | 2 ++ src/node/test/end_to_end_test.js | 17 +++++++++-------- 5 files changed, 33 insertions(+), 20 deletions(-) (limited to 'src/node') diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 9a6359fe44..3452af943d 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -201,6 +201,8 @@ class SendMessageOp : public Op { } out->data.send_message = BufferToByteBuffer(value); Persistent handle; + Handle temp = NanNew(); + NanAssignPersistent(handle, temp); NanAssignPersistent(handle, value); handles->push_back(unique_ptr( new PersistentHolder(handle))); @@ -441,9 +443,14 @@ void DestroyTag(void *tag) { delete tag_struct; } -Call::Call(grpc_call *call) : wrapped_call(call) {} +Call::Call(grpc_call *call) : wrapped_call(call) { + gpr_log(GPR_DEBUG, "Constructing call, this: %p, pointer: %p", this, call); +} -Call::~Call() { grpc_call_destroy(wrapped_call); } +Call::~Call() { + gpr_log(GPR_DEBUG, "Destructing call, this: %p, pointer: %p", this, wrapped_call); + grpc_call_destroy(wrapped_call); +} void Call::Init(Handle exports) { NanScope(); @@ -473,6 +480,7 @@ Handle Call::WrapStruct(grpc_call *call) { if (call == NULL) { return NanEscapeScope(NanNull()); } + gpr_log(GPR_DEBUG, "Wrapping call: %p", call); const int argc = 1; Handle argv[argc] = {External::New(reinterpret_cast(call))}; return NanEscapeScope(constructor->NewInstance(argc, argv)); @@ -534,12 +542,13 @@ NAN_METHOD(Call::StartBatch) { if (!args[1]->IsFunction()) { return NanThrowError("startBatch's second argument must be a callback"); } + Handle callback_func = args[1].As(); + NanCallback *callback = new NanCallback(callback_func); Call *call = ObjectWrap::Unwrap(args.This()); std::vector > *handles = new std::vector >(); std::vector > *strings = new std::vector >(); - Persistent handle; Handle obj = args[0]->ToObject(); Handle keys = obj->GetOwnPropertyNames(); size_t nops = keys->Length(); @@ -588,9 +597,7 @@ NAN_METHOD(Call::StartBatch) { } grpc_call_error error = grpc_call_start_batch( call->wrapped_call, ops, nops, new struct tag( - new NanCallback(args[1].As()), - op_vector, handles, - strings)); + callback, op_vector, handles, strings)); if (error != GRPC_CALL_OK) { return NanThrowError("startBatch failed", error); } diff --git a/src/node/ext/call.h b/src/node/ext/call.h index 434bcf8a63..b8792713da 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -57,8 +57,8 @@ class PersistentHolder { } ~PersistentHolder() { - persist.Dispose(); -} + NanDisposePersistent(persist); + } private: v8::Persistent persist; diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index 5c0e27e6a7..dbacdf034e 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -35,6 +35,7 @@ #include #include "grpc/grpc.h" +#include "grpc/support/log.h" #include "grpc/support/time.h" #include "completion_queue_async_worker.h" #include "call.h" @@ -57,6 +58,7 @@ CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {} void CompletionQueueAsyncWorker::Execute() { result = grpc_completion_queue_next(queue, gpr_inf_future); + gpr_log(GPR_DEBUG, "Handling response on call %p", result->call); if (result->data.op_complete != GRPC_OP_OK) { SetErrorMessage("The batch encountered an error"); } @@ -77,14 +79,15 @@ void CompletionQueueAsyncWorker::Init(Handle exports) { void CompletionQueueAsyncWorker::HandleOKCallback() { NanScope(); + gpr_log(GPR_DEBUG, "Handling response on call %p", result->call); NanCallback callback = GetTagCallback(result->tag); Handle argv[] = {NanNull(), GetTagNodeValue(result->tag)}; + callback.Call(2, argv); + DestroyTag(result->tag); grpc_event_finish(result); result = NULL; - - callback.Call(2, argv); } void CompletionQueueAsyncWorker::HandleErrorCallback() { @@ -92,11 +95,11 @@ void CompletionQueueAsyncWorker::HandleErrorCallback() { NanCallback callback = GetTagCallback(result->tag); Handle argv[] = {NanError(ErrorMessage())}; + callback.Call(1, argv); + DestroyTag(result->tag); grpc_event_finish(result); result = NULL; - - callback.Call(1, argv); } } // namespace node diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 75ea681fa7..93aa9ec44d 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -43,6 +43,7 @@ #include #include "grpc/grpc.h" #include "grpc/grpc_security.h" +#include "grpc/support/log.h" #include "call.h" #include "completion_queue_async_worker.h" #include "server_credentials.h" @@ -90,6 +91,7 @@ class NewCallOp : public Op { return NanEscapeScope(NanNull()); } Handle obj = NanNew(); + gpr_log(GPR_DEBUG, "Wrapping server call: %p", call); obj->Set(NanNew("call"), Call::WrapStruct(call)); obj->Set(NanNew("method"), NanNew(details.method)); obj->Set(NanNew("host"), NanNew(details.host)); diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js index f1277ff207..d43446084c 100644 --- a/src/node/test/end_to_end_test.js +++ b/src/node/test/end_to_end_test.js @@ -67,14 +67,14 @@ describe('end-to-end', function() { after(function() { server.shutdown(); }); - it('should start and end a request without error', function(complete) { + it.skip('should start and end a request without error', function(complete) { var done = multiDone(complete, 2); var deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + 3); var status_text = 'xyz'; var call = new grpc.Call(channel, 'dummy_method', - deadline); + Infinity); var client_batch = {}; client_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; @@ -85,6 +85,7 @@ describe('end-to-end', function() { assert.deepEqual(response, { 'send metadata': true, 'client close': true, + 'metadata': {}, 'status': { 'code': grpc.status.OK, 'details': status_text, @@ -125,7 +126,7 @@ describe('end-to-end', function() { var status_text = 'xyz'; var call = new grpc.Call(channel, 'dummy_method', - deadline); + Infinity); var client_batch = {}; client_batch[grpc.opType.SEND_INITIAL_METADATA] = { 'client_key': ['client_value'] @@ -138,7 +139,7 @@ describe('end-to-end', function() { assert(response['send metadata']); assert(response['client close']); assert(response.hasOwnProperty('metadata')); - assert.strictEqual(response.metadata.server_key.toString(), + assert.strictEqual(response.metadata.server_key[0].toString(), 'server_value'); assert.deepEqual(response.status, {'code': grpc.status.OK, 'details': status_text, @@ -147,6 +148,7 @@ describe('end-to-end', function() { }); server.requestCall(function(err, call_details) { + console.log("Server received new call"); var new_call = call_details['new call']; assert.notEqual(new_call, null); assert.strictEqual(new_call.metadata.client_key[0].toString(), @@ -174,7 +176,7 @@ describe('end-to-end', function() { }); }); }); - it.only('should send and receive data without error', function(complete) { + it('should send and receive data without error', function(complete) { var req_text = 'client_request'; var reply_text = 'server_response'; var done = multiDone(complete, 2); @@ -183,7 +185,7 @@ describe('end-to-end', function() { var status_text = 'success'; var call = new grpc.Call(channel, 'dummy_method', - deadline); + Infinity); var client_batch = {}; client_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; client_batch[grpc.opType.SEND_MESSAGE] = new Buffer(req_text); @@ -201,7 +203,6 @@ describe('end-to-end', function() { assert.deepEqual(response.status, {'code': grpc.status.OK, 'details': status_text, 'metadata': {}}); - console.log("OK status"); done(); }); @@ -227,7 +228,7 @@ describe('end-to-end', function() { response_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; server_call.startBatch(response_batch, function(err, response) { assert(response['send status']); - //assert(!response['cancelled']); + assert(!response['cancelled']); done(); }); }); -- cgit v1.2.3 From 1578c6a4ab91aed70b4c2c271249ec82e2f95d7f Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 11 Feb 2015 16:19:55 -0800 Subject: Fixed end to end tests --- src/node/ext/call.cc | 8 ++++++-- src/node/ext/call.h | 2 +- src/node/ext/completion_queue_async_worker.cc | 8 ++++---- src/node/test/call_test.js | 2 +- src/node/test/end_to_end_test.js | 2 +- 5 files changed, 13 insertions(+), 9 deletions(-) (limited to 'src/node') diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 3452af943d..cdc34b52a7 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -98,6 +98,9 @@ bool CreateMetadataArray( string_handles->push_back(unique_ptr(utf8_key)); Handle values = Local::Cast(metadata->Get(current_key)); for (unsigned int j = 0; j < values->Length(); j++) { + if (array->count >= array->capacity) { + gpr_log(GPR_ERROR, "Metadata array grew past capacity"); + } Handle value = values->Get(j); grpc_metadata *current = &array->metadata[array->count]; current->key = **utf8_key; @@ -433,9 +436,9 @@ Handle GetTagNodeValue(void *tag) { return NanEscapeScope(tag_obj); } -NanCallback GetTagCallback(void *tag) { +NanCallback *GetTagCallback(void *tag) { struct tag *tag_struct = reinterpret_cast(tag); - return *tag_struct->callback; + return tag_struct->callback; } void DestroyTag(void *tag) { @@ -598,6 +601,7 @@ NAN_METHOD(Call::StartBatch) { grpc_call_error error = grpc_call_start_batch( call->wrapped_call, ops, nops, new struct tag( callback, op_vector, handles, strings)); + delete ops; if (error != GRPC_CALL_OK) { return NanThrowError("startBatch failed", error); } diff --git a/src/node/ext/call.h b/src/node/ext/call.h index b8792713da..f443a04637 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -89,7 +89,7 @@ struct tag { v8::Handle GetTagNodeValue(void *tag); -NanCallback GetTagCallback(void *tag); +NanCallback *GetTagCallback(void *tag); void DestroyTag(void *tag); diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index dbacdf034e..3c32b07ca3 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -80,10 +80,10 @@ void CompletionQueueAsyncWorker::Init(Handle exports) { void CompletionQueueAsyncWorker::HandleOKCallback() { NanScope(); gpr_log(GPR_DEBUG, "Handling response on call %p", result->call); - NanCallback callback = GetTagCallback(result->tag); + NanCallback *callback = GetTagCallback(result->tag); Handle argv[] = {NanNull(), GetTagNodeValue(result->tag)}; - callback.Call(2, argv); + callback->Call(2, argv); DestroyTag(result->tag); grpc_event_finish(result); @@ -92,10 +92,10 @@ void CompletionQueueAsyncWorker::HandleOKCallback() { void CompletionQueueAsyncWorker::HandleErrorCallback() { NanScope(); - NanCallback callback = GetTagCallback(result->tag); + NanCallback *callback = GetTagCallback(result->tag); Handle argv[] = {NanError(ErrorMessage())}; - callback.Call(1, argv); + callback->Call(1, argv); DestroyTag(result->tag); grpc_event_finish(result); diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js index e341092ff8..1cbfc2280c 100644 --- a/src/node/test/call_test.js +++ b/src/node/test/call_test.js @@ -1,6 +1,6 @@ /* * - * Copyright 2014, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js index d43446084c..34ce2500f6 100644 --- a/src/node/test/end_to_end_test.js +++ b/src/node/test/end_to_end_test.js @@ -67,7 +67,7 @@ describe('end-to-end', function() { after(function() { server.shutdown(); }); - it.skip('should start and end a request without error', function(complete) { + it('should start and end a request without error', function(complete) { var done = multiDone(complete, 2); var deadline = new Date(); deadline.setSeconds(deadline.getSeconds() + 3); -- cgit v1.2.3 From 63056a694d154b9b19e25b8e4352e81eba2e30fb Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 11 Feb 2015 17:29:09 -0800 Subject: Removed extra extension files --- src/node/ext/event.cc | 173 --------------------------- src/node/ext/event.h | 48 -------- src/node/ext/tag.cc | 325 -------------------------------------------------- src/node/ext/tag.h | 72 ----------- 4 files changed, 618 deletions(-) delete mode 100644 src/node/ext/event.cc delete mode 100644 src/node/ext/event.h delete mode 100644 src/node/ext/tag.cc delete mode 100644 src/node/ext/tag.h (limited to 'src/node') diff --git a/src/node/ext/event.cc b/src/node/ext/event.cc deleted file mode 100644 index d59b68fb40..0000000000 --- a/src/node/ext/event.cc +++ /dev/null @@ -1,173 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include - -#include -#include -#include "grpc/grpc.h" -#include "byte_buffer.h" -#include "call.h" -#include "event.h" -#include "tag.h" -#include "timeval.h" - -namespace grpc { -namespace node { - -using ::node::Buffer; -using v8::Array; -using v8::Date; -using v8::Handle; -using v8::HandleScope; -using v8::Number; -using v8::Object; -using v8::Persistent; -using v8::String; -using v8::Value; - -Handle ParseMetadata(grpc_metadata *metadata_elements, size_t length) { - NanEscapableScope(); - std::map size_map; - std::map index_map; - - for (unsigned int i = 0; i < length; i++) { - const char *key = metadata_elements[i].key; - if (size_map.count(key)) { - size_map[key] += 1; - } - index_map[key] = 0; - } - Handle metadata_object = NanNew(); - for (unsigned int i = 0; i < length; i++) { - grpc_metadata* elem = &metadata_elements[i]; - Handle key_string = String::New(elem->key); - Handle array; - if (metadata_object->Has(key_string)) { - array = Handle::Cast(metadata_object->Get(key_string)); - } else { - array = NanNew(size_map[elem->key]); - metadata_object->Set(key_string, array); - } - array->Set(index_map[elem->key], - MakeFastBuffer( - NanNewBufferHandle(elem->value, elem->value_length))); - index_map[elem->key] += 1; - } - return NanEscapeScope(metadata_object); -} - -Handle GetEventData(grpc_event *event) { - NanEscapableScope(); - size_t count; - grpc_metadata *items; - Handle metadata; - Handle status; - Handle rpc_new; - switch (event->type) { - case GRPC_READ: - return NanEscapeScope(ByteBufferToBuffer(event->data.read)); - case GRPC_WRITE_ACCEPTED: - return NanEscapeScope(NanNew(event->data.write_accepted)); - case GRPC_FINISH_ACCEPTED: - return NanEscapeScope(NanNew(event->data.finish_accepted)); - case GRPC_CLIENT_METADATA_READ: - count = event->data.client_metadata_read.count; - items = event->data.client_metadata_read.elements; - return NanEscapeScope(ParseMetadata(items, count)); - case GRPC_FINISHED: - status = NanNew(); - status->Set(NanNew("code"), NanNew(event->data.finished.status)); - if (event->data.finished.details != NULL) { - status->Set(NanNew("details"), - String::New(event->data.finished.details)); - } - count = event->data.finished.metadata_count; - items = event->data.finished.metadata_elements; - status->Set(NanNew("metadata"), ParseMetadata(items, count)); - return NanEscapeScope(status); - case GRPC_SERVER_RPC_NEW: - rpc_new = NanNew(); - if (event->data.server_rpc_new.method == NULL) { - return NanEscapeScope(NanNull()); - } - rpc_new->Set( - NanNew("method"), - NanNew(event->data.server_rpc_new.method)); - rpc_new->Set( - NanNew("host"), - NanNew(event->data.server_rpc_new.host)); - rpc_new->Set(NanNew("absolute_deadline"), - NanNew(TimespecToMilliseconds( - event->data.server_rpc_new.deadline))); - count = event->data.server_rpc_new.metadata_count; - items = event->data.server_rpc_new.metadata_elements; - metadata = NanNew(static_cast(count)); - for (unsigned int i = 0; i < count; i++) { - Handle item_obj = Object::New(); - item_obj->Set(NanNew("key"), - NanNew(items[i].key)); - item_obj->Set( - NanNew("value"), - NanNew(items[i].value, static_cast(items[i].value_length))); - metadata->Set(i, item_obj); - } - rpc_new->Set(NanNew("metadata"), ParseMetadata(items, count)); - return NanEscapeScope(rpc_new); - default: - return NanEscapeScope(NanNull()); - } -} - -Handle CreateEventObject(grpc_event *event) { - NanEscapableScope(); - if (event == NULL) { - return NanEscapeScope(NanNull()); - } - Handle event_obj = NanNew(); - Handle call; - if (TagHasCall(event->tag)) { - call = TagGetCall(event->tag); - } else { - call = Call::WrapStruct(event->call); - } - event_obj->Set(NanNew("call"), call); - event_obj->Set(NanNew("type"), - NanNew(event->type)); - event_obj->Set(NanNew("data"), GetEventData(event)); - - return NanEscapeScope(event_obj); -} - -} // namespace node -} // namespace grpc diff --git a/src/node/ext/event.h b/src/node/ext/event.h deleted file mode 100644 index e06d8f0168..0000000000 --- a/src/node/ext/event.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef NET_GRPC_NODE_EVENT_H_ -#define NET_GRPC_NODE_EVENT_H_ - -#include -#include "grpc/grpc.h" - -namespace grpc { -namespace node { - -v8::Handle CreateEventObject(grpc_event *event); - -} // namespace node -} // namespace grpc - -#endif // NET_GRPC_NODE_EVENT_H_ diff --git a/src/node/ext/tag.cc b/src/node/ext/tag.cc deleted file mode 100644 index 27baa94a8e..0000000000 --- a/src/node/ext/tag.cc +++ /dev/null @@ -1,325 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include -#include - -#include -#include -#include -#include -#include "tag.h" -#include "call.h" - -namespace grpc { -namespace node { - -using v8::Boolean; -using v8::Function; -using v8::Handle; -using v8::HandleScope; -using v8::Persistent; -using v8::Value; - -Handle ParseMetadata(grpc_metadata_array *metadata_array) { - NanEscapableScope(); - grpc_metadata *metadata_elements = metadata_array->metadata; - size_t length = metadata_array->count; - std::map size_map; - std::map index_map; - - for (unsigned int i = 0; i < length; i++) { - char *key = metadata_elements[i].key; - if (size_map.count(key)) { - size_map[key] += 1; - } - index_map[key] = 0; - } - Handle metadata_object = NanNew(); - for (unsigned int i = 0; i < length; i++) { - grpc_metadata* elem = &metadata_elements[i]; - Handle key_string = String::New(elem->key); - Handle array; - if (metadata_object->Has(key_string)) { - array = Handle::Cast(metadata_object->Get(key_string)); - } else { - array = NanNew(size_map[elem->key]); - metadata_object->Set(key_string, array); - } - array->Set(index_map[elem->key], - MakeFastBuffer( - NanNewBufferHandle(elem->value, elem->value_length))); - index_map[elem->key] += 1; - } - return NanEscapeScope(metadata_object); -} - -class OpResponse { - public: - explicit OpResponse(char *name): name(name) { - } - virtual Handle GetNodeValue() const = 0; - virtual bool ParseOp() = 0; - Handle GetOpType() const { - NanEscapableScope(); - return NanEscapeScope(NanNew(name)); - } - - private: - char *name; -}; - -class SendResponse : public OpResponse { - public: - explicit SendResponse(char *name): OpResponse(name) { - } - - Handle GetNodeValue() { - NanEscapableScope(); - return NanEscapeScope(NanTrue()); - } -} - -class MetadataResponse : public OpResponse { - public: - explicit MetadataResponse(grpc_metadata_array *recv_metadata): - recv_metadata(recv_metadata), OpResponse("metadata") { - } - - Handle GetNodeValue() const { - NanEscapableScope(); - return NanEscapeScope(ParseMetadata(recv_metadata)); - } - - private: - grpc_metadata_array *recv_metadata; -}; - -class MessageResponse : public OpResponse { - public: - explicit MessageResponse(grpc_byte_buffer **recv_message): - recv_message(recv_message), OpResponse("read") { - } - - Handle GetNodeValue() const { - NanEscapableScope(); - return NanEscapeScope(ByteBufferToBuffer(*recv_message)); - } - - private: - grpc_byte_buffer **recv_message; -}; - -switch () { -case GRPC_RECV_CLIENT_STATUS: - op = new ClientStatusResponse; - break; -} - - -class ClientStatusResponse : public OpResponse { - public: - explicit ClientStatusResponse(): - OpResponse("status") { - } - - bool ParseOp(Handle obj, grpc_op *out) { - } - - Handle GetNodeValue() const { - NanEscapableScope(); - Handle status_obj = NanNew(); - status_obj->Set(NanNew("code"), NanNew(*status)); - if (event->data.finished.details != NULL) { - status_obj->Set(NanNew("details"), String::New(*status_details)); - } - status_obj->Set(NanNew("metadata"), ParseMetadata(metadata_array)); - return NanEscapeScope(status_obj); - } - private: - grpc_metadata_array metadata_array; - grpc_status_code status; - char *status_details; -}; - -class ServerCloseResponse : public OpResponse { - public: - explicit ServerCloseResponse(int *cancelled): cancelled(cancelled), - OpResponse("cancelled") { - } - - Handle GetNodeValue() const { - NanEscapableScope(); - NanEscapeScope(NanNew(*cancelled)); - } - - private: - int *cancelled; -}; - -class NewCallResponse : public OpResponse { - public: - explicit NewCallResponse(grpc_call **call, grpc_call_details *details, - grpc_metadata_array *request_metadata) : - call(call), details(details), request_metadata(request_metadata), - OpResponse("call"){ - } - - Handle GetNodeValue() const { - NanEscapableScope(); - if (*call == NULL) { - return NanEscapeScope(NanNull()); - } - Handle obj = NanNew(); - obj->Set(NanNew("call"), Call::WrapStruct(*call)); - obj->Set(NanNew("method"), NanNew(details->method)); - obj->Set(NanNew("host"), NanNew(details->host)); - obj->Set(NanNew("deadline"), - NanNew(TimespecToMilliseconds(details->deadline))); - obj->Set(NanNew("metadata"), ParseMetadata(request_metadata)); - return NanEscapeScope(obj); - } - private: - grpc_call **call; - grpc_call_details *details; - grpc_metadata_array *request_metadata; -} - -struct tag { - tag(NanCallback *callback, std::vector *responses, - std::vector> *handles, - std::vector *strings) : - callback(callback), repsonses(responses), handles(handles), - strings(strings){ - } - ~tag() { - for (std::vector::iterator it = responses->begin(); - it != responses->end(); ++it) { - delete *it; - } - for (std::vector::iterator it = responses->begin(); - it != responses->end(); ++it) { - delete *it; - } - delete callback; - delete responses; - delete handles; - delete strings; - } - NanCallback *callback; - std::vector *responses; - std::vector> *handles; - std::vector *strings; -}; - -void *CreateTag(Handle callback, grpc_op *ops, size_t nops, - std::vector> *handles, - std::vector *strings) { - NanScope(); - NanCallback *cb = new NanCallback(callback); - vector *responses = new vector(); - for (size_t i = 0; i < nops; i++) { - grpc_op *op = &ops[i]; - OpResponse *resp; - // Switching on the TYPE of the op - switch (op->op) { - case GRPC_OP_SEND_INITIAL_METADATA: - resp = new SendResponse("send metadata"); - break; - case GRPC_OP_SEND_MESSAGE: - resp = new SendResponse("write"); - break; - case GRPC_OP_SEND_CLOSE_FROM_CLIENT: - resp = new SendResponse("client close"); - break; - case GRPC_OP_SEND_STATUS_FROM_SERVER: - resp = new SendResponse("server close"); - break; - case GRPC_OP_RECV_INITIAL_METADATA: - resp = new MetadataResponse(op->data.recv_initial_metadata); - break; - case GRPC_OP_RECV_MESSAGE: - resp = new MessageResponse(op->data.recv_message); - break; - case GRPC_OP_RECV_STATUS_ON_CLIENT: - resp = new ClientStatusResponse( - op->data.recv_status_on_client.trailing_metadata, - op->data.recv_status_on_client.status, - op->data.recv_status_on_client.status_details); - break; - case GRPC_RECV_CLOSE_ON_SERVER: - resp = new ServerCloseResponse(op->data.recv_close_on_server.cancelled); - break; - default: - continue; - } - responses->push_back(resp); - } - struct tag *tag_struct = new struct tag(cb, responses, handles, strings); - return reinterpret_cast(tag_struct); -} - -void *CreateTag(Handle callback, grpc_call **call, - grpc_call_details *details, - grpc_metadata_array *request_metadata) { - NanEscapableScope(); - NanCallback *cb = new NanCallback(callback); - vector *responses = new vector(); - OpResponse *resp = new NewCallResponse(call, details, request_metadata); - responses->push_back(resp); - struct tag *tag_struct = new struct tag(cb, responses); - return reinterpret_cast(tag_struct); -} - -NanCallback GetTagCallback(void *tag) { - NanEscapableScope(); - struct tag *tag_struct = reinterpret_cast(tag); - return NanEscapeScope(*tag_struct->callback); -} - -Handle GetNodeValue(void *tag) { - NanEscapableScope(); - struct tag *tag_struct = reinterpret_cast(tag); - Handle obj = NanNew(); - for (std::vector::iterator it = tag_struct->responses->begin(); - it != tag_struct->responses->end(); ++it) { - OpResponse *resp = *it; - obj->Set(resp->GetOpType(), resp->GetNodeValue()); - } - return NanEscapeScope(obj); -} - -void DestroyTag(void *tag) { delete reinterpret_cast(tag); } - -} // namespace node -} // namespace grpc diff --git a/src/node/ext/tag.h b/src/node/ext/tag.h deleted file mode 100644 index 9ff8703b95..0000000000 --- a/src/node/ext/tag.h +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef NET_GRPC_NODE_TAG_H_ -#define NET_GRPC_NODE_TAG_H_ - -#include - - -#include -#include -#include - -namespace grpc { -namespace node { - -/* Create a void* tag that can be passed to grpc_call_start_batch from a - callback function and an ops array */ -void *CreateTag(v8::Handle callback, grpc_op *ops, size_t nops, - std::vector > *handles, - std::vector *strings); - -/* Create a void* tag that can be passed to grpc_server_request_call from a - callback and the various out parameters to that function */ -void *CreateTag(v8::Handle callback, grpc_call **call, - grpc_call_details *details, - grpc_metadata_array *request_metadata); - -/* Get the callback from the tag */ -NanCallback GetCallback(void *tag); - -/* Get the combined output value from the tag */ -v8::Handle GetNodeValue(void *tag); - -/* Destroy the tag and all resources it is holding. It is illegal to call any - of these other functions on a tag after it has been destroyed. */ -void DestroyTag(void *tag); - -} // namespace node -} // namespace grpc - -#endif // NET_GRPC_NODE_TAG_H_ -- cgit v1.2.3 From e7879557c624c0254e711d6e63c70533ebf39a5d Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 12 Feb 2015 12:21:15 -0800 Subject: Fixed most of surface tests --- src/node/examples/math_server.js | 1 + src/node/index.js | 10 +- src/node/src/client.js | 489 ++++++++++++++++++++++++++---------- src/node/src/common.js | 25 ++ src/node/src/server.js | 506 +++++++++++++++++++++++++++----------- src/node/test/math_client_test.js | 5 +- 6 files changed, 756 insertions(+), 280 deletions(-) (limited to 'src/node') diff --git a/src/node/examples/math_server.js b/src/node/examples/math_server.js index e1bd11b5a6..e010445389 100644 --- a/src/node/examples/math_server.js +++ b/src/node/examples/math_server.js @@ -69,6 +69,7 @@ function mathDiv(call, cb) { * @param {stream} stream The stream for sending responses. */ function mathFib(stream) { + console.log(stream); // Here, call is a standard writable Node object Stream var previous = 0, current = 1; for (var i = 0; i < stream.request.limit; i++) { diff --git a/src/node/index.js b/src/node/index.js index 0627e7f557..baef4d03c6 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -35,9 +35,9 @@ var _ = require('underscore'); var ProtoBuf = require('protobufjs'); -var surface_client = require('./src/surface_client.js'); +var client = require('./src/client.js'); -var surface_server = require('./src/surface_server.js'); +var server = require('./src/server.js'); var grpc = require('bindings')('grpc'); @@ -54,7 +54,7 @@ function loadObject(value) { }); return result; } else if (value.className === 'Service') { - return surface_client.makeClientConstructor(value); + return client.makeClientConstructor(value); } else if (value.className === 'Message' || value.className === 'Enum') { return value.build(); } else { @@ -84,9 +84,9 @@ exports.loadObject = loadObject; exports.load = load; /** - * See docs for surface_server.makeServerConstructor + * See docs for server.makeServerConstructor */ -exports.buildServer = surface_server.makeServerConstructor; +exports.buildServer = server.makeServerConstructor; /** * Status name to code number mapping diff --git a/src/node/src/client.js b/src/node/src/client.js index 3a1c9eef84..88fa9dc2e2 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -1,6 +1,6 @@ /* * - * Copyright 2014, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,14 +31,104 @@ * */ +var _ = require('underscore'); + +var capitalize = require('underscore.string/capitalize'); +var decapitalize = require('underscore.string/decapitalize'); + var grpc = require('bindings')('grpc.node'); -var common = require('./common'); +var common = require('./common.js'); + +var EventEmitter = require('events').EventEmitter; + +var stream = require('stream'); -var Duplex = require('stream').Duplex; +var Readable = stream.Readable; +var Writable = stream.Writable; +var Duplex = stream.Duplex; var util = require('util'); -util.inherits(GrpcClientStream, Duplex); +util.inherits(ClientWritableStream, Writable); + +function ClientWritableStream(call, serialize) { + Writable.call(this, {objectMode: true}); + this.call = call; + this.serialize = common.wrapIgnoreNull(serialize); + this.on('finish', function() { + var batch = {}; + batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + call.startBatch(batch, function() {}); + }); +} + +/** + * Attempt to write the given chunk. Calls the callback when done. This is an + * implementation of a method needed for implementing stream.Writable. + * @param {Buffer} chunk The chunk to write + * @param {string} encoding Ignored + * @param {function(Error=)} callback Called when the write is complete + */ +function _write(chunk, encoding, callback) { + var batch = {}; + batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); + console.log(batch); + this.call.startBatch(batch, function(err, event) { + if (err) { + throw err; + } + callback(); + }); +}; + +ClientWritableStream.prototype._write = _write; + +util.inherits(ClientReadableStream, Readable); + +function ClientReadableStream(call, deserialize) { + Readable.call(this, {objectMode: true}); + this.call = call; + this.finished = false; + this.reading = false; + this.serialize = common.wrapIgnoreNull(deserialize); +} + +function _read(size) { + var self = this; + /** + * Callback to be called when a READ event is received. Pushes the data onto + * the read queue and starts reading again if applicable + * @param {grpc.Event} event READ event object + */ + function readCallback(event) { + if (self.finished) { + self.push(null); + return; + } + var data = event.data; + if (self.push(self.deserialize(data)) && data != null) { + var read_batch = {}; + read_batch[grpc.opType.RECV_MESSAGE] = true; + self.call.startBatch(read_batch, readCallback); + } else { + self.reading = false; + } + } + if (self.finished) { + self.push(null); + } else { + if (!self.reading) { + self.reading = true; + var read_batch = {}; + read_batch[grpc.opType.RECV_MESSAGE] = true; + self.call.startBatch(read_batch, readCallback); + } + } +}; + +ClientReadableStream.prototype._read = _read; + +util.inherits(ClientDuplexStream, Duplex); /** * Class for representing a gRPC client side stream as a Node stream. Extends @@ -49,167 +139,310 @@ util.inherits(GrpcClientStream, Duplex); * @param {function(Buffer):*=} deserialize Deserialization function for * responses */ -function GrpcClientStream(call, serialize, deserialize) { +function ClientDuplexStream(call, serialize, deserialize) { Duplex.call(this, {objectMode: true}); - if (!serialize) { - serialize = function(value) { - return value; - }; - } - if (!deserialize) { - deserialize = function(value) { - return value; - }; - } + this.serialize = common.wrapIgnoreNull(serialize); + this.serialize = common.wrapIgnoreNull(deserialize); var self = this; var finished = false; // Indicates that a read is currently pending var reading = false; - // Indicates that a write is currently pending - var writing = false; - this._call = call; + this.call = call; +} - /** - * Serialize a request value to a buffer. Always maps null to null. Otherwise - * uses the provided serialize function - * @param {*} value The value to serialize - * @return {Buffer} The serialized value - */ - this.serialize = function(value) { - if (value === null || value === undefined) { - return null; - } - return serialize(value); - }; +ClientDuplexStream.prototype._read = _read; +ClientDuplexStream.prototype._write = _write; + +function cancel() { + this.call.cancel(); +} + +ClientReadableStream.prototype.cancel = cancel; +ClientWritableStream.prototype.cancel = cancel; +ClientDuplexStream.prototype.cancel = cancel; +/** + * Get a function that can make unary requests to the specified method. + * @param {string} method The name of the method to request + * @param {function(*):Buffer} serialize The serialization function for inputs + * @param {function(Buffer)} deserialize The deserialization function for + * outputs + * @return {Function} makeUnaryRequest + */ +function makeUnaryRequestFunction(method, serialize, deserialize) { /** - * Deserialize a response buffer to a value. Always maps null to null. - * Otherwise uses the provided deserialize function. - * @param {Buffer} buffer The buffer to deserialize - * @return {*} The deserialized value + * Make a unary request with this method on the given channel with the given + * argument, callback, etc. + * @this {Client} Client object. Must have a channel member. + * @param {*} argument The argument to the call. Should be serializable with + * serialize + * @param {function(?Error, value=)} callback The callback to for when the + * response is received + * @param {array=} metadata Array of metadata key/value pairs to add to the + * call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events */ - this.deserialize = function(buffer) { - if (buffer === null) { - return null; + function makeUnaryRequest(argument, callback, metadata, deadline) { + if (deadline === undefined) { + deadline = Infinity; } - return deserialize(buffer); - }; + var emitter = new EventEmitter(); + var call = new grpc.Call(this.channel, method, deadline); + if (metadata === null || metadata === undefined) { + metadata = {}; + } + emitter.cancel = function cancel() { + call.cancel(); + }; + var client_batch = {}; + client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument); + client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + client_batch[grpc.opType.RECV_MESSAGE] = true; + client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(client_batch, function(err, response) { + if (err) { + callback(err); + return; + } + emitter.emit('status', response.status); + emitter.emit('metadata', response.metadata); + callback(null, deserialize(response.read)); + }); + return emitter; + } + return makeUnaryRequest; +} + +/** + * Get a function that can make client stream requests to the specified method. + * @param {string} method The name of the method to request + * @param {function(*):Buffer} serialize The serialization function for inputs + * @param {function(Buffer)} deserialize The deserialization function for + * outputs + * @return {Function} makeClientStreamRequest + */ +function makeClientStreamRequestFunction(method, serialize, deserialize) { /** - * Callback to be called when a READ event is received. Pushes the data onto - * the read queue and starts reading again if applicable - * @param {grpc.Event} event READ event object + * Make a client stream request with this method on the given channel with the + * given callback, etc. + * @this {Client} Client object. Must have a channel member. + * @param {function(?Error, value=)} callback The callback to for when the + * response is received + * @param {array=} metadata Array of metadata key/value pairs to add to the + * call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events */ - function readCallback(event) { - if (finished) { - self.push(null); - return; + function makeClientStreamRequest(callback, metadata, deadline) { + if (deadline === undefined) { + deadline = Infinity; } - var data = event.data; - if (self.push(self.deserialize(data)) && data != null) { - self._call.startRead(readCallback); - } else { - reading = false; + var call = new grpc.Call(this.channel, method, deadline); + if (metadata === null || metadata === undefined) { + metadata = {}; } + var stream = new ClientWritableStream(call, serialize); + var metadata_batch = {}; + metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + call.startBatch(metadata_batch, function(err, response) { + if (err) { + callback(err); + return; + } + stream.emit('metadata', response.metadata); + }); + var client_batch = {}; + client_batch[grpc.opType.RECV_MESSAGE] = true; + client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(client_batch, function(err, response) { + if (err) { + callback(err); + return; + } + stream.emit('status', response.status); + callback(null, deserialize(response.read)); + }); + return stream; } - call.invoke(function(event) { - self.emit('metadata', event.data); - }, function(event) { - finished = true; - self.emit('status', event.data); - }, 0); - this.on('finish', function() { - call.writesDone(function() {}); - }); + return makeClientStreamRequest; +} + +/** + * Get a function that can make server stream requests to the specified method. + * @param {string} method The name of the method to request + * @param {function(*):Buffer} serialize The serialization function for inputs + * @param {function(Buffer)} deserialize The deserialization function for + * outputs + * @return {Function} makeServerStreamRequest + */ +function makeServerStreamRequestFunction(method, serialize, deserialize) { /** - * Start reading if there is not already a pending read. Reading will - * continue until self.push returns false (indicating reads should slow - * down) or the read data is null (indicating that there is no more data). + * Make a server stream request with this method on the given channel with the + * given argument, etc. + * @this {SurfaceClient} Client object. Must have a channel member. + * @param {*} argument The argument to the call. Should be serializable with + * serialize + * @param {array=} metadata Array of metadata key/value pairs to add to the + * call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events */ - this.startReading = function() { - if (finished) { - self.push(null); - } else { - if (!reading) { - reading = true; - self._call.startRead(readCallback); - } + function makeServerStreamRequest(argument, metadata, deadline) { + if (deadline === undefined) { + deadline = Infinity; + } + var call = new grpc.Call(this.channel, method, deadline); + if (metadata === null || metadata === undefined) { + metadata = {}; } - }; + var stream = new ClientReadableStream(call, deserialize); + var start_batch = {}; + console.log('Starting server streaming request on', method); + start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument); + start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + call.startBatch(start_batch, function(err, response) { + if (err) { + throw err; + } + console.log(response); + stream.emit('metadata', response.metadata); + }); + var status_batch = {}; + status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(status_batch, function(err, response) { + if (err) { + throw err; + } + stream.emit('status', response.status); + }); + return stream; + } + return makeServerStreamRequest; } /** - * Start reading. This is an implementation of a method needed for implementing - * stream.Readable. - * @param {number} size Ignored + * Get a function that can make bidirectional stream requests to the specified + * method. + * @param {string} method The name of the method to request + * @param {function(*):Buffer} serialize The serialization function for inputs + * @param {function(Buffer)} deserialize The deserialization function for + * outputs + * @return {Function} makeBidiStreamRequest */ -GrpcClientStream.prototype._read = function(size) { - this.startReading(); -}; +function makeBidiStreamRequestFunction(method, serialize, deserialize) { + /** + * Make a bidirectional stream request with this method on the given channel. + * @this {SurfaceClient} Client object. Must have a channel member. + * @param {array=} metadata Array of metadata key/value pairs to add to the + * call + * @param {(number|Date)=} deadline The deadline for processing this request. + * Defaults to infinite future + * @return {EventEmitter} An event emitter for stream related events + */ + function makeBidiStreamRequest(metadata, deadline) { + if (deadline === undefined) { + deadline = Infinity; + } + var call = new grpc.Call(this.channel, method, deadline); + if (metadata === null || metadata === undefined) { + metadata = {}; + } + var stream = new ClientDuplexStream(call, serialize, deserialize); + var start_batch = {}; + start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; + call.startBatch(start_batch, function(err, response) { + if (err) { + throw err; + } + stream.emit('metadata', response.metadata); + }); + var status_batch = {}; + status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(status_batch, function(err, response) { + if (err) { + throw err; + } + stream.emit('status', response.status); + }); + return stream; + } + return makeBidiStreamRequest; +} -/** - * Attempt to write the given chunk. Calls the callback when done. This is an - * implementation of a method needed for implementing stream.Writable. - * @param {Buffer} chunk The chunk to write - * @param {string} encoding Ignored - * @param {function(Error=)} callback Ignored - */ -GrpcClientStream.prototype._write = function(chunk, encoding, callback) { - var self = this; - self._call.startWrite(self.serialize(chunk), function(event) { - callback(); - }, 0); -}; /** - * Cancel the ongoing call. If the call has not already finished, it will finish - * with status CANCELLED. + * Map with short names for each of the requester maker functions. Used in + * makeClientConstructor */ -GrpcClientStream.prototype.cancel = function() { - this._call.cancel(); +var requester_makers = { + unary: makeUnaryRequestFunction, + server_stream: makeServerStreamRequestFunction, + client_stream: makeClientStreamRequestFunction, + bidi: makeBidiStreamRequestFunction }; /** - * Make a request on the channel to the given method with the given arguments - * @param {grpc.Channel} channel The channel on which to make the request - * @param {string} method The method to request - * @param {function(*):Buffer} serialize Serialization function for requests - * @param {function(Buffer):*} deserialize Deserialization function for - * responses - * @param {array=} metadata Array of metadata key/value pairs to add to the call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future. - * @return {stream=} The stream of responses + * Creates a constructor for clients for the given service + * @param {ProtoBuf.Reflect.Service} service The service to generate a client + * for + * @return {function(string, Object)} New client constructor */ -function makeRequest(channel, - method, - serialize, - deserialize, - metadata, - deadline) { - if (deadline === undefined) { - deadline = Infinity; - } - var call = new grpc.Call(channel, method, deadline); - if (metadata) { - call.addMetadata(metadata); +function makeClientConstructor(service) { + var prefix = '/' + common.fullyQualifiedName(service) + '/'; + /** + * Create a client with the given methods + * @constructor + * @param {string} address The address of the server to connect to + * @param {Object} options Options to pass to the underlying channel + */ + function Client(address, options) { + this.channel = new grpc.Channel(address, options); } - return new GrpcClientStream(call, serialize, deserialize); + + _.each(service.children, function(method) { + var method_type; + if (method.requestStream) { + if (method.responseStream) { + method_type = 'bidi'; + } else { + method_type = 'client_stream'; + } + } else { + if (method.responseStream) { + method_type = 'server_stream'; + } else { + method_type = 'unary'; + } + } + Client.prototype[decapitalize(method.name)] = + requester_makers[method_type]( + prefix + capitalize(method.name), + common.serializeCls(method.resolvedRequestType.build()), + common.deserializeCls(method.resolvedResponseType.build())); + }); + + Client.service = service; + + return Client; } -/** - * See documentation for makeRequest above - */ -exports.makeRequest = makeRequest; +exports.makeClientConstructor = makeClientConstructor; /** - * Represents a client side gRPC channel associated with a single host. - */ -exports.Channel = grpc.Channel; -/** - * Status name to code number mapping + * See docs for client.status */ exports.status = grpc.status; /** - * Call error name to code number mapping + * See docs for client.callError */ exports.callError = grpc.callError; diff --git a/src/node/src/common.js b/src/node/src/common.js index 54247e3fa1..7560cf1bdd 100644 --- a/src/node/src/common.js +++ b/src/node/src/common.js @@ -31,6 +31,8 @@ * */ +var _ = require('underscore'); + var capitalize = require('underscore.string/capitalize'); /** @@ -87,6 +89,24 @@ function fullyQualifiedName(value) { return name; } +/** + * Wrap a function to pass null-like values through without calling it. If no + * function is given, just uses the identity; + * @param {?function} func The function to wrap + * @return {function} The wrapped function + */ +function wrapIgnoreNull(func) { + if (!func) { + return _.identity; + } + return function(arg) { + if (arg === null || arg === undefined) { + return null; + } + return func(arg); + }; +} + /** * See docs for deserializeCls */ @@ -101,3 +121,8 @@ exports.serializeCls = serializeCls; * See docs for fullyQualifiedName */ exports.fullyQualifiedName = fullyQualifiedName; + +/** + * See docs for wrapIgnoreNull + */ +exports.wrapIgnoreNull = wrapIgnoreNull; diff --git a/src/node/src/server.js b/src/node/src/server.js index e4f71ff05f..2d5396e3b7 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -1,6 +1,6 @@ /* * - * Copyright 2014, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -33,80 +33,72 @@ var _ = require('underscore'); +var capitalize = require('underscore.string/capitalize'); +var decapitalize = require('underscore.string/decapitalize'); + var grpc = require('bindings')('grpc.node'); var common = require('./common'); -var Duplex = require('stream').Duplex; +var stream = require('stream'); + +var Readable = stream.Readable; +var Writable = stream.Writable; +var Duplex = stream.Duplex; var util = require('util'); -util.inherits(GrpcServerStream, Duplex); +var EventEmitter = require('events').EventEmitter; -/** - * Class for representing a gRPC server side stream as a Node stream. Extends - * from stream.Duplex. - * @constructor - * @param {grpc.Call} call Call object to proxy - * @param {function(*):Buffer=} serialize Serialization function for responses - * @param {function(Buffer):*=} deserialize Deserialization function for - * requests - */ -function GrpcServerStream(call, serialize, deserialize) { - Duplex.call(this, {objectMode: true}); - if (!serialize) { - serialize = function(value) { - return value; - }; - } - if (!deserialize) { - deserialize = function(value) { - return value; - }; - } - this._call = call; - // Indicate that a status has been sent - var finished = false; - var self = this; - var status = { - 'code' : grpc.status.OK, - 'details' : 'OK' - }; +var common = require('./common.js'); - /** - * Serialize a response value to a buffer. Always maps null to null. Otherwise - * uses the provided serialize function - * @param {*} value The value to serialize - * @return {Buffer} The serialized value - */ - this.serialize = function(value) { - if (value === null || value === undefined) { - return null; - } - return serialize(value); +function handleError(call, error) { + var error_batch = {}; + error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + code: grpc.status.INTERNAL, + details: 'Unknown Error', + metadata: {} }; + call.startBatch(error_batch, function(){}); +} - /** - * Deserialize a request buffer to a value. Always maps null to null. - * Otherwise uses the provided deserialize function. - * @param {Buffer} buffer The buffer to deserialize - * @return {*} The deserialized value - */ - this.deserialize = function(buffer) { - if (buffer === null) { - return null; +function waitForCancel(call, emitter) { + var cancel_batch = {}; + cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + call.startBatch(cancel_batch, function(err, result) { + if (err) { + emitter.emit('error', err); } - return deserialize(buffer); + if (result.cancelled) { + emitter.cancelled = true; + emitter.emit('cancelled'); + } + }); +} + +function sendUnaryResponse(call, value, serialize) { + var end_batch = {}; + end_batch[grpc.opType.SEND_MESSAGE] = serialize(value); + end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + code: grpc.status.OK, + details: 'OK', + metadata: {} }; + call.startBatch(end_batch, function (){}); +} - /** - * Send the pending status - */ +function setUpWritable(stream, serialize) { + stream.finished = false; + stream.status = { + 'code' : grpc.status.OK, + 'details' : 'OK' + }; + stream.serialize = common.wrapIgnoreNull(serialize); function sendStatus() { - call.startWriteStatus(status.code, status.details, function() { - }); - finished = true; + var batch = {}; + batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status; + stream.call.startBatch(batch, function(){}); } - this.on('finish', sendStatus); + stream.on('finish', sendStatus); /** * Set the pending status to a given error status. If the error does not have * code or details properties, the code will be set to grpc.status.INTERNAL @@ -123,7 +115,7 @@ function GrpcServerStream(call, serialize, deserialize) { details = err.details; } } - status = {'code': code, 'details': details}; + stream.status = {'code': code, 'details': details}; } /** * Terminate the call. This includes indicating that reads are done, draining @@ -133,69 +125,196 @@ function GrpcServerStream(call, serialize, deserialize) { */ function terminateCall(err) { // Drain readable data - this.on('data', function() {}); setStatus(err); - this.end(); + stream.end(); } - this.on('error', terminateCall); - // Indicates that a read is pending - var reading = false; + stream.on('error', terminateCall); +} + +function setUpReadable(stream, deserialize) { + stream.deserialize = common.wrapIgnoreNull(deserialize); + stream.finished = false; + stream.reading = false; + + stream.terminate = function() { + stream.finished = true; + stream.on('data', function() {}); + }; + + stream.on('cancelled', function() { + stream.terminate(); + }); +} + +util.inherits(ServerWritableStream, Writable); + +function ServerWritableStream(call, serialize) { + Writable.call(this, {objectMode: true}); + this.call = call; + + this.finished = false; + setUpWritable(this, serialize); +} + +/** + * Start writing a chunk of data. This is an implementation of a method required + * for implementing stream.Writable. + * @param {Buffer} chunk The chunk of data to write + * @param {string} encoding Ignored + * @param {function(Error=)} callback Callback to indicate that the write is + * complete + */ +function _write(chunk, encoding, callback) { + var batch = {}; + batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); + this.call.startBatch(batch, function(err, value) { + if (err) { + this.emit('error', err); + return; + } + callback(); + }); +} + +ServerWritableStream.prototype._write = _write; + +util.inherits(ServerReadableStream, Readable); + +function ServerReadableStream(call, deserialize) { + Readable.call(this, {objectMode: true}); + this.call = call; + setUpReadable(this, deserialize); +} + +/** + * Start reading from the gRPC data source. This is an implementation of a + * method required for implementing stream.Readable + * @param {number} size Ignored + */ +function _read(size) { + var self = this; /** * Callback to be called when a READ event is received. Pushes the data onto * the read queue and starts reading again if applicable * @param {grpc.Event} event READ event object */ - function readCallback(event) { - if (finished) { + function readCallback(err, event) { + if (err) { + self.terminate(); + return; + } + if (self.finished) { self.push(null); return; } - var data = event.data; + var data = event.read; if (self.push(self.deserialize(data)) && data != null) { - self._call.startRead(readCallback); + var read_batch = {}; + read_batch[grpc.opType.RECV_MESSAGE] = true; + self.call.startBatch(read_batch, readCallback); } else { - reading = false; + self.reading = false; } } - /** - * Start reading if there is not already a pending read. Reading will - * continue until self.push returns false (indicating reads should slow - * down) or the read data is null (indicating that there is no more data). - */ - this.startReading = function() { - if (finished) { - self.push(null); - } else { - if (!reading) { - reading = true; - self._call.startRead(readCallback); + if (self.finished) { + self.push(null); + } else { + if (!self.reading) { + self.reading = true; + var batch = {}; + batch[grpc.opType.RECV_MESSAGE] = true; + self.call.startBatch(batch, readCallback); + } + } +} + +ServerReadableStream.prototype._read = _read; + +util.inherits(ServerDuplexStream, Duplex); + +function ServerDuplexStream(call, serialize, deserialize) { + Duplex.call(this, {objectMode: true}); + setUpWritable(this, serialize); + setUpReadable(this, deserialize); +} + +ServerDuplexStream.prototype._read = _read; +ServerDuplexStream.prototype._write = _write; + +function handleUnary(call, handler, metadata) { + var emitter = new EventEmitter(); + emitter.on('error', function(error) { + handleError(call, error); + }); + waitForCancel(call, emitter); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + batch[grpc.opType.RECV_MESSAGE] = true; + call.startBatch(batch, function(err, result) { + if (err) { + handleError(call, err); + return; + } + emitter.request = handler.deserialize(result.read); + if (emitter.cancelled) { + return; + } + handler.func(emitter, function sendUnaryData(err, value) { + if (err) { + handleError(call, err); } + sendUnaryResponse(call, value, handler.serialize); + }); + }); +} + +function handleServerStreaming(call, handler, metadata) { + console.log('Handling server streaming call'); + var stream = new ServerWritableStream(call, handler.serialize); + waitForCancel(call, stream); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + batch[grpc.opType.RECV_MESSAGE] = true; + call.startBatch(batch, function(err, result) { + if (err) { + stream.emit('error', err); + return; } - }; + stream.request = result.read; + handler.func(stream); + }); } -/** - * Start reading from the gRPC data source. This is an implementation of a - * method required for implementing stream.Readable - * @param {number} size Ignored - */ -GrpcServerStream.prototype._read = function(size) { - this.startReading(); -}; +function handleClientStreaming(call, handler, metadata) { + var stream = new ServerReadableStream(call, handler.deserialize); + waitForCancel(call, stream); + var metadata_batch = {}; + metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + call.startBatch(metadata_batch, function() {}); + handler.func(stream, function(err, value) { + stream.terminate(); + if (err) { + handleError(call, err); + } + sendUnaryResponse(call, value, handler.serialize); + }); +} -/** - * Start writing a chunk of data. This is an implementation of a method required - * for implementing stream.Writable. - * @param {Buffer} chunk The chunk of data to write - * @param {string} encoding Ignored - * @param {function(Error=)} callback Callback to indicate that the write is - * complete - */ -GrpcServerStream.prototype._write = function(chunk, encoding, callback) { - var self = this; - self._call.startWrite(self.serialize(chunk), function(event) { - callback(); - }, 0); +function handleBidiStreaming(call, handler, metadata) { + var stream = new ServerDuplexStream(call, handler.serialize, + handler.deserialize); + waitForCancel(call, stream); + var metadata_batch = {}; + metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + call.startBatch(metadata_batch, function() {}); + handler.func(stream); +} + +var streamHandlers = { + unary: handleUnary, + server_stream: handleServerStreaming, + client_stream: handleClientStreaming, + bidi: handleBidiStreaming }; /** @@ -218,7 +337,7 @@ function Server(getMetadata, options) { * Start the server and begin handling requests * @this Server */ - this.start = function() { + this.listen = function() { console.log('Server starting'); _.each(handlers, function(handler, handler_name) { console.log('Serving', handler_name); @@ -233,48 +352,42 @@ function Server(getMetadata, options) { * wait for the next request * @param {grpc.Event} event The event to handle with tag SERVER_RPC_NEW */ - function handleNewCall(event) { - var call = event.call; - var data = event.data; - if (data === null) { + function handleNewCall(err, event) { + console.log('Handling new call'); + if (err) { + return; + } + var details = event['new call']; + var call = details.call; + var method = details.method; + var metadata = details.metadata; + if (method === null) { return; } server.requestCall(handleNewCall); var handler = undefined; - var deadline = data.absolute_deadline; - var cancelled = false; - call.serverAccept(function(event) { - if (event.data.code === grpc.status.CANCELLED) { - cancelled = true; - if (stream) { - stream.emit('cancelled'); - } - } - }, 0); - if (handlers.hasOwnProperty(data.method)) { - handler = handlers[data.method]; + var deadline = details.deadline; + if (handlers.hasOwnProperty(method)) { + handler = handlers[method]; + console.log(handler); } else { - call.serverEndInitialMetadata(0); - call.startWriteStatus( - grpc.status.UNIMPLEMENTED, - "This method is not available on this server.", - function() {}); + console.log(handlers); + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + code: grpc.status.UNIMPLEMENTED, + details: "This method is not available on this server.", + metadata: {} + }; + batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + call.startBatch(batch, function() {}); return; } + var response_metadata = {}; if (getMetadata) { - call.addMetadata(getMetadata(data.method, data.metadata)); - } - call.serverEndInitialMetadata(0); - var stream = new GrpcServerStream(call, handler.serialize, - handler.deserialize); - Object.defineProperty(stream, 'cancelled', { - get: function() { return cancelled;} - }); - try { - handler.func(stream, data.metadata); - } catch (e) { - stream.emit('error', e); + response_metadata = getMetadata(method, metadata); } + streamHandlers[handler.type](call, handler, response_metadata); } server.requestCall(handleNewCall); }; @@ -294,17 +407,20 @@ function Server(getMetadata, options) { * returns a stream of response values * @param {function(*):Buffer} serialize Serialization function for responses * @param {function(Buffer):*} deserialize Deserialization function for requests + * @param {string} type The streaming type of method that this handles * @return {boolean} True if the handler was set. False if a handler was already * set for that name. */ -Server.prototype.register = function(name, handler, serialize, deserialize) { +Server.prototype.register = function(name, handler, serialize, deserialize, + type) { if (this.handlers.hasOwnProperty(name)) { return false; } this.handlers[name] = { func: handler, serialize: serialize, - deserialize: deserialize + deserialize: deserialize, + type: type }; return true; }; @@ -324,6 +440,110 @@ Server.prototype.bind = function(port, secure) { }; /** - * See documentation for Server + * Creates a constructor for servers with a service defined by the methods + * object. The methods object has string keys and values of this form: + * {serialize: function, deserialize: function, client_stream: bool, + * server_stream: bool} + * @param {Object} methods Method descriptor for each method the server should + * expose + * @param {string} prefix The prefex to prepend to each method name + * @return {function(Object, Object)} New server constructor + */ +function makeServerConstructor(services) { + var qual_names = []; + _.each(services, function(service) { + _.each(service.children, function(method) { + var name = common.fullyQualifiedName(method); + if (_.indexOf(qual_names, name) !== -1) { + throw new Error('Method ' + name + ' exposed by more than one service'); + } + qual_names.push(name); + }); + }); + /** + * Create a server with the given handlers for all of the methods. + * @constructor + * @param {Object} service_handlers Map from service names to map from method + * names to handlers + * @param {function(string, Object>): + Object>=} getMetadata Callback that + * gets metatada for a given method + * @param {Object=} options Options to pass to the underlying server + */ + function SurfaceServer(service_handlers, getMetadata, options) { + var server = new Server(getMetadata, options); + this.inner_server = server; + _.each(services, function(service) { + var service_name = common.fullyQualifiedName(service); + if (service_handlers[service_name] === undefined) { + throw new Error('Handlers for service ' + + service_name + ' not provided.'); + } + var prefix = '/' + common.fullyQualifiedName(service) + '/'; + _.each(service.children, function(method) { + var method_type; + if (method.requestStream) { + if (method.responseStream) { + method_type = 'bidi'; + } else { + method_type = 'client_stream'; + } + } else { + if (method.responseStream) { + method_type = 'server_stream'; + } else { + method_type = 'unary'; + } + } + if (service_handlers[service_name][decapitalize(method.name)] === + undefined) { + throw new Error('Method handler for ' + + common.fullyQualifiedName(method) + ' not provided.'); + } + var serialize = common.serializeCls( + method.resolvedResponseType.build()); + var deserialize = common.deserializeCls( + method.resolvedRequestType.build()); + server.register( + prefix + capitalize(method.name), + service_handlers[service_name][decapitalize(method.name)], + serialize, deserialize, method_type); + }); + }, this); + } + + /** + * Binds the server to the given port, with SSL enabled if secure is specified + * @param {string} port The port that the server should bind on, in the format + * "address:port" + * @param {boolean=} secure Whether the server should open a secure port + * @return {SurfaceServer} this + */ + SurfaceServer.prototype.bind = function(port, secure) { + return this.inner_server.bind(port, secure); + }; + + /** + * Starts the server listening on any bound ports + * @return {SurfaceServer} this + */ + SurfaceServer.prototype.listen = function() { + this.inner_server.listen(); + return this; + }; + + /** + * Shuts the server down; tells it to stop listening for new requests and to + * kill old requests. + */ + SurfaceServer.prototype.shutdown = function() { + this.inner_server.shutdown(); + }; + + return SurfaceServer; +} + +/** + * See documentation for makeServerConstructor */ -module.exports = Server; +exports.makeServerConstructor = makeServerConstructor; diff --git a/src/node/test/math_client_test.js b/src/node/test/math_client_test.js index 0e365bf870..f347b18ea0 100644 --- a/src/node/test/math_client_test.js +++ b/src/node/test/math_client_test.js @@ -63,13 +63,10 @@ describe('Math client', function() { assert.ifError(err); assert.equal(value.quotient, 1); assert.equal(value.remainder, 3); - }); - call.on('status', function checkStatus(status) { - assert.strictEqual(status.code, grpc.status.OK); done(); }); }); - it('should handle a server streaming request', function(done) { + it.only('should handle a server streaming request', function(done) { var call = math_client.fib({limit: 7}); var expected_results = [1, 1, 2, 3, 5, 8, 13]; var next_expected = 0; -- cgit v1.2.3 From 10ac96cb8ff60637f2b46e7059b46d701c123dc0 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 12 Feb 2015 13:28:25 -0800 Subject: All tests but one now pass against new API --- src/node/interop/interop_client.js | 2 +- src/node/interop/interop_server.js | 1 + src/node/src/client.js | 27 ++- src/node/src/server.js | 15 +- src/node/src/surface_client.js | 357 ----------------------------------- src/node/src/surface_server.js | 340 --------------------------------- src/node/test/client_server_test.js | 255 ------------------------- src/node/test/interop_sanity_test.js | 2 +- src/node/test/math_client_test.js | 2 +- src/node/test/server_test.js | 122 ------------ src/node/test/surface_test.js | 4 +- 11 files changed, 40 insertions(+), 1087 deletions(-) delete mode 100644 src/node/src/surface_client.js delete mode 100644 src/node/src/surface_server.js delete mode 100644 src/node/test/client_server_test.js delete mode 100644 src/node/test/server_test.js (limited to 'src/node') diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index ce18f77fe7..8737af6cde 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -145,8 +145,8 @@ function serverStreaming(client, done) { resp_index += 1; }); call.on('status', function(status) { - assert.strictEqual(resp_index, 4); assert.strictEqual(status.code, grpc.status.OK); + assert.strictEqual(resp_index, 4); if (done) { done(); } diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js index 54e9715d1e..271fe5962e 100644 --- a/src/node/interop/interop_server.js +++ b/src/node/interop/interop_server.js @@ -106,6 +106,7 @@ function handleStreamingOutput(call) { testProto.PayloadType.COMPRESSABLE, testProto.PayloadType.UNCOMPRESSABLE][Math.random() < 0.5 ? 0 : 1]; } + console.log('req:', req); _.each(req.response_parameters, function(resp_param) { call.write({ payload: { diff --git a/src/node/src/client.js b/src/node/src/client.js index 88fa9dc2e2..36a6f41d7b 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -56,6 +56,7 @@ function ClientWritableStream(call, serialize) { this.call = call; this.serialize = common.wrapIgnoreNull(serialize); this.on('finish', function() { + console.log('Send close from client'); var batch = {}; batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; call.startBatch(batch, function() {}); @@ -90,7 +91,7 @@ function ClientReadableStream(call, deserialize) { this.call = call; this.finished = false; this.reading = false; - this.serialize = common.wrapIgnoreNull(deserialize); + this.deserialize = common.wrapIgnoreNull(deserialize); } function _read(size) { @@ -100,12 +101,15 @@ function _read(size) { * the read queue and starts reading again if applicable * @param {grpc.Event} event READ event object */ - function readCallback(event) { + function readCallback(err, event) { + if (err) { + throw err; + } if (self.finished) { self.push(null); return; } - var data = event.data; + var data = event.read; if (self.push(self.deserialize(data)) && data != null) { var read_batch = {}; read_batch[grpc.opType.RECV_MESSAGE] = true; @@ -142,12 +146,18 @@ util.inherits(ClientDuplexStream, Duplex); function ClientDuplexStream(call, serialize, deserialize) { Duplex.call(this, {objectMode: true}); this.serialize = common.wrapIgnoreNull(serialize); - this.serialize = common.wrapIgnoreNull(deserialize); + this.deserialize = common.wrapIgnoreNull(deserialize); var self = this; var finished = false; // Indicates that a read is currently pending var reading = false; this.call = call; + this.on('finish', function() { + console.log('Send close from client'); + var batch = {}; + batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; + call.startBatch(batch, function() {}); + }); } ClientDuplexStream.prototype._read = _read; @@ -208,6 +218,10 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { callback(err); return; } + if (response.status.code != grpc.status.OK) { + callback(response.status); + return; + } emitter.emit('status', response.status); emitter.emit('metadata', response.metadata); callback(null, deserialize(response.read)); @@ -265,6 +279,11 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { callback(err); return; } + console.log(response); + if (response.status.code != grpc.status.OK) { + callback(response.status); + return; + } stream.emit('status', response.status); callback(null, deserialize(response.read)); }); diff --git a/src/node/src/server.js b/src/node/src/server.js index 2d5396e3b7..3c21491319 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -89,11 +89,13 @@ function sendUnaryResponse(call, value, serialize) { function setUpWritable(stream, serialize) { stream.finished = false; stream.status = { - 'code' : grpc.status.OK, - 'details' : 'OK' + code : grpc.status.OK, + details : 'OK', + metadata : {} }; stream.serialize = common.wrapIgnoreNull(serialize); function sendStatus() { + console.log('Server sending status'); var batch = {}; batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status; stream.call.startBatch(batch, function(){}); @@ -115,7 +117,7 @@ function setUpWritable(stream, serialize) { details = err.details; } } - stream.status = {'code': code, 'details': details}; + stream.status = {code: code, details: details, metadata: {}}; } /** * Terminate the call. This includes indicating that reads are done, draining @@ -167,6 +169,7 @@ function ServerWritableStream(call, serialize) { function _write(chunk, encoding, callback) { var batch = {}; batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); + console.log('Server writing', batch); this.call.startBatch(batch, function(err, value) { if (err) { this.emit('error', err); @@ -204,11 +207,14 @@ function _read(size) { return; } if (self.finished) { + console.log('Pushing null'); self.push(null); return; } var data = event.read; + console.log(data); if (self.push(self.deserialize(data)) && data != null) { + console.log('Reading again'); var read_batch = {}; read_batch[grpc.opType.RECV_MESSAGE] = true; self.call.startBatch(read_batch, readCallback); @@ -234,6 +240,7 @@ util.inherits(ServerDuplexStream, Duplex); function ServerDuplexStream(call, serialize, deserialize) { Duplex.call(this, {objectMode: true}); + this.call = call; setUpWritable(this, serialize); setUpReadable(this, deserialize); } @@ -280,7 +287,7 @@ function handleServerStreaming(call, handler, metadata) { stream.emit('error', err); return; } - stream.request = result.read; + stream.request = handler.deserialize(result.read); handler.func(stream); }); } diff --git a/src/node/src/surface_client.js b/src/node/src/surface_client.js deleted file mode 100644 index 16c31809f4..0000000000 --- a/src/node/src/surface_client.js +++ /dev/null @@ -1,357 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -var _ = require('underscore'); - -var capitalize = require('underscore.string/capitalize'); -var decapitalize = require('underscore.string/decapitalize'); - -var client = require('./client.js'); - -var common = require('./common.js'); - -var EventEmitter = require('events').EventEmitter; - -var stream = require('stream'); - -var Readable = stream.Readable; -var Writable = stream.Writable; -var Duplex = stream.Duplex; -var util = require('util'); - - -function forwardEvent(fromEmitter, toEmitter, event) { - fromEmitter.on(event, function forward() { - _.partial(toEmitter.emit, event).apply(toEmitter, arguments); - }); -} - -util.inherits(ClientReadableObjectStream, Readable); - -/** - * Class for representing a gRPC server streaming call as a Node stream on the - * client side. Extends from stream.Readable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ClientReadableObjectStream(stream) { - var options = {objectMode: true}; - Readable.call(this, options); - this._stream = stream; - var self = this; - forwardEvent(stream, this, 'status'); - forwardEvent(stream, this, 'metadata'); - this._stream.on('data', function forwardData(chunk) { - if (!self.push(chunk)) { - self._stream.pause(); - } - }); - this._stream.pause(); -} - -/** - * _read implementation for both types of streams that allow reading. - * @this {ClientReadableObjectStream} - * @param {number} size Ignored - */ -function _read(size) { - this._stream.resume(); -} - -/** - * See docs for _read - */ -ClientReadableObjectStream.prototype._read = _read; - -util.inherits(ClientWritableObjectStream, Writable); - -/** - * Class for representing a gRPC client streaming call as a Node stream on the - * client side. Extends from stream.Writable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ClientWritableObjectStream(stream) { - var options = {objectMode: true}; - Writable.call(this, options); - this._stream = stream; - forwardEvent(stream, this, 'status'); - forwardEvent(stream, this, 'metadata'); - this.on('finish', function() { - this._stream.end(); - }); -} - -/** - * _write implementation for both types of streams that allow writing - * @this {ClientWritableObjectStream} - * @param {*} chunk The value to write to the stream - * @param {string} encoding Ignored - * @param {function(Error)} callback Callback to call when finished writing - */ -function _write(chunk, encoding, callback) { - this._stream.write(chunk, encoding, callback); -} - -/** - * See docs for _write - */ -ClientWritableObjectStream.prototype._write = _write; - -/** - * Cancel the underlying call - */ -function cancel() { - this._stream.cancel(); -} - -ClientReadableObjectStream.prototype.cancel = cancel; -ClientWritableObjectStream.prototype.cancel = cancel; - -/** - * Get a function that can make unary requests to the specified method. - * @param {string} method The name of the method to request - * @param {function(*):Buffer} serialize The serialization function for inputs - * @param {function(Buffer)} deserialize The deserialization function for - * outputs - * @return {Function} makeUnaryRequest - */ -function makeUnaryRequestFunction(method, serialize, deserialize) { - /** - * Make a unary request with this method on the given channel with the given - * argument, callback, etc. - * @this {SurfaceClient} Client object. Must have a channel member. - * @param {*} argument The argument to the call. Should be serializable with - * serialize - * @param {function(?Error, value=)} callback The callback to for when the - * response is received - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future - * @return {EventEmitter} An event emitter for stream related events - */ - function makeUnaryRequest(argument, callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, serialize, - deserialize, metadata, deadline); - var emitter = new EventEmitter(); - emitter.cancel = function cancel() { - stream.cancel(); - }; - forwardEvent(stream, emitter, 'status'); - forwardEvent(stream, emitter, 'metadata'); - stream.write(argument); - stream.end(); - stream.on('data', function forwardData(chunk) { - try { - callback(null, chunk); - } catch (e) { - callback(e); - } - }); - stream.on('status', function forwardStatus(status) { - if (status.code !== client.status.OK) { - callback(status); - } - }); - return emitter; - } - return makeUnaryRequest; -} - -/** - * Get a function that can make client stream requests to the specified method. - * @param {string} method The name of the method to request - * @param {function(*):Buffer} serialize The serialization function for inputs - * @param {function(Buffer)} deserialize The deserialization function for - * outputs - * @return {Function} makeClientStreamRequest - */ -function makeClientStreamRequestFunction(method, serialize, deserialize) { - /** - * Make a client stream request with this method on the given channel with the - * given callback, etc. - * @this {SurfaceClient} Client object. Must have a channel member. - * @param {function(?Error, value=)} callback The callback to for when the - * response is received - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future - * @return {EventEmitter} An event emitter for stream related events - */ - function makeClientStreamRequest(callback, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, serialize, - deserialize, metadata, deadline); - var obj_stream = new ClientWritableObjectStream(stream); - stream.on('data', function forwardData(chunk) { - try { - callback(null, chunk); - } catch (e) { - callback(e); - } - }); - stream.on('status', function forwardStatus(status) { - if (status.code !== client.status.OK) { - callback(status); - } - }); - return obj_stream; - } - return makeClientStreamRequest; -} - -/** - * Get a function that can make server stream requests to the specified method. - * @param {string} method The name of the method to request - * @param {function(*):Buffer} serialize The serialization function for inputs - * @param {function(Buffer)} deserialize The deserialization function for - * outputs - * @return {Function} makeServerStreamRequest - */ -function makeServerStreamRequestFunction(method, serialize, deserialize) { - /** - * Make a server stream request with this method on the given channel with the - * given argument, etc. - * @this {SurfaceClient} Client object. Must have a channel member. - * @param {*} argument The argument to the call. Should be serializable with - * serialize - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future - * @return {EventEmitter} An event emitter for stream related events - */ - function makeServerStreamRequest(argument, metadata, deadline) { - var stream = client.makeRequest(this.channel, method, serialize, - deserialize, metadata, deadline); - var obj_stream = new ClientReadableObjectStream(stream); - stream.write(argument); - stream.end(); - return obj_stream; - } - return makeServerStreamRequest; -} - -/** - * Get a function that can make bidirectional stream requests to the specified - * method. - * @param {string} method The name of the method to request - * @param {function(*):Buffer} serialize The serialization function for inputs - * @param {function(Buffer)} deserialize The deserialization function for - * outputs - * @return {Function} makeBidiStreamRequest - */ -function makeBidiStreamRequestFunction(method, serialize, deserialize) { - /** - * Make a bidirectional stream request with this method on the given channel. - * @this {SurfaceClient} Client object. Must have a channel member. - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call - * @param {(number|Date)=} deadline The deadline for processing this request. - * Defaults to infinite future - * @return {EventEmitter} An event emitter for stream related events - */ - function makeBidiStreamRequest(metadata, deadline) { - return client.makeRequest(this.channel, method, serialize, - deserialize, metadata, deadline); - } - return makeBidiStreamRequest; -} - -/** - * Map with short names for each of the requester maker functions. Used in - * makeClientConstructor - */ -var requester_makers = { - unary: makeUnaryRequestFunction, - server_stream: makeServerStreamRequestFunction, - client_stream: makeClientStreamRequestFunction, - bidi: makeBidiStreamRequestFunction -} - -/** - * Creates a constructor for clients for the given service - * @param {ProtoBuf.Reflect.Service} service The service to generate a client - * for - * @return {function(string, Object)} New client constructor - */ -function makeClientConstructor(service) { - var prefix = '/' + common.fullyQualifiedName(service) + '/'; - /** - * Create a client with the given methods - * @constructor - * @param {string} address The address of the server to connect to - * @param {Object} options Options to pass to the underlying channel - */ - function SurfaceClient(address, options) { - this.channel = new client.Channel(address, options); - } - - _.each(service.children, function(method) { - var method_type; - if (method.requestStream) { - if (method.responseStream) { - method_type = 'bidi'; - } else { - method_type = 'client_stream'; - } - } else { - if (method.responseStream) { - method_type = 'server_stream'; - } else { - method_type = 'unary'; - } - } - SurfaceClient.prototype[decapitalize(method.name)] = - requester_makers[method_type]( - prefix + capitalize(method.name), - common.serializeCls(method.resolvedRequestType.build()), - common.deserializeCls(method.resolvedResponseType.build())); - }); - - SurfaceClient.service = service; - - return SurfaceClient; -} - -exports.makeClientConstructor = makeClientConstructor; - -/** - * See docs for client.status - */ -exports.status = client.status; -/** - * See docs for client.callError - */ -exports.callError = client.callError; diff --git a/src/node/src/surface_server.js b/src/node/src/surface_server.js deleted file mode 100644 index a47d1fa23d..0000000000 --- a/src/node/src/surface_server.js +++ /dev/null @@ -1,340 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -var _ = require('underscore'); - -var capitalize = require('underscore.string/capitalize'); -var decapitalize = require('underscore.string/decapitalize'); - -var Server = require('./server.js'); - -var stream = require('stream'); - -var Readable = stream.Readable; -var Writable = stream.Writable; -var Duplex = stream.Duplex; -var util = require('util'); - -var common = require('./common.js'); - -util.inherits(ServerReadableObjectStream, Readable); - -/** - * Class for representing a gRPC client streaming call as a Node stream on the - * server side. Extends from stream.Readable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ServerReadableObjectStream(stream) { - var options = {objectMode: true}; - Readable.call(this, options); - this._stream = stream; - Object.defineProperty(this, 'cancelled', { - get: function() { return stream.cancelled; } - }); - var self = this; - this._stream.on('cancelled', function() { - self.emit('cancelled'); - }); - this._stream.on('data', function forwardData(chunk) { - if (!self.push(chunk)) { - self._stream.pause(); - } - }); - this._stream.on('end', function forwardEnd() { - self.push(null); - }); - this._stream.pause(); -} - -/** - * _read implementation for both types of streams that allow reading. - * @this {ServerReadableObjectStream|ServerBidiObjectStream} - * @param {number} size Ignored - */ -function _read(size) { - this._stream.resume(); -} - -/** - * See docs for _read - */ -ServerReadableObjectStream.prototype._read = _read; - -util.inherits(ServerWritableObjectStream, Writable); - -/** - * Class for representing a gRPC server streaming call as a Node stream on the - * server side. Extends from stream.Writable. - * @constructor - * @param {stream} stream Underlying binary Duplex stream for the call - */ -function ServerWritableObjectStream(stream) { - var options = {objectMode: true}; - Writable.call(this, options); - this._stream = stream; - this._stream.on('cancelled', function() { - self.emit('cancelled'); - }); - this.on('finish', function() { - this._stream.end(); - }); -} - -/** - * _write implementation for both types of streams that allow writing - * @this {ServerWritableObjectStream} - * @param {*} chunk The value to write to the stream - * @param {string} encoding Ignored - * @param {function(Error)} callback Callback to call when finished writing - */ -function _write(chunk, encoding, callback) { - this._stream.write(chunk, encoding, callback); -} - -/** - * See docs for _write - */ -ServerWritableObjectStream.prototype._write = _write; - -/** - * Creates a binary stream handler function from a unary handler function - * @param {function(Object, function(Error, *), metadata=)} handler Unary call - * handler - * @return {function(stream, metadata=)} Binary stream handler - */ -function makeUnaryHandler(handler) { - /** - * Handles a stream by reading a single data value, passing it to the handler, - * and writing the response back to the stream. - * @param {stream} stream Binary data stream - * @param {metadata=} metadata Incoming metadata array - */ - return function handleUnaryCall(stream, metadata) { - stream.on('data', function handleUnaryData(value) { - var call = {request: value}; - Object.defineProperty(call, 'cancelled', { - get: function() { return stream.cancelled;} - }); - stream.on('cancelled', function() { - call.emit('cancelled'); - }); - handler(call, function sendUnaryData(err, value) { - if (err) { - stream.emit('error', err); - } else { - stream.write(value); - stream.end(); - } - }, metadata); - }); - }; -} - -/** - * Creates a binary stream handler function from a client stream handler - * function - * @param {function(Readable, function(Error, *), metadata=)} handler Client - * stream call handler - * @return {function(stream, metadata=)} Binary stream handler - */ -function makeClientStreamHandler(handler) { - /** - * Handles a stream by passing a deserializing stream to the handler and - * writing the response back to the stream. - * @param {stream} stream Binary data stream - * @param {metadata=} metadata Incoming metadata array - */ - return function handleClientStreamCall(stream, metadata) { - var object_stream = new ServerReadableObjectStream(stream); - handler(object_stream, function sendClientStreamData(err, value) { - if (err) { - stream.emit('error', err); - } else { - stream.write(value); - stream.end(); - } - }, metadata); - }; -} - -/** - * Creates a binary stream handler function from a server stream handler - * function - * @param {function(Writable, metadata=)} handler Server stream call handler - * @return {function(stream, metadata=)} Binary stream handler - */ -function makeServerStreamHandler(handler) { - /** - * Handles a stream by attaching it to a serializing stream, and passing it to - * the handler. - * @param {stream} stream Binary data stream - * @param {metadata=} metadata Incoming metadata array - */ - return function handleServerStreamCall(stream, metadata) { - stream.on('data', function handleClientData(value) { - var object_stream = new ServerWritableObjectStream(stream); - object_stream.request = value; - handler(object_stream, metadata); - }); - }; -} - -/** - * Creates a binary stream handler function from a bidi stream handler function - * @param {function(Duplex, metadata=)} handler Unary call handler - * @return {function(stream, metadata=)} Binary stream handler - */ -function makeBidiStreamHandler(handler) { - return handler; -} - -/** - * Map with short names for each of the handler maker functions. Used in - * makeServerConstructor - */ -var handler_makers = { - unary: makeUnaryHandler, - server_stream: makeServerStreamHandler, - client_stream: makeClientStreamHandler, - bidi: makeBidiStreamHandler -}; - -/** - * Creates a constructor for servers with a service defined by the methods - * object. The methods object has string keys and values of this form: - * {serialize: function, deserialize: function, client_stream: bool, - * server_stream: bool} - * @param {Object} methods Method descriptor for each method the server should - * expose - * @param {string} prefix The prefex to prepend to each method name - * @return {function(Object, Object)} New server constructor - */ -function makeServerConstructor(services) { - var qual_names = []; - _.each(services, function(service) { - _.each(service.children, function(method) { - var name = common.fullyQualifiedName(method); - if (_.indexOf(qual_names, name) !== -1) { - throw new Error('Method ' + name + ' exposed by more than one service'); - } - qual_names.push(name); - }); - }); - /** - * Create a server with the given handlers for all of the methods. - * @constructor - * @param {Object} service_handlers Map from service names to map from method - * names to handlers - * @param {function(string, Object>): - Object>=} getMetadata Callback that - * gets metatada for a given method - * @param {Object=} options Options to pass to the underlying server - */ - function SurfaceServer(service_handlers, getMetadata, options) { - var server = new Server(getMetadata, options); - this.inner_server = server; - _.each(services, function(service) { - var service_name = common.fullyQualifiedName(service); - if (service_handlers[service_name] === undefined) { - throw new Error('Handlers for service ' + - service_name + ' not provided.'); - } - var prefix = '/' + common.fullyQualifiedName(service) + '/'; - _.each(service.children, function(method) { - var method_type; - if (method.requestStream) { - if (method.responseStream) { - method_type = 'bidi'; - } else { - method_type = 'client_stream'; - } - } else { - if (method.responseStream) { - method_type = 'server_stream'; - } else { - method_type = 'unary'; - } - } - if (service_handlers[service_name][decapitalize(method.name)] === - undefined) { - throw new Error('Method handler for ' + - common.fullyQualifiedName(method) + ' not provided.'); - } - var binary_handler = handler_makers[method_type]( - service_handlers[service_name][decapitalize(method.name)]); - var serialize = common.serializeCls( - method.resolvedResponseType.build()); - var deserialize = common.deserializeCls( - method.resolvedRequestType.build()); - server.register(prefix + capitalize(method.name), binary_handler, - serialize, deserialize); - }); - }, this); - } - - /** - * Binds the server to the given port, with SSL enabled if secure is specified - * @param {string} port The port that the server should bind on, in the format - * "address:port" - * @param {boolean=} secure Whether the server should open a secure port - * @return {SurfaceServer} this - */ - SurfaceServer.prototype.bind = function(port, secure) { - return this.inner_server.bind(port, secure); - }; - - /** - * Starts the server listening on any bound ports - * @return {SurfaceServer} this - */ - SurfaceServer.prototype.listen = function() { - this.inner_server.start(); - return this; - }; - - /** - * Shuts the server down; tells it to stop listening for new requests and to - * kill old requests. - */ - SurfaceServer.prototype.shutdown = function() { - this.inner_server.shutdown(); - }; - - return SurfaceServer; -} - -/** - * See documentation for makeServerConstructor - */ -exports.makeServerConstructor = makeServerConstructor; diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js deleted file mode 100644 index 1db9f69467..0000000000 --- a/src/node/test/client_server_test.js +++ /dev/null @@ -1,255 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -var assert = require('assert'); -var fs = require('fs'); -var path = require('path'); -var grpc = require('bindings')('grpc.node'); -var Server = require('../src/server'); -var client = require('../src/client'); -var common = require('../src/common'); - -var ca_path = path.join(__dirname, 'data/ca.pem'); - -var key_path = path.join(__dirname, 'data/server1.key'); - -var pem_path = path.join(__dirname, 'data/server1.pem'); - -/** - * Helper function to return an absolute deadline given a relative timeout in - * seconds. - * @param {number} timeout_secs The number of seconds to wait before timing out - * @return {Date} A date timeout_secs in the future - */ -function getDeadline(timeout_secs) { - var deadline = new Date(); - deadline.setSeconds(deadline.getSeconds() + timeout_secs); - return deadline; -} - -/** - * Responds to every request with the same data as a response - * @param {Stream} stream - */ -function echoHandler(stream) { - stream.pipe(stream); -} - -/** - * Responds to every request with an error status - * @param {Stream} stream - */ -function errorHandler(stream) { - throw { - 'code' : grpc.status.UNIMPLEMENTED, - 'details' : 'error details' - }; -} - -/** - * Wait for a cancellation instead of responding - * @param {Stream} stream - */ -function cancelHandler(stream) { - // do nothing -} - -function metadataHandler(stream, metadata) { - stream.end(); -} - -/** - * Serialize a string to a Buffer - * @param {string} value The string to serialize - * @return {Buffer} The serialized value - */ -function stringSerialize(value) { - return new Buffer(value); -} - -/** - * Deserialize a Buffer to a string - * @param {Buffer} buffer The buffer to deserialize - * @return {string} The string value of the buffer - */ -function stringDeserialize(buffer) { - return buffer.toString(); -} - -describe('echo client', function() { - var server; - var channel; - before(function() { - server = new Server(function getMetadata(method, metadata) { - return {method: [method]}; - }); - var port_num = server.bind('0.0.0.0:0'); - server.register('echo', echoHandler); - server.register('error', errorHandler); - server.register('cancellation', cancelHandler); - server.register('metadata', metadataHandler); - server.start(); - - channel = new grpc.Channel('localhost:' + port_num); - }); - after(function() { - server.shutdown(); - }); - it('should receive echo responses', function(done) { - var messages = ['echo1', 'echo2', 'echo3', 'echo4']; - var stream = client.makeRequest( - channel, - 'echo', - stringSerialize, - stringDeserialize); - for (var i = 0; i < messages.length; i++) { - stream.write(messages[i]); - } - stream.end(); - var index = 0; - stream.on('data', function(chunk) { - assert.equal(messages[index], chunk); - index += 1; - }); - stream.on('status', function(status) { - assert.equal(status.code, client.status.OK); - }); - stream.on('end', function() { - assert.equal(index, messages.length); - done(); - }); - }); - it('should recieve metadata set by the server', function(done) { - var stream = client.makeRequest(channel, 'metadata'); - stream.on('metadata', function(metadata) { - assert.strictEqual(metadata.method[0].toString(), 'metadata'); - }); - stream.on('status', function(status) { - assert.equal(status.code, client.status.OK); - done(); - }); - stream.end(); - }); - it('should get an error status that the server throws', function(done) { - var stream = client.makeRequest(channel, 'error'); - - stream.on('data', function() {}); - stream.write(new Buffer('test')); - stream.end(); - stream.on('status', function(status) { - assert.equal(status.code, grpc.status.UNIMPLEMENTED); - assert.equal(status.details, 'error details'); - done(); - }); - }); - it('should be able to cancel a call', function(done) { - var stream = client.makeRequest( - channel, - 'cancellation', - null, - getDeadline(1)); - - stream.cancel(); - stream.on('status', function(status) { - assert.equal(status.code, grpc.status.CANCELLED); - done(); - }); - }); - it('should get correct status for unimplemented method', function(done) { - var stream = client.makeRequest(channel, 'unimplemented_method'); - stream.end(); - stream.on('status', function(status) { - assert.equal(status.code, grpc.status.UNIMPLEMENTED); - done(); - }); - }); -}); -/* TODO(mlumish): explore options for reducing duplication between this test - * and the insecure echo client test */ -describe('secure echo client', function() { - var server; - var channel; - before(function(done) { - fs.readFile(ca_path, function(err, ca_data) { - assert.ifError(err); - fs.readFile(key_path, function(err, key_data) { - assert.ifError(err); - fs.readFile(pem_path, function(err, pem_data) { - assert.ifError(err); - var creds = grpc.Credentials.createSsl(ca_data); - var server_creds = grpc.ServerCredentials.createSsl(null, - key_data, - pem_data); - - server = new Server(null, {'credentials' : server_creds}); - var port_num = server.bind('0.0.0.0:0', true); - server.register('echo', echoHandler); - server.start(); - - channel = new grpc.Channel('localhost:' + port_num, { - 'grpc.ssl_target_name_override' : 'foo.test.google.com', - 'credentials' : creds - }); - done(); - }); - }); - }); - }); - after(function() { - server.shutdown(); - }); - it('should recieve echo responses', function(done) { - var messages = ['echo1', 'echo2', 'echo3', 'echo4']; - var stream = client.makeRequest( - channel, - 'echo', - stringSerialize, - stringDeserialize); - for (var i = 0; i < messages.length; i++) { - stream.write(messages[i]); - } - stream.end(); - var index = 0; - stream.on('data', function(chunk) { - assert.equal(messages[index], chunk); - index += 1; - }); - stream.on('status', function(status) { - assert.equal(status.code, client.status.OK); - }); - stream.on('end', function() { - assert.equal(index, messages.length); - done(); - }); - }); -}); diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js index 7ecaad833d..81cd9fa5b9 100644 --- a/src/node/test/interop_sanity_test.js +++ b/src/node/test/interop_sanity_test.js @@ -56,7 +56,7 @@ describe('Interop tests', function() { interop_client.runTest(port, name_override, 'empty_unary', true, done); }); // This fails due to an unknown bug - it.skip('should pass large_unary', function(done) { + it('should pass large_unary', function(done) { interop_client.runTest(port, name_override, 'large_unary', true, done); }); it('should pass client_streaming', function(done) { diff --git a/src/node/test/math_client_test.js b/src/node/test/math_client_test.js index f347b18ea0..61b4a2fa2f 100644 --- a/src/node/test/math_client_test.js +++ b/src/node/test/math_client_test.js @@ -66,7 +66,7 @@ describe('Math client', function() { done(); }); }); - it.only('should handle a server streaming request', function(done) { + it('should handle a server streaming request', function(done) { var call = math_client.fib({limit: 7}); var expected_results = [1, 1, 2, 3, 5, 8, 13]; var next_expected = 0; diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js deleted file mode 100644 index a3e1edf50f..0000000000 --- a/src/node/test/server_test.js +++ /dev/null @@ -1,122 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -var assert = require('assert'); -var grpc = require('bindings')('grpc.node'); -var Server = require('../src/server'); - -/** - * This is used for testing functions with multiple asynchronous calls that - * can happen in different orders. This should be passed the number of async - * function invocations that can occur last, and each of those should call this - * function's return value - * @param {function()} done The function that should be called when a test is - * complete. - * @param {number} count The number of calls to the resulting function if the - * test passes. - * @return {function()} The function that should be called at the end of each - * sequence of asynchronous functions. - */ -function multiDone(done, count) { - return function() { - count -= 1; - if (count <= 0) { - done(); - } - }; -} - -/** - * Responds to every request with the same data as a response - * @param {Stream} stream - */ -function echoHandler(stream) { - stream.pipe(stream); -} - -describe('echo server', function() { - var server; - var channel; - before(function() { - server = new Server(); - var port_num = server.bind('[::]:0'); - server.register('echo', echoHandler); - server.start(); - - channel = new grpc.Channel('localhost:' + port_num); - }); - after(function() { - server.shutdown(); - }); - it('should echo inputs as responses', function(done) { - done = multiDone(done, 4); - - var req_text = 'echo test string'; - var status_text = 'OK'; - - var deadline = new Date(); - deadline.setSeconds(deadline.getSeconds() + 3); - var call = new grpc.Call(channel, - 'echo', - deadline); - call.invoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.CLIENT_METADATA_READ); - done(); - },function(event) { - assert.strictEqual(event.type, grpc.completionType.FINISHED); - var status = event.data; - assert.strictEqual(status.code, grpc.status.OK); - assert.strictEqual(status.details, status_text); - done(); - }, 0); - call.startWrite( - new Buffer(req_text), - function(event) { - assert.strictEqual(event.type, - grpc.completionType.WRITE_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }, 0); - call.startRead(function(event) { - assert.strictEqual(event.type, grpc.completionType.READ); - assert.strictEqual(event.data.toString(), req_text); - done(); - }); - }); -}); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 1038f9ab33..34e4ab4013 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -33,9 +33,9 @@ var assert = require('assert'); -var surface_server = require('../src/surface_server.js'); +var surface_server = require('../src/server.js'); -var surface_client = require('../src/surface_client.js'); +var surface_client = require('../src/client.js'); var ProtoBuf = require('protobufjs'); -- cgit v1.2.3 From d17d57aa39b6e20fac45b8b961f41eb979dc2d5d Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 12 Feb 2015 13:32:18 -0800 Subject: Removed extra log lines --- src/node/examples/math_server.js | 1 - src/node/interop/interop_server.js | 1 - src/node/src/client.js | 6 ------ src/node/src/server.js | 9 --------- src/node/test/end_to_end_test.js | 1 - 5 files changed, 18 deletions(-) (limited to 'src/node') diff --git a/src/node/examples/math_server.js b/src/node/examples/math_server.js index e010445389..e1bd11b5a6 100644 --- a/src/node/examples/math_server.js +++ b/src/node/examples/math_server.js @@ -69,7 +69,6 @@ function mathDiv(call, cb) { * @param {stream} stream The stream for sending responses. */ function mathFib(stream) { - console.log(stream); // Here, call is a standard writable Node object Stream var previous = 0, current = 1; for (var i = 0; i < stream.request.limit; i++) { diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js index 271fe5962e..54e9715d1e 100644 --- a/src/node/interop/interop_server.js +++ b/src/node/interop/interop_server.js @@ -106,7 +106,6 @@ function handleStreamingOutput(call) { testProto.PayloadType.COMPRESSABLE, testProto.PayloadType.UNCOMPRESSABLE][Math.random() < 0.5 ? 0 : 1]; } - console.log('req:', req); _.each(req.response_parameters, function(resp_param) { call.write({ payload: { diff --git a/src/node/src/client.js b/src/node/src/client.js index 36a6f41d7b..4b7eda324e 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -56,7 +56,6 @@ function ClientWritableStream(call, serialize) { this.call = call; this.serialize = common.wrapIgnoreNull(serialize); this.on('finish', function() { - console.log('Send close from client'); var batch = {}; batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; call.startBatch(batch, function() {}); @@ -73,7 +72,6 @@ function ClientWritableStream(call, serialize) { function _write(chunk, encoding, callback) { var batch = {}; batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); - console.log(batch); this.call.startBatch(batch, function(err, event) { if (err) { throw err; @@ -153,7 +151,6 @@ function ClientDuplexStream(call, serialize, deserialize) { var reading = false; this.call = call; this.on('finish', function() { - console.log('Send close from client'); var batch = {}; batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; call.startBatch(batch, function() {}); @@ -279,7 +276,6 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { callback(err); return; } - console.log(response); if (response.status.code != grpc.status.OK) { callback(response.status); return; @@ -323,7 +319,6 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { } var stream = new ClientReadableStream(call, deserialize); var start_batch = {}; - console.log('Starting server streaming request on', method); start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument); @@ -332,7 +327,6 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { if (err) { throw err; } - console.log(response); stream.emit('metadata', response.metadata); }); var status_batch = {}; diff --git a/src/node/src/server.js b/src/node/src/server.js index 3c21491319..82d521ddf1 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -95,7 +95,6 @@ function setUpWritable(stream, serialize) { }; stream.serialize = common.wrapIgnoreNull(serialize); function sendStatus() { - console.log('Server sending status'); var batch = {}; batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status; stream.call.startBatch(batch, function(){}); @@ -169,7 +168,6 @@ function ServerWritableStream(call, serialize) { function _write(chunk, encoding, callback) { var batch = {}; batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); - console.log('Server writing', batch); this.call.startBatch(batch, function(err, value) { if (err) { this.emit('error', err); @@ -207,14 +205,11 @@ function _read(size) { return; } if (self.finished) { - console.log('Pushing null'); self.push(null); return; } var data = event.read; - console.log(data); if (self.push(self.deserialize(data)) && data != null) { - console.log('Reading again'); var read_batch = {}; read_batch[grpc.opType.RECV_MESSAGE] = true; self.call.startBatch(read_batch, readCallback); @@ -276,7 +271,6 @@ function handleUnary(call, handler, metadata) { } function handleServerStreaming(call, handler, metadata) { - console.log('Handling server streaming call'); var stream = new ServerWritableStream(call, handler.serialize); waitForCancel(call, stream); var batch = {}; @@ -360,7 +354,6 @@ function Server(getMetadata, options) { * @param {grpc.Event} event The event to handle with tag SERVER_RPC_NEW */ function handleNewCall(err, event) { - console.log('Handling new call'); if (err) { return; } @@ -376,9 +369,7 @@ function Server(getMetadata, options) { var deadline = details.deadline; if (handlers.hasOwnProperty(method)) { handler = handlers[method]; - console.log(handler); } else { - console.log(handlers); var batch = {}; batch[grpc.opType.SEND_INITIAL_METADATA] = {}; batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js index 34ce2500f6..f8899beae8 100644 --- a/src/node/test/end_to_end_test.js +++ b/src/node/test/end_to_end_test.js @@ -148,7 +148,6 @@ describe('end-to-end', function() { }); server.requestCall(function(err, call_details) { - console.log("Server received new call"); var new_call = call_details['new call']; assert.notEqual(new_call, null); assert.strictEqual(new_call.metadata.client_key[0].toString(), -- cgit v1.2.3 From c55ee616b58ce06916661ddaf585711a987a2da2 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 12 Feb 2015 13:58:24 -0800 Subject: Last test now passes --- src/node/test/call_test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/node') diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js index 1cbfc2280c..c1a7e95fa0 100644 --- a/src/node/test/call_test.js +++ b/src/node/test/call_test.js @@ -111,7 +111,7 @@ describe('call', function() { call.startBatch(null, function(){}); }); }); - it.skip('should succeed with an empty object', function(done) { + it('should succeed with an empty object', function(done) { var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.doesNotThrow(function() { call.startBatch({}, function(err) { -- cgit v1.2.3 From 44dd2f33679e1b8a35d7c3f37a719279988419a7 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 12 Feb 2015 15:48:51 -0800 Subject: Removed extraneous log messages --- src/node/ext/call.cc | 6 ------ src/node/ext/completion_queue_async_worker.cc | 2 -- src/node/ext/credentials.cc | 1 - src/node/ext/server.cc | 1 - src/node/ext/server_credentials.cc | 1 - 5 files changed, 11 deletions(-) (limited to 'src/node') diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index cdc34b52a7..4401698b1e 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -98,9 +98,6 @@ bool CreateMetadataArray( string_handles->push_back(unique_ptr(utf8_key)); Handle values = Local::Cast(metadata->Get(current_key)); for (unsigned int j = 0; j < values->Length(); j++) { - if (array->count >= array->capacity) { - gpr_log(GPR_ERROR, "Metadata array grew past capacity"); - } Handle value = values->Get(j); grpc_metadata *current = &array->metadata[array->count]; current->key = **utf8_key; @@ -447,11 +444,9 @@ void DestroyTag(void *tag) { } Call::Call(grpc_call *call) : wrapped_call(call) { - gpr_log(GPR_DEBUG, "Constructing call, this: %p, pointer: %p", this, call); } Call::~Call() { - gpr_log(GPR_DEBUG, "Destructing call, this: %p, pointer: %p", this, wrapped_call); grpc_call_destroy(wrapped_call); } @@ -483,7 +478,6 @@ Handle Call::WrapStruct(grpc_call *call) { if (call == NULL) { return NanEscapeScope(NanNull()); } - gpr_log(GPR_DEBUG, "Wrapping call: %p", call); const int argc = 1; Handle argv[argc] = {External::New(reinterpret_cast(call))}; return NanEscapeScope(constructor->NewInstance(argc, argv)); diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index 3c32b07ca3..a1f390f64b 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -58,7 +58,6 @@ CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {} void CompletionQueueAsyncWorker::Execute() { result = grpc_completion_queue_next(queue, gpr_inf_future); - gpr_log(GPR_DEBUG, "Handling response on call %p", result->call); if (result->data.op_complete != GRPC_OP_OK) { SetErrorMessage("The batch encountered an error"); } @@ -79,7 +78,6 @@ void CompletionQueueAsyncWorker::Init(Handle exports) { void CompletionQueueAsyncWorker::HandleOKCallback() { NanScope(); - gpr_log(GPR_DEBUG, "Handling response on call %p", result->call); NanCallback *callback = GetTagCallback(result->tag); Handle argv[] = {NanNull(), GetTagNodeValue(result->tag)}; diff --git a/src/node/ext/credentials.cc b/src/node/ext/credentials.cc index c8859ed941..b79c3e3019 100644 --- a/src/node/ext/credentials.cc +++ b/src/node/ext/credentials.cc @@ -63,7 +63,6 @@ Credentials::Credentials(grpc_credentials *credentials) : wrapped_credentials(credentials) {} Credentials::~Credentials() { - gpr_log(GPR_DEBUG, "Destroying credentials object"); grpc_credentials_release(wrapped_credentials); } diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 93aa9ec44d..51904479a9 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -91,7 +91,6 @@ class NewCallOp : public Op { return NanEscapeScope(NanNull()); } Handle obj = NanNew(); - gpr_log(GPR_DEBUG, "Wrapping server call: %p", call); obj->Set(NanNew("call"), Call::WrapStruct(call)); obj->Set(NanNew("method"), NanNew(details.method)); obj->Set(NanNew("host"), NanNew(details.host)); diff --git a/src/node/ext/server_credentials.cc b/src/node/ext/server_credentials.cc index 393f3a6305..3add43c48c 100644 --- a/src/node/ext/server_credentials.cc +++ b/src/node/ext/server_credentials.cc @@ -63,7 +63,6 @@ ServerCredentials::ServerCredentials(grpc_server_credentials *credentials) : wrapped_credentials(credentials) {} ServerCredentials::~ServerCredentials() { - gpr_log(GPR_DEBUG, "Destroying server credentials object"); grpc_server_credentials_release(wrapped_credentials); } -- cgit v1.2.3 From 17be589de0c41807b14fe47165e165c7d64deb9c Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 13 Feb 2015 10:19:10 -0800 Subject: Made changes based on comments --- src/node/ext/call.cc | 86 ++++++++++++++++++++------------------------------ src/node/ext/call.h | 15 +++++---- src/node/ext/server.cc | 6 ++-- 3 files changed, 46 insertions(+), 61 deletions(-) (limited to 'src/node') diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 4401698b1e..18f40f2488 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -48,6 +48,7 @@ #include "timeval.h" using std::unique_ptr; +using std::shared_ptr; namespace grpc { namespace node { @@ -76,10 +77,8 @@ Persistent Call::constructor; Persistent Call::fun_tpl; -bool CreateMetadataArray( - Handle metadata, grpc_metadata_array *array, - std::vector > *string_handles, - std::vector > *handles) { +bool CreateMetadataArray(Handle metadata, grpc_metadata_array *array, + shared_ptr resources) { NanScope(); grpc_metadata_array_init(array); Handle keys(metadata->GetOwnPropertyNames()); @@ -95,7 +94,7 @@ bool CreateMetadataArray( for (unsigned int i = 0; i < keys->Length(); i++) { Handle current_key(keys->Get(i)->ToString()); NanUtf8String *utf8_key = new NanUtf8String(current_key); - string_handles->push_back(unique_ptr(utf8_key)); + resources->strings.push_back(unique_ptr(utf8_key)); Handle values = Local::Cast(metadata->Get(current_key)); for (unsigned int j = 0; j < values->Length(); j++) { Handle value = values->Get(j); @@ -106,12 +105,12 @@ bool CreateMetadataArray( current->value_length = Buffer::Length(value); Persistent handle; NanAssignPersistent(handle, value); - handles->push_back(unique_ptr( + resources->handles.push_back(unique_ptr( new PersistentHolder(handle))); } else if (value->IsString()) { Handle string_value = value->ToString(); NanUtf8String *utf8_value = new NanUtf8String(string_value); - string_handles->push_back(unique_ptr(utf8_value)); + resources->strings.push_back(unique_ptr(utf8_value)); current->value = **utf8_value; current->value_length = string_value->Length(); } else { @@ -168,13 +167,12 @@ class SendMetadataOp : public Op { return NanEscapeScope(NanTrue()); } bool ParseOp(Handle value, grpc_op *out, - std::vector > *strings, - std::vector > *handles) { + shared_ptr resources) { if (!value->IsObject()) { return false; } grpc_metadata_array array; - if (!CreateMetadataArray(value->ToObject(), &array, strings, handles)) { + if (!CreateMetadataArray(value->ToObject(), &array, resources)) { return false; } out->data.send_initial_metadata.count = array.count; @@ -194,8 +192,7 @@ class SendMessageOp : public Op { return NanEscapeScope(NanTrue()); } bool ParseOp(Handle value, grpc_op *out, - std::vector > *strings, - std::vector > *handles) { + shared_ptr resources) { if (!Buffer::HasInstance(value)) { return false; } @@ -204,7 +201,7 @@ class SendMessageOp : public Op { Handle temp = NanNew(); NanAssignPersistent(handle, temp); NanAssignPersistent(handle, value); - handles->push_back(unique_ptr( + resources->handles.push_back(unique_ptr( new PersistentHolder(handle))); return true; } @@ -221,8 +218,7 @@ class SendClientCloseOp : public Op { return NanEscapeScope(NanTrue()); } bool ParseOp(Handle value, grpc_op *out, - std::vector > *strings, - std::vector > *handles) { + shared_ptr resources) { return true; } protected: @@ -238,8 +234,7 @@ class SendServerStatusOp : public Op { return NanEscapeScope(NanTrue()); } bool ParseOp(Handle value, grpc_op *out, - std::vector > *strings, - std::vector > *handles) { + shared_ptr resources) { if (!value->IsObject()) { return false; } @@ -256,7 +251,7 @@ class SendServerStatusOp : public Op { grpc_metadata_array array; if (!CreateMetadataArray(server_status->Get(NanNew("metadata"))-> ToObject(), - &array, strings, handles)) { + &array, resources)) { return false; } out->data.send_status_from_server.trailing_metadata_count = array.count; @@ -266,7 +261,7 @@ class SendServerStatusOp : public Op { server_status->Get(NanNew("code"))->Uint32Value()); NanUtf8String *str = new NanUtf8String( server_status->Get(NanNew("details"))); - strings->push_back(unique_ptr(str)); + resources->strings.push_back(unique_ptr(str)); out->data.send_status_from_server.status_details = **str; return true; } @@ -292,8 +287,7 @@ class GetMetadataOp : public Op { } bool ParseOp(Handle value, grpc_op *out, - std::vector > *strings, - std::vector > *handles) { + shared_ptr resources) { out->data.recv_initial_metadata = &recv_metadata; return true; } @@ -323,8 +317,7 @@ class ReadMessageOp : public Op { } bool ParseOp(Handle value, grpc_op *out, - std::vector > *strings, - std::vector > *handles) { + shared_ptr resources) { out->data.recv_message = &recv_message; return true; } @@ -352,8 +345,7 @@ class ClientStatusOp : public Op { } bool ParseOp(Handle value, grpc_op *out, - std::vector > *strings, - std::vector > *handles) { + shared_ptr resources) { out->data.recv_status_on_client.trailing_metadata = &metadata_array; out->data.recv_status_on_client.status = &status; out->data.recv_status_on_client.status_details = &status_details; @@ -390,8 +382,7 @@ class ServerCloseResponseOp : public Op { } bool ParseOp(Handle value, grpc_op *out, - std::vector > *strings, - std::vector > *handles) { + shared_ptr resources) { out->data.recv_close_on_server.cancelled = &cancelled; return true; } @@ -406,19 +397,13 @@ class ServerCloseResponseOp : public Op { }; tag::tag(NanCallback *callback, std::vector > *ops, - std::vector > *handles, - std::vector > *strings) : - callback(callback), ops(ops), handles(handles), strings(strings){ + shared_ptr resources) : + callback(callback), ops(ops), resources(resources){ } + tag::~tag() { delete callback; delete ops; - if (handles != NULL) { - delete handles; - } - if (strings != NULL) { - delete strings; - } } Handle GetTagNodeValue(void *tag) { @@ -542,17 +527,14 @@ NAN_METHOD(Call::StartBatch) { Handle callback_func = args[1].As(); NanCallback *callback = new NanCallback(callback_func); Call *call = ObjectWrap::Unwrap(args.This()); - std::vector > *handles = - new std::vector >(); - std::vector > *strings = - new std::vector >(); + shared_ptr resources(new Resources); Handle obj = args[0]->ToObject(); Handle keys = obj->GetOwnPropertyNames(); size_t nops = keys->Length(); grpc_op *ops = new grpc_op[nops]; std::vector > *op_vector = new std::vector >(); for (unsigned int i = 0; i < nops; i++) { - Op *op; + unique_ptr op; if (!keys->Get(i)->IsUint32()) { return NanThrowError( "startBatch's first argument's keys must be integers"); @@ -561,40 +543,40 @@ NAN_METHOD(Call::StartBatch) { ops[i].op = static_cast(type); switch (type) { case GRPC_OP_SEND_INITIAL_METADATA: - op = new SendMetadataOp(); + op.reset(new SendMetadataOp()); break; case GRPC_OP_SEND_MESSAGE: - op = new SendMessageOp(); + op.reset(new SendMessageOp()); break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: - op = new SendClientCloseOp(); + op.reset(new SendClientCloseOp()); break; case GRPC_OP_SEND_STATUS_FROM_SERVER: - op = new SendServerStatusOp(); + op.reset(new SendServerStatusOp()); break; case GRPC_OP_RECV_INITIAL_METADATA: - op = new GetMetadataOp(); + op.reset(new GetMetadataOp()); break; case GRPC_OP_RECV_MESSAGE: - op = new ReadMessageOp(); + op.reset(new ReadMessageOp()); break; case GRPC_OP_RECV_STATUS_ON_CLIENT: - op = new ClientStatusOp(); + op.reset(new ClientStatusOp()); break; case GRPC_OP_RECV_CLOSE_ON_SERVER: - op = new ServerCloseResponseOp(); + op.reset(new ServerCloseResponseOp()); break; default: return NanThrowError("Argument object had an unrecognized key"); } - if (!op->ParseOp(obj->Get(type), &ops[i], strings, handles)) { + if (!op->ParseOp(obj->Get(type), &ops[i], resources)) { return NanThrowTypeError("Incorrectly typed arguments to startBatch"); } - op_vector->push_back(unique_ptr(op)); + op_vector->push_back(std::move(op)); } grpc_call_error error = grpc_call_start_batch( call->wrapped_call, ops, nops, new struct tag( - callback, op_vector, handles, strings)); + callback, op_vector, resources)); delete ops; if (error != GRPC_CALL_OK) { return NanThrowError("startBatch failed", error); diff --git a/src/node/ext/call.h b/src/node/ext/call.h index f443a04637..4074f1509b 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -47,6 +47,7 @@ namespace grpc { namespace node { using std::unique_ptr; +using std::shared_ptr; v8::Handle ParseMetadata(const grpc_metadata_array *metadata_array); @@ -64,12 +65,16 @@ class PersistentHolder { v8::Persistent persist; }; +struct Resources { + std::vector > strings; + std::vector > handles; +}; + class Op { public: virtual v8::Handle GetNodeValue() const = 0; virtual bool ParseOp(v8::Handle value, grpc_op *out, - std::vector > *strings, - std::vector > *handles) = 0; + shared_ptr resources) = 0; v8::Handle GetOpType() const; protected: @@ -78,13 +83,11 @@ class Op { struct tag { tag(NanCallback *callback, std::vector > *ops, - std::vector > *handles, - std::vector > *strings); + shared_ptr resources); ~tag(); NanCallback *callback; std::vector > *ops; - std::vector > *handles; - std::vector > *strings; + shared_ptr resources; }; v8::Handle GetTagNodeValue(void *tag); diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 51904479a9..6a4a951121 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -101,8 +101,7 @@ class NewCallOp : public Op { } bool ParseOp(Handle value, grpc_op *out, - std::vector > *strings, - std::vector > *handles) { + shared_ptr resources) { return true; } @@ -230,7 +229,8 @@ NAN_METHOD(Server::RequestCall) { grpc_call_error error = grpc_server_request_call( server->wrapped_server, &op->call, &op->details, &op->request_metadata, CompletionQueueAsyncWorker::GetQueue(), - new struct tag(new NanCallback(args[0].As()), ops, NULL, NULL)); + new struct tag(new NanCallback(args[0].As()), ops, + shared_ptr(nullptr))); if (error != GRPC_CALL_OK) { return NanThrowError("requestCall failed", error); } -- cgit v1.2.3 From d66408ba5a23bdaac9a9764cc28ffe13edbfafea Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 13 Feb 2015 10:40:07 -0800 Subject: Improved memory management --- src/node/ext/call.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'src/node') diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 18f40f2488..4d719802fc 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -531,7 +531,7 @@ NAN_METHOD(Call::StartBatch) { Handle obj = args[0]->ToObject(); Handle keys = obj->GetOwnPropertyNames(); size_t nops = keys->Length(); - grpc_op *ops = new grpc_op[nops]; + std::vector ops(nops); std::vector > *op_vector = new std::vector >(); for (unsigned int i = 0; i < nops; i++) { unique_ptr op; @@ -575,9 +575,8 @@ NAN_METHOD(Call::StartBatch) { op_vector->push_back(std::move(op)); } grpc_call_error error = grpc_call_start_batch( - call->wrapped_call, ops, nops, new struct tag( + call->wrapped_call, &ops[0], nops, new struct tag( callback, op_vector, resources)); - delete ops; if (error != GRPC_CALL_OK) { return NanThrowError("startBatch failed", error); } -- cgit v1.2.3 From 57dfd058510d1a3be0ce315ff51fb9f793befd64 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 13 Feb 2015 10:41:25 -0800 Subject: Further improved memory management --- src/node/ext/call.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/node') diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 4d719802fc..e6701efbd4 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -525,7 +525,6 @@ NAN_METHOD(Call::StartBatch) { return NanThrowError("startBatch's second argument must be a callback"); } Handle callback_func = args[1].As(); - NanCallback *callback = new NanCallback(callback_func); Call *call = ObjectWrap::Unwrap(args.This()); shared_ptr resources(new Resources); Handle obj = args[0]->ToObject(); @@ -574,6 +573,7 @@ NAN_METHOD(Call::StartBatch) { } op_vector->push_back(std::move(op)); } + NanCallback *callback = new NanCallback(callback_func); grpc_call_error error = grpc_call_start_batch( call->wrapped_call, &ops[0], nops, new struct tag( callback, op_vector, resources)); -- cgit v1.2.3 From e012366e25b0e73fdd9e071798e6e0036eadad5d Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 13 Feb 2015 11:14:03 -0800 Subject: Improved op_vector memory management --- src/node/ext/call.cc | 11 ++++++----- src/node/ext/call.h | 7 +++++-- 2 files changed, 11 insertions(+), 7 deletions(-) (limited to 'src/node') diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index e6701efbd4..a2333fa426 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -49,6 +49,7 @@ using std::unique_ptr; using std::shared_ptr; +using std::vector; namespace grpc { namespace node { @@ -396,7 +397,7 @@ class ServerCloseResponseOp : public Op { int cancelled; }; -tag::tag(NanCallback *callback, std::vector > *ops, +tag::tag(NanCallback *callback, OpVec *ops, shared_ptr resources) : callback(callback), ops(ops), resources(resources){ } @@ -410,7 +411,7 @@ Handle GetTagNodeValue(void *tag) { NanEscapableScope(); struct tag *tag_struct = reinterpret_cast(tag); Handle tag_obj = NanNew(); - for (std::vector >::iterator it = tag_struct->ops->begin(); + for (vector >::iterator it = tag_struct->ops->begin(); it != tag_struct->ops->end(); ++it) { Op *op_ptr = it->get(); tag_obj->Set(op_ptr->GetOpType(), op_ptr->GetNodeValue()); @@ -530,8 +531,8 @@ NAN_METHOD(Call::StartBatch) { Handle obj = args[0]->ToObject(); Handle keys = obj->GetOwnPropertyNames(); size_t nops = keys->Length(); - std::vector ops(nops); - std::vector > *op_vector = new std::vector >(); + vector ops(nops); + unique_ptr op_vector(new OpVec()); for (unsigned int i = 0; i < nops; i++) { unique_ptr op; if (!keys->Get(i)->IsUint32()) { @@ -576,7 +577,7 @@ NAN_METHOD(Call::StartBatch) { NanCallback *callback = new NanCallback(callback_func); grpc_call_error error = grpc_call_start_batch( call->wrapped_call, &ops[0], nops, new struct tag( - callback, op_vector, resources)); + callback, op_vector.release(), resources)); if (error != GRPC_CALL_OK) { return NanThrowError("startBatch failed", error); } diff --git a/src/node/ext/call.h b/src/node/ext/call.h index 4074f1509b..dbdb8e2ff6 100644 --- a/src/node/ext/call.h +++ b/src/node/ext/call.h @@ -43,6 +43,7 @@ #include "channel.h" + namespace grpc { namespace node { @@ -81,12 +82,14 @@ class Op { virtual std::string GetTypeString() const = 0; }; +typedef std::vector> OpVec; + struct tag { - tag(NanCallback *callback, std::vector > *ops, + tag(NanCallback *callback, OpVec *ops, shared_ptr resources); ~tag(); NanCallback *callback; - std::vector > *ops; + OpVec *ops; shared_ptr resources; }; -- cgit v1.2.3 From d3f9f9f5493710515d0a82b590261294bb966859 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 13 Feb 2015 12:21:59 -0800 Subject: Updated server.cc to match call.cc changes --- src/node/ext/server.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/node') diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 6a4a951121..ee3e1087ce 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -224,12 +224,12 @@ NAN_METHOD(Server::RequestCall) { } Server *server = ObjectWrap::Unwrap(args.This()); NewCallOp *op = new NewCallOp(); - std::vector > *ops = new std::vector >(); + unique_ptr ops(new OpVec()); ops->push_back(unique_ptr(op)); grpc_call_error error = grpc_server_request_call( server->wrapped_server, &op->call, &op->details, &op->request_metadata, CompletionQueueAsyncWorker::GetQueue(), - new struct tag(new NanCallback(args[0].As()), ops, + new struct tag(new NanCallback(args[0].As()), ops.release(), shared_ptr(nullptr))); if (error != GRPC_CALL_OK) { return NanThrowError("requestCall failed", error); -- cgit v1.2.3 From 42683db2d89f30f2458b7689cd6b2be60b0a560b Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 13 Feb 2015 13:02:47 -0800 Subject: Removed debugging code --- src/node/ext/call.cc | 2 -- 1 file changed, 2 deletions(-) (limited to 'src/node') diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index a2333fa426..9ed389f3bc 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -199,8 +199,6 @@ class SendMessageOp : public Op { } out->data.send_message = BufferToByteBuffer(value); Persistent handle; - Handle temp = NanNew(); - NanAssignPersistent(handle, temp); NanAssignPersistent(handle, value); resources->handles.push_back(unique_ptr( new PersistentHolder(handle))); -- cgit v1.2.3 From b3a4e03b2df42d8480a215f0ea3165514f0ed4f4 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Fri, 13 Feb 2015 14:10:13 -0800 Subject: Version bump --- src/node/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/node') diff --git a/src/node/package.json b/src/node/package.json index 028dc20555..8f81014c1e 100644 --- a/src/node/package.json +++ b/src/node/package.json @@ -1,6 +1,6 @@ { "name": "grpc", - "version": "0.1.0", + "version": "0.2.0", "description": "gRPC Library for Node", "scripts": { "test": "./node_modules/mocha/bin/mocha" -- cgit v1.2.3 From 1d1f952731887521117b863625d60da05a85aec8 Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Sun, 15 Feb 2015 01:21:18 +0000 Subject: The Python interoperability testing server. --- src/csharp/GrpcApi/proto/test.proto | 2 +- src/node/interop/test.proto | 2 +- src/python/interop/interop/__init__.py | 0 src/python/interop/interop/credentials/README | 1 + src/python/interop/interop/credentials/server1.key | 16 + src/python/interop/interop/credentials/server1.pem | 16 + src/python/interop/interop/empty_pb2.py | 60 +++ src/python/interop/interop/messages_pb2.py | 444 +++++++++++++++++++++ src/python/interop/interop/methods.py | 109 +++++ src/python/interop/interop/server.py | 91 +++++ src/python/interop/interop/test_pb2.py | 32 ++ src/python/interop/setup.py | 51 +++ src/python/src/setup.py | 2 +- test/cpp/interop/test.proto | 2 +- 14 files changed, 824 insertions(+), 4 deletions(-) create mode 100644 src/python/interop/interop/__init__.py create mode 100755 src/python/interop/interop/credentials/README create mode 100755 src/python/interop/interop/credentials/server1.key create mode 100755 src/python/interop/interop/credentials/server1.pem create mode 100644 src/python/interop/interop/empty_pb2.py create mode 100644 src/python/interop/interop/messages_pb2.py create mode 100644 src/python/interop/interop/methods.py create mode 100644 src/python/interop/interop/server.py create mode 100644 src/python/interop/interop/test_pb2.py create mode 100644 src/python/interop/setup.py (limited to 'src/node') diff --git a/src/csharp/GrpcApi/proto/test.proto b/src/csharp/GrpcApi/proto/test.proto index 8380ebb31d..996f11aa6d 100644 --- a/src/csharp/GrpcApi/proto/test.proto +++ b/src/csharp/GrpcApi/proto/test.proto @@ -14,7 +14,7 @@ service TestService { rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty); // One request followed by one response. - // The server returns the client payload as-is. + // TODO(Issue 527): Describe required server behavior. rpc UnaryCall(SimpleRequest) returns (SimpleResponse); // One request followed by a sequence of responses (streamed download). diff --git a/src/node/interop/test.proto b/src/node/interop/test.proto index 8380ebb31d..996f11aa6d 100644 --- a/src/node/interop/test.proto +++ b/src/node/interop/test.proto @@ -14,7 +14,7 @@ service TestService { rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty); // One request followed by one response. - // The server returns the client payload as-is. + // TODO(Issue 527): Describe required server behavior. rpc UnaryCall(SimpleRequest) returns (SimpleResponse); // One request followed by a sequence of responses (streamed download). diff --git a/src/python/interop/interop/__init__.py b/src/python/interop/interop/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/python/interop/interop/credentials/README b/src/python/interop/interop/credentials/README new file mode 100755 index 0000000000..cb20dcb49f --- /dev/null +++ b/src/python/interop/interop/credentials/README @@ -0,0 +1 @@ +These are test keys *NOT* to be used in production. diff --git a/src/python/interop/interop/credentials/server1.key b/src/python/interop/interop/credentials/server1.key new file mode 100755 index 0000000000..143a5b8765 --- /dev/null +++ b/src/python/interop/interop/credentials/server1.key @@ -0,0 +1,16 @@ +-----BEGIN PRIVATE KEY----- +MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD +M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf +3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY +AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm +V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY +tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p +dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q +K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR +81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff +DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd +aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2 +ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3 +XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe +F98XJ7tIFfJq +-----END PRIVATE KEY----- diff --git a/src/python/interop/interop/credentials/server1.pem b/src/python/interop/interop/credentials/server1.pem new file mode 100755 index 0000000000..8e582e571f --- /dev/null +++ b/src/python/interop/interop/credentials/server1.pem @@ -0,0 +1,16 @@ +-----BEGIN CERTIFICATE----- +MIICmzCCAgSgAwIBAgIBAzANBgkqhkiG9w0BAQUFADBWMQswCQYDVQQGEwJBVTET +MBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQ +dHkgTHRkMQ8wDQYDVQQDDAZ0ZXN0Y2EwHhcNMTQwNzIyMDYwMDU3WhcNMjQwNzE5 +MDYwMDU3WjBkMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV +BAcTB0NoaWNhZ28xFDASBgNVBAoTC0dvb2dsZSBJbmMuMRowGAYDVQQDFBEqLnRl +c3QuZ29vZ2xlLmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA4cMVJygs +JUmlgMMzgdi0h1XoCR7+ww1pop04OMMyy7H/i0PJ2W6Y35+b4CM8QrkYeEafUGDO +RYX6yV/cHGGsD/x02ye6ey1UDtkGAD/mpDEx8YCrjAc1Vfvt8Fk6Cn1WVIxV/J30 +3xjBsFgByQ55RBp1OLZfVLo6AleBDSbcxaECAwEAAaNrMGkwCQYDVR0TBAIwADAL +BgNVHQ8EBAMCBeAwTwYDVR0RBEgwRoIQKi50ZXN0Lmdvb2dsZS5mcoIYd2F0ZXJ6 +b29pLnRlc3QuZ29vZ2xlLmJlghIqLnRlc3QueW91dHViZS5jb22HBMCoAQMwDQYJ +KoZIhvcNAQEFBQADgYEAM2Ii0LgTGbJ1j4oqX9bxVcxm+/R5Yf8oi0aZqTJlnLYS +wXcBykxTx181s7WyfJ49WwrYXo78zTDAnf1ma0fPq3e4mpspvyndLh1a+OarHa1e +aT0DIIYk7qeEa1YcVljx2KyLd0r1BBAfrwyGaEPVeJQVYWaOJRU2we/KD4ojf9s= +-----END CERTIFICATE----- diff --git a/src/python/interop/interop/empty_pb2.py b/src/python/interop/interop/empty_pb2.py new file mode 100644 index 0000000000..753341c7da --- /dev/null +++ b/src/python/interop/interop/empty_pb2.py @@ -0,0 +1,60 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test/cpp/interop/empty.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='test/cpp/interop/empty.proto', + package='grpc.testing', + serialized_pb=_b('\n\x1ctest/cpp/interop/empty.proto\x12\x0cgrpc.testing\"\x07\n\x05\x45mpty') +) +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + + + + +_EMPTY = _descriptor.Descriptor( + name='Empty', + full_name='grpc.testing.Empty', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=46, + serialized_end=53, +) + +DESCRIPTOR.message_types_by_name['Empty'] = _EMPTY + +Empty = _reflection.GeneratedProtocolMessageType('Empty', (_message.Message,), dict( + DESCRIPTOR = _EMPTY, + __module__ = 'test.cpp.interop.empty_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.Empty) + )) +_sym_db.RegisterMessage(Empty) + + +# @@protoc_insertion_point(module_scope) diff --git a/src/python/interop/interop/messages_pb2.py b/src/python/interop/interop/messages_pb2.py new file mode 100644 index 0000000000..79270cdf12 --- /dev/null +++ b/src/python/interop/interop/messages_pb2.py @@ -0,0 +1,444 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test/cpp/interop/messages.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf.internal import enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='test/cpp/interop/messages.proto', + package='grpc.testing', + serialized_pb=_b('\n\x1ftest/cpp/interop/messages.proto\x12\x0cgrpc.testing\"@\n\x07Payload\x12\'\n\x04type\x18\x01 \x01(\x0e\x32\x19.grpc.testing.PayloadType\x12\x0c\n\x04\x62ody\x18\x02 \x01(\x0c\"\xb1\x01\n\rSimpleRequest\x12\x30\n\rresponse_type\x18\x01 \x01(\x0e\x32\x19.grpc.testing.PayloadType\x12\x15\n\rresponse_size\x18\x02 \x01(\x05\x12&\n\x07payload\x18\x03 \x01(\x0b\x32\x15.grpc.testing.Payload\x12\x15\n\rfill_username\x18\x04 \x01(\x08\x12\x18\n\x10\x66ill_oauth_scope\x18\x05 \x01(\x08\"_\n\x0eSimpleResponse\x12&\n\x07payload\x18\x01 \x01(\x0b\x32\x15.grpc.testing.Payload\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x13\n\x0boauth_scope\x18\x03 \x01(\t\"C\n\x19StreamingInputCallRequest\x12&\n\x07payload\x18\x01 \x01(\x0b\x32\x15.grpc.testing.Payload\"=\n\x1aStreamingInputCallResponse\x12\x1f\n\x17\x61ggregated_payload_size\x18\x01 \x01(\x05\"7\n\x12ResponseParameters\x12\x0c\n\x04size\x18\x01 \x01(\x05\x12\x13\n\x0binterval_us\x18\x02 \x01(\x05\"\xb5\x01\n\x1aStreamingOutputCallRequest\x12\x30\n\rresponse_type\x18\x01 \x01(\x0e\x32\x19.grpc.testing.PayloadType\x12=\n\x13response_parameters\x18\x02 \x03(\x0b\x32 .grpc.testing.ResponseParameters\x12&\n\x07payload\x18\x03 \x01(\x0b\x32\x15.grpc.testing.Payload\"E\n\x1bStreamingOutputCallResponse\x12&\n\x07payload\x18\x01 \x01(\x0b\x32\x15.grpc.testing.Payload*?\n\x0bPayloadType\x12\x10\n\x0c\x43OMPRESSABLE\x10\x00\x12\x12\n\x0eUNCOMPRESSABLE\x10\x01\x12\n\n\x06RANDOM\x10\x02') +) +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +_PAYLOADTYPE = _descriptor.EnumDescriptor( + name='PayloadType', + full_name='grpc.testing.PayloadType', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='COMPRESSABLE', index=0, number=0, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='UNCOMPRESSABLE', index=1, number=1, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='RANDOM', index=2, number=2, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=836, + serialized_end=899, +) +_sym_db.RegisterEnumDescriptor(_PAYLOADTYPE) + +PayloadType = enum_type_wrapper.EnumTypeWrapper(_PAYLOADTYPE) +COMPRESSABLE = 0 +UNCOMPRESSABLE = 1 +RANDOM = 2 + + + +_PAYLOAD = _descriptor.Descriptor( + name='Payload', + full_name='grpc.testing.Payload', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='type', full_name='grpc.testing.Payload.type', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='body', full_name='grpc.testing.Payload.body', index=1, + number=2, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=49, + serialized_end=113, +) + + +_SIMPLEREQUEST = _descriptor.Descriptor( + name='SimpleRequest', + full_name='grpc.testing.SimpleRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='response_type', full_name='grpc.testing.SimpleRequest.response_type', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='response_size', full_name='grpc.testing.SimpleRequest.response_size', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='payload', full_name='grpc.testing.SimpleRequest.payload', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='fill_username', full_name='grpc.testing.SimpleRequest.fill_username', index=3, + number=4, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='fill_oauth_scope', full_name='grpc.testing.SimpleRequest.fill_oauth_scope', index=4, + number=5, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=116, + serialized_end=293, +) + + +_SIMPLERESPONSE = _descriptor.Descriptor( + name='SimpleResponse', + full_name='grpc.testing.SimpleResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='payload', full_name='grpc.testing.SimpleResponse.payload', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='username', full_name='grpc.testing.SimpleResponse.username', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='oauth_scope', full_name='grpc.testing.SimpleResponse.oauth_scope', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=295, + serialized_end=390, +) + + +_STREAMINGINPUTCALLREQUEST = _descriptor.Descriptor( + name='StreamingInputCallRequest', + full_name='grpc.testing.StreamingInputCallRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='payload', full_name='grpc.testing.StreamingInputCallRequest.payload', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=392, + serialized_end=459, +) + + +_STREAMINGINPUTCALLRESPONSE = _descriptor.Descriptor( + name='StreamingInputCallResponse', + full_name='grpc.testing.StreamingInputCallResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='aggregated_payload_size', full_name='grpc.testing.StreamingInputCallResponse.aggregated_payload_size', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=461, + serialized_end=522, +) + + +_RESPONSEPARAMETERS = _descriptor.Descriptor( + name='ResponseParameters', + full_name='grpc.testing.ResponseParameters', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='size', full_name='grpc.testing.ResponseParameters.size', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='interval_us', full_name='grpc.testing.ResponseParameters.interval_us', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=524, + serialized_end=579, +) + + +_STREAMINGOUTPUTCALLREQUEST = _descriptor.Descriptor( + name='StreamingOutputCallRequest', + full_name='grpc.testing.StreamingOutputCallRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='response_type', full_name='grpc.testing.StreamingOutputCallRequest.response_type', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='response_parameters', full_name='grpc.testing.StreamingOutputCallRequest.response_parameters', index=1, + number=2, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='payload', full_name='grpc.testing.StreamingOutputCallRequest.payload', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=582, + serialized_end=763, +) + + +_STREAMINGOUTPUTCALLRESPONSE = _descriptor.Descriptor( + name='StreamingOutputCallResponse', + full_name='grpc.testing.StreamingOutputCallResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='payload', full_name='grpc.testing.StreamingOutputCallResponse.payload', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=765, + serialized_end=834, +) + +_PAYLOAD.fields_by_name['type'].enum_type = _PAYLOADTYPE +_SIMPLEREQUEST.fields_by_name['response_type'].enum_type = _PAYLOADTYPE +_SIMPLEREQUEST.fields_by_name['payload'].message_type = _PAYLOAD +_SIMPLERESPONSE.fields_by_name['payload'].message_type = _PAYLOAD +_STREAMINGINPUTCALLREQUEST.fields_by_name['payload'].message_type = _PAYLOAD +_STREAMINGOUTPUTCALLREQUEST.fields_by_name['response_type'].enum_type = _PAYLOADTYPE +_STREAMINGOUTPUTCALLREQUEST.fields_by_name['response_parameters'].message_type = _RESPONSEPARAMETERS +_STREAMINGOUTPUTCALLREQUEST.fields_by_name['payload'].message_type = _PAYLOAD +_STREAMINGOUTPUTCALLRESPONSE.fields_by_name['payload'].message_type = _PAYLOAD +DESCRIPTOR.message_types_by_name['Payload'] = _PAYLOAD +DESCRIPTOR.message_types_by_name['SimpleRequest'] = _SIMPLEREQUEST +DESCRIPTOR.message_types_by_name['SimpleResponse'] = _SIMPLERESPONSE +DESCRIPTOR.message_types_by_name['StreamingInputCallRequest'] = _STREAMINGINPUTCALLREQUEST +DESCRIPTOR.message_types_by_name['StreamingInputCallResponse'] = _STREAMINGINPUTCALLRESPONSE +DESCRIPTOR.message_types_by_name['ResponseParameters'] = _RESPONSEPARAMETERS +DESCRIPTOR.message_types_by_name['StreamingOutputCallRequest'] = _STREAMINGOUTPUTCALLREQUEST +DESCRIPTOR.message_types_by_name['StreamingOutputCallResponse'] = _STREAMINGOUTPUTCALLRESPONSE +DESCRIPTOR.enum_types_by_name['PayloadType'] = _PAYLOADTYPE + +Payload = _reflection.GeneratedProtocolMessageType('Payload', (_message.Message,), dict( + DESCRIPTOR = _PAYLOAD, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.Payload) + )) +_sym_db.RegisterMessage(Payload) + +SimpleRequest = _reflection.GeneratedProtocolMessageType('SimpleRequest', (_message.Message,), dict( + DESCRIPTOR = _SIMPLEREQUEST, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.SimpleRequest) + )) +_sym_db.RegisterMessage(SimpleRequest) + +SimpleResponse = _reflection.GeneratedProtocolMessageType('SimpleResponse', (_message.Message,), dict( + DESCRIPTOR = _SIMPLERESPONSE, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.SimpleResponse) + )) +_sym_db.RegisterMessage(SimpleResponse) + +StreamingInputCallRequest = _reflection.GeneratedProtocolMessageType('StreamingInputCallRequest', (_message.Message,), dict( + DESCRIPTOR = _STREAMINGINPUTCALLREQUEST, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.StreamingInputCallRequest) + )) +_sym_db.RegisterMessage(StreamingInputCallRequest) + +StreamingInputCallResponse = _reflection.GeneratedProtocolMessageType('StreamingInputCallResponse', (_message.Message,), dict( + DESCRIPTOR = _STREAMINGINPUTCALLRESPONSE, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.StreamingInputCallResponse) + )) +_sym_db.RegisterMessage(StreamingInputCallResponse) + +ResponseParameters = _reflection.GeneratedProtocolMessageType('ResponseParameters', (_message.Message,), dict( + DESCRIPTOR = _RESPONSEPARAMETERS, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.ResponseParameters) + )) +_sym_db.RegisterMessage(ResponseParameters) + +StreamingOutputCallRequest = _reflection.GeneratedProtocolMessageType('StreamingOutputCallRequest', (_message.Message,), dict( + DESCRIPTOR = _STREAMINGOUTPUTCALLREQUEST, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.StreamingOutputCallRequest) + )) +_sym_db.RegisterMessage(StreamingOutputCallRequest) + +StreamingOutputCallResponse = _reflection.GeneratedProtocolMessageType('StreamingOutputCallResponse', (_message.Message,), dict( + DESCRIPTOR = _STREAMINGOUTPUTCALLRESPONSE, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.StreamingOutputCallResponse) + )) +_sym_db.RegisterMessage(StreamingOutputCallResponse) + + +# @@protoc_insertion_point(module_scope) diff --git a/src/python/interop/interop/methods.py b/src/python/interop/interop/methods.py new file mode 100644 index 0000000000..e5ce5902ca --- /dev/null +++ b/src/python/interop/interop/methods.py @@ -0,0 +1,109 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Implementations of interoperability test methods.""" + +from grpc_early_adopter import utilities + +from interop import empty_pb2 +from interop import messages_pb2 + +def _empty_call(request): + return empty_pb2.Empty() + +EMPTY_CALL = utilities.unary_unary_rpc_method( + _empty_call, empty_pb2.Empty.SerializeToString, empty_pb2.Empty.FromString, + empty_pb2.Empty.SerializeToString, empty_pb2.Empty.FromString) + + +def _unary_call(request): + return messages_pb2.SimpleResponse( + payload=messages_pb2.Payload( + type=messages_pb2.COMPRESSABLE, + body=b'\x00' * request.response_size)) + +UNARY_CALL = utilities.unary_unary_rpc_method( + _unary_call, messages_pb2.SimpleRequest.SerializeToString, + messages_pb2.SimpleRequest.FromString, + messages_pb2.SimpleResponse.SerializeToString, + messages_pb2.SimpleResponse.FromString) + + +def _streaming_output_call(request): + for response_parameters in request.response_parameters: + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload( + type=request.response_type, + body=b'\x00' * response_parameters.size)) + +STREAMING_OUTPUT_CALL = utilities.unary_stream_rpc_method( + _streaming_output_call, + messages_pb2.StreamingOutputCallRequest.SerializeToString, + messages_pb2.StreamingOutputCallRequest.FromString, + messages_pb2.StreamingOutputCallResponse.SerializeToString, + messages_pb2.StreamingOutputCallResponse.FromString) + + +def _streaming_input_call(request_iterator): + aggregate_size = 0 + for request in request_iterator: + if request.payload and request.payload.body: + aggregate_size += len(request.payload.body) + return messages_pb2.StreamingInputCallResponse( + aggregated_payload_size=aggregate_size) + +STREAMING_INPUT_CALL = utilities.stream_unary_rpc_method( + _streaming_input_call, + messages_pb2.StreamingInputCallRequest.SerializeToString, + messages_pb2.StreamingInputCallRequest.FromString, + messages_pb2.StreamingInputCallResponse.SerializeToString, + messages_pb2.StreamingInputCallResponse.FromString) + + +def _full_duplex_call(request_iterator): + for request in request_iterator: + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload( + type=request.payload.type, + body=b'\x00' * request.response_parameters[0].size)) + +FULL_DUPLEX_CALL = utilities.stream_stream_rpc_method( + _full_duplex_call, + messages_pb2.StreamingOutputCallRequest.SerializeToString, + messages_pb2.StreamingOutputCallRequest.FromString, + messages_pb2.StreamingOutputCallResponse.SerializeToString, + messages_pb2.StreamingOutputCallResponse.FromString) + +# NOTE(nathaniel): Apparently this is the same as the full-duplex call? +HALF_DUPLEX_CALL = utilities.stream_stream_rpc_method( + _full_duplex_call, + messages_pb2.StreamingOutputCallRequest.SerializeToString, + messages_pb2.StreamingOutputCallRequest.FromString, + messages_pb2.StreamingOutputCallResponse.SerializeToString, + messages_pb2.StreamingOutputCallResponse.FromString) diff --git a/src/python/interop/interop/server.py b/src/python/interop/interop/server.py new file mode 100644 index 0000000000..404c87dd0a --- /dev/null +++ b/src/python/interop/interop/server.py @@ -0,0 +1,91 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""The Python implementation of the GRPC interoperability test server.""" + +import argparse +import logging +import pkg_resources +import time + +from grpc_early_adopter import implementations + +from interop import methods + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 + +_PRIVATE_KEY_RESOURCE_PATH = 'credentials/server1.key' +_CERTIFICATE_CHAIN_RESOURCE_PATH = 'credentials/server1.pem' + +_METHODS = { + '/grpc.testing.TestService/EmptyCall': methods.EMPTY_CALL, + '/grpc.testing.TestService/UnaryCall': methods.UNARY_CALL, + '/grpc.testing.TestService/StreamingOutputCall': + methods.STREAMING_OUTPUT_CALL, + '/grpc.testing.TestService/StreamingInputCall': + methods.STREAMING_INPUT_CALL, + '/grpc.testing.TestService/FullDuplexCall': + methods.FULL_DUPLEX_CALL, + '/grpc.testing.TestService/HalfDuplexCall': + methods.HALF_DUPLEX_CALL, +} + + +def serve(): + parser = argparse.ArgumentParser() + parser.add_argument( + '--port', help='the port on which to serve', type=int) + parser.add_argument( + '--use_tls', help='require a secure connection', dest='use_tls', + action='store_true') + args = parser.parse_args() + + if args.use_tls: + private_key = pkg_resources.resource_string( + __name__, _PRIVATE_KEY_RESOURCE_PATH) + certificate_chain = pkg_resources.resource_string( + __name__, _CERTIFICATE_CHAIN_RESOURCE_PATH) + server = implementations.secure_server( + _METHODS, args.port, private_key, certificate_chain) + else: + server = implementations.insecure_server( + _METHODS, args.port) + + server.start() + logging.info('Server serving.') + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except BaseException as e: + logging.info('Caught exception "%s"; stopping server...', e) + server.stop() + logging.info('Server stopped; exiting.') + +if __name__ == '__main__': + serve() diff --git a/src/python/interop/interop/test_pb2.py b/src/python/interop/interop/test_pb2.py new file mode 100644 index 0000000000..1241453159 --- /dev/null +++ b/src/python/interop/interop/test_pb2.py @@ -0,0 +1,32 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test/cpp/interop/test.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from test.cpp.interop import empty_pb2 as test_dot_cpp_dot_interop_dot_empty__pb2 +from test.cpp.interop import messages_pb2 as test_dot_cpp_dot_interop_dot_messages__pb2 + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='test/cpp/interop/test.proto', + package='grpc.testing', + serialized_pb=_b('\n\x1btest/cpp/interop/test.proto\x12\x0cgrpc.testing\x1a\x1ctest/cpp/interop/empty.proto\x1a\x1ftest/cpp/interop/messages.proto2\xbb\x04\n\x0bTestService\x12\x35\n\tEmptyCall\x12\x13.grpc.testing.Empty\x1a\x13.grpc.testing.Empty\x12\x46\n\tUnaryCall\x12\x1b.grpc.testing.SimpleRequest\x1a\x1c.grpc.testing.SimpleResponse\x12l\n\x13StreamingOutputCall\x12(.grpc.testing.StreamingOutputCallRequest\x1a).grpc.testing.StreamingOutputCallResponse0\x01\x12i\n\x12StreamingInputCall\x12\'.grpc.testing.StreamingInputCallRequest\x1a(.grpc.testing.StreamingInputCallResponse(\x01\x12i\n\x0e\x46ullDuplexCall\x12(.grpc.testing.StreamingOutputCallRequest\x1a).grpc.testing.StreamingOutputCallResponse(\x01\x30\x01\x12i\n\x0eHalfDuplexCall\x12(.grpc.testing.StreamingOutputCallRequest\x1a).grpc.testing.StreamingOutputCallResponse(\x01\x30\x01') + , + dependencies=[test_dot_cpp_dot_interop_dot_empty__pb2.DESCRIPTOR,test_dot_cpp_dot_interop_dot_messages__pb2.DESCRIPTOR,]) +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + + + + + +# @@protoc_insertion_point(module_scope) diff --git a/src/python/interop/setup.py b/src/python/interop/setup.py new file mode 100644 index 0000000000..4b7709f234 --- /dev/null +++ b/src/python/interop/setup.py @@ -0,0 +1,51 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A setup module for the GRPC Python interop testing package.""" + +from distutils import core as _core + +_PACKAGES = ( + 'interop', +) + +_PACKAGE_DIRECTORIES = { + 'interop': 'interop', +} + +_PACKAGE_DATA = { + 'interop': ['credentials/server1.key', 'credentials/server1.pem',] +} + +_INSTALL_REQUIRES = ['grpc-2015>=0.0.1'] + +_core.setup( + name='interop', version='0.0.1', packages=_PACKAGES, + package_dir=_PACKAGE_DIRECTORIES, package_data=_PACKAGE_DATA, + install_requires=_INSTALL_REQUIRES) diff --git a/src/python/src/setup.py b/src/python/src/setup.py index be3724026d..93af4d68ca 100644 --- a/src/python/src/setup.py +++ b/src/python/src/setup.py @@ -77,6 +77,6 @@ _PACKAGE_DIRECTORIES = { } _core.setup( - name='grpc', version='0.0.1', + name='grpc-2015', version='0.0.1', ext_modules=[_EXTENSION_MODULE], packages=_PACKAGES, package_dir=_PACKAGE_DIRECTORIES) diff --git a/test/cpp/interop/test.proto b/test/cpp/interop/test.proto index e358f3bea5..1162ad6124 100644 --- a/test/cpp/interop/test.proto +++ b/test/cpp/interop/test.proto @@ -14,7 +14,7 @@ service TestService { rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty); // One request followed by one response. - // The server returns the client payload as-is. + // TODO(Issue 527): Describe required server behavior. rpc UnaryCall(SimpleRequest) returns (SimpleResponse); // One request followed by a sequence of responses (streamed download). -- cgit v1.2.3 From ffbcaaf18ecc4aabdc3938c8ccaccc554c35da60 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 17 Feb 2015 16:23:06 -0800 Subject: Added missing documentation --- src/node/src/client.js | 25 +++++++++++-- src/node/src/server.js | 97 ++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 117 insertions(+), 5 deletions(-) (limited to 'src/node') diff --git a/src/node/src/client.js b/src/node/src/client.js index 4b7eda324e..81fa65eb26 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -51,6 +51,13 @@ var util = require('util'); util.inherits(ClientWritableStream, Writable); +/** + * A stream that the client can write to. Used for calls that are streaming from + * the client side. + * @constructor + * @param {grpc.Call} call The call object to send data with + * @param {function(*):Buffer=} serialize Serialization function for writes. + */ function ClientWritableStream(call, serialize) { Writable.call(this, {objectMode: true}); this.call = call; @@ -84,6 +91,13 @@ ClientWritableStream.prototype._write = _write; util.inherits(ClientReadableStream, Readable); +/** + * A stream that the client can read from. Used for calls that are streaming + * from the server side. + * @constructor + * @param {grpc.Call} call The call object to read data with + * @param {function(Buffer):*=} deserialize Deserialization function for reads + */ function ClientReadableStream(call, deserialize) { Readable.call(this, {objectMode: true}); this.call = call; @@ -92,6 +106,10 @@ function ClientReadableStream(call, deserialize) { this.deserialize = common.wrapIgnoreNull(deserialize); } +/** + * Read the next object from the stream. + * @param {*} size Ignored because we use objectMode=true + */ function _read(size) { var self = this; /** @@ -133,8 +151,8 @@ ClientReadableStream.prototype._read = _read; util.inherits(ClientDuplexStream, Duplex); /** - * Class for representing a gRPC client side stream as a Node stream. Extends - * from stream.Duplex. + * A stream that the client can read from or write to. Used for calls with + * duplex streaming. * @constructor * @param {grpc.Call} call Call object to proxy * @param {function(*):Buffer=} serialize Serialization function for requests @@ -160,6 +178,9 @@ function ClientDuplexStream(call, serialize, deserialize) { ClientDuplexStream.prototype._read = _read; ClientDuplexStream.prototype._write = _write; +/** + * Cancel the ongoing call + */ function cancel() { this.call.cancel(); } diff --git a/src/node/src/server.js b/src/node/src/server.js index 82d521ddf1..48c349ef99 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -51,16 +51,38 @@ var EventEmitter = require('events').EventEmitter; var common = require('./common.js'); +/** + * Handle an error on a call by sending it as a status + * @param {grpc.Call} call The call to send the error on + * @param {Object} error The error object + */ function handleError(call, error) { - var error_batch = {}; - error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + var status = { code: grpc.status.INTERNAL, details: 'Unknown Error', metadata: {} }; + if (error.hasOwnProperty('message')) { + status.details = error.message; + } + if (error.hasOwnProperty('code')) { + status.code = error.code; + if (error.hasOwnProperty('details')) { + status.details = error.details; + } + } + var error_batch = {}; + error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; call.startBatch(error_batch, function(){}); } +/** + * Wait for the client to close, then emit a cancelled event if the client + * cancelled. + * @param {grpc.Call} call The call object to wait on + * @param {EventEmitter} emitter The event emitter to emit the cancelled event + * on + */ function waitForCancel(call, emitter) { var cancel_batch = {}; cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; @@ -75,6 +97,13 @@ function waitForCancel(call, emitter) { }); } +/** + * Send a response to a unary or client streaming call. + * @param {grpc.Call} call The call to respond on + * @param {*} value The value to respond with + * @param {function(*):Buffer=} serialize Serialization function for the + * response + */ function sendUnaryResponse(call, value, serialize) { var end_batch = {}; end_batch[grpc.opType.SEND_MESSAGE] = serialize(value); @@ -86,6 +115,12 @@ function sendUnaryResponse(call, value, serialize) { call.startBatch(end_batch, function (){}); } +/** + * Initialize a writable stream. This is used for both the writable and duplex + * stream constructors. + * @param {Writable} stream The stream to set up + * @param {function(*):Buffer=} Serialization function for responses + */ function setUpWritable(stream, serialize) { stream.finished = false; stream.status = { @@ -109,7 +144,9 @@ function setUpWritable(stream, serialize) { function setStatus(err) { var code = grpc.status.INTERNAL; var details = 'Unknown Error'; - + if (err.hasOwnProperty('message')) { + details = err.message; + } if (err.hasOwnProperty('code')) { code = err.code; if (err.hasOwnProperty('details')) { @@ -132,6 +169,13 @@ function setUpWritable(stream, serialize) { stream.on('error', terminateCall); } +/** + * Initialize a readable stream. This is used for both the readable and duplex + * stream constructors. + * @param {Readable} stream The stream to initialize + * @param {function(Buffer):*=} deserialize Deserialization function for + * incoming data. + */ function setUpReadable(stream, deserialize) { stream.deserialize = common.wrapIgnoreNull(deserialize); stream.finished = false; @@ -149,6 +193,13 @@ function setUpReadable(stream, deserialize) { util.inherits(ServerWritableStream, Writable); +/** + * A stream that the server can write to. Used for calls that are streaming from + * the server side. + * @constructor + * @param {grpc.Call} call The call object to send data with + * @param {function(*):Buffer=} serialize Serialization function for writes + */ function ServerWritableStream(call, serialize) { Writable.call(this, {objectMode: true}); this.call = call; @@ -181,6 +232,13 @@ ServerWritableStream.prototype._write = _write; util.inherits(ServerReadableStream, Readable); +/** + * A stream that the server can read from. Used for calls that are streaming + * from the client side. + * @constructor + * @param {grpc.Call} call The call object to read data with + * @param {function(Buffer):*=} deserialize Deserialization function for reads + */ function ServerReadableStream(call, deserialize) { Readable.call(this, {objectMode: true}); this.call = call; @@ -233,6 +291,15 @@ ServerReadableStream.prototype._read = _read; util.inherits(ServerDuplexStream, Duplex); +/** + * A stream that the server can read from or write to. Used for calls with + * duplex streaming. + * @constructor + * @param {grpc.Call} call Call object to proxy + * @param {function(*):Buffer=} serialize Serialization function for requests + * @param {function(Buffer):*=} deserialize Deserialization function for + * responses + */ function ServerDuplexStream(call, serialize, deserialize) { Duplex.call(this, {objectMode: true}); this.call = call; @@ -243,6 +310,12 @@ function ServerDuplexStream(call, serialize, deserialize) { ServerDuplexStream.prototype._read = _read; ServerDuplexStream.prototype._write = _write; +/** + * Fully handle a unary call + * @param {grpc.Call} call The call to handle + * @param {Object} handler Request handler object for the method that was called + * @param {Object} metadata Metadata from the client + */ function handleUnary(call, handler, metadata) { var emitter = new EventEmitter(); emitter.on('error', function(error) { @@ -270,6 +343,12 @@ function handleUnary(call, handler, metadata) { }); } +/** + * Fully handle a server streaming call + * @param {grpc.Call} call The call to handle + * @param {Object} handler Request handler object for the method that was called + * @param {Object} metadata Metadata from the client + */ function handleServerStreaming(call, handler, metadata) { var stream = new ServerWritableStream(call, handler.serialize); waitForCancel(call, stream); @@ -286,6 +365,12 @@ function handleServerStreaming(call, handler, metadata) { }); } +/** + * Fully handle a client streaming call + * @param {grpc.Call} call The call to handle + * @param {Object} handler Request handler object for the method that was called + * @param {Object} metadata Metadata from the client + */ function handleClientStreaming(call, handler, metadata) { var stream = new ServerReadableStream(call, handler.deserialize); waitForCancel(call, stream); @@ -301,6 +386,12 @@ function handleClientStreaming(call, handler, metadata) { }); } +/** + * Fully handle a bidirectional streaming call + * @param {grpc.Call} call The call to handle + * @param {Object} handler Request handler object for the method that was called + * @param {Object} metadata Metadata from the client + */ function handleBidiStreaming(call, handler, metadata) { var stream = new ServerDuplexStream(call, handler.serialize, handler.deserialize); -- cgit v1.2.3