diff options
author | 2017-04-12 15:16:35 -0700 | |
---|---|---|
committer | 2017-04-12 15:16:35 -0700 | |
commit | b3d308b9a11496722754f4dc23ebb82a9f0e63d2 (patch) | |
tree | d5c4d27375d08a91fdb71b6e84a2bb4558e28b52 /src/node/ext/call.cc | |
parent | c20fa90c6094599540e3a7ef16a5aa381580be33 (diff) | |
parent | 1b76bda4a61a0ed65d5a5de7a6f3363a47871e50 (diff) |
Merge github.com:grpc/grpc into cpparena
Diffstat (limited to 'src/node/ext/call.cc')
-rw-r--r-- | src/node/ext/call.cc | 280 |
1 files changed, 132 insertions, 148 deletions
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 88bcd6db0b..7f3cbb8ed1 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -31,23 +31,23 @@ * */ +#include <map> #include <memory> #include <vector> -#include <map> #include <node.h> -#include "grpc/support/log.h" -#include "grpc/grpc.h" -#include "grpc/grpc_security.h" -#include "grpc/support/alloc.h" -#include "grpc/support/time.h" #include "byte_buffer.h" #include "call.h" +#include "call_credentials.h" #include "channel.h" #include "completion_queue.h" #include "completion_queue_async_worker.h" -#include "call_credentials.h" +#include "grpc/grpc.h" +#include "grpc/grpc_security.h" +#include "grpc/support/alloc.h" +#include "grpc/support/log.h" +#include "grpc/support/time.h" #include "slice.h" #include "timeval.h" @@ -99,30 +99,31 @@ Local<Value> nanErrorWithCode(const char *msg, grpc_call_error code) { bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array) { HandleScope scope; - grpc_metadata_array_init(array); Local<Array> keys = Nan::GetOwnPropertyNames(metadata).ToLocalChecked(); for (unsigned int i = 0; i < keys->Length(); i++) { - Local<String> current_key = Nan::To<String>( - Nan::Get(keys, i).ToLocalChecked()).ToLocalChecked(); + Local<String> current_key = + Nan::To<String>(Nan::Get(keys, i).ToLocalChecked()).ToLocalChecked(); Local<Value> value_array = Nan::Get(metadata, current_key).ToLocalChecked(); if (!value_array->IsArray()) { return false; } array->capacity += Local<Array>::Cast(value_array)->Length(); } - array->metadata = reinterpret_cast<grpc_metadata*>( - gpr_malloc(array->capacity * sizeof(grpc_metadata))); + array->metadata = reinterpret_cast<grpc_metadata *>( + gpr_zalloc(array->capacity * sizeof(grpc_metadata))); for (unsigned int i = 0; i < keys->Length(); i++) { Local<String> current_key(Nan::To<String>(keys->Get(i)).ToLocalChecked()); - Local<Array> values = Local<Array>::Cast( - Nan::Get(metadata, current_key).ToLocalChecked()); - grpc_slice key_slice = grpc_slice_intern(CreateSliceFromString(current_key)); + Local<Array> values = + Local<Array>::Cast(Nan::Get(metadata, current_key).ToLocalChecked()); + grpc_slice key_slice = CreateSliceFromString(current_key); + grpc_slice key_intern_slice = grpc_slice_intern(key_slice); + grpc_slice_unref(key_slice); for (unsigned int j = 0; j < values->Length(); j++) { Local<Value> value = Nan::Get(values, j).ToLocalChecked(); grpc_metadata *current = &array->metadata[array->count]; - current->key = key_slice; + current->key = key_intern_slice; // Only allow binary headers for "-bin" keys - if (grpc_is_binary_header(key_slice)) { + if (grpc_is_binary_header(key_intern_slice)) { if (::node::Buffer::HasInstance(value)) { current->value = CreateSliceFromBuffer(value); } else { @@ -142,13 +143,21 @@ bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array) { return true; } +void DestroyMetadataArray(grpc_metadata_array *array) { + for (size_t i = 0; i < array->count; i++) { + // Don't unref keys because they are interned + grpc_slice_unref(array->metadata[i].value); + } + grpc_metadata_array_destroy(array); +} + Local<Value> ParseMetadata(const grpc_metadata_array *metadata_array) { EscapableHandleScope scope; grpc_metadata *metadata_elements = metadata_array->metadata; size_t length = metadata_array->count; Local<Object> metadata_object = Nan::New<Object>(); for (unsigned int i = 0; i < length; i++) { - grpc_metadata* elem = &metadata_elements[i]; + grpc_metadata *elem = &metadata_elements[i]; // TODO(murgatroid99): Use zero-copy string construction instead Local<String> key_string = CopyStringFromSlice(elem->key); Local<Array> array; @@ -174,11 +183,12 @@ Local<Value> Op::GetOpType() const { return scope.Escape(Nan::New(GetTypeString()).ToLocalChecked()); } -Op::~Op() { -} +Op::~Op() {} class SendMetadataOp : public Op { public: + SendMetadataOp() { grpc_metadata_array_init(&send_metadata); } + ~SendMetadataOp() { DestroyMetadataArray(&send_metadata); } Local<Value> GetNodeValue() const { EscapableHandleScope scope; return scope.Escape(Nan::True()); @@ -187,33 +197,29 @@ class SendMetadataOp : public Op { if (!value->IsObject()) { return false; } - grpc_metadata_array array; MaybeLocal<Object> maybe_metadata = Nan::To<Object>(value); if (maybe_metadata.IsEmpty()) { return false; } - if (!CreateMetadataArray(maybe_metadata.ToLocalChecked(), - &array)) { + if (!CreateMetadataArray(maybe_metadata.ToLocalChecked(), &send_metadata)) { return false; } - out->data.send_initial_metadata.count = array.count; - out->data.send_initial_metadata.metadata = array.metadata; + out->data.send_initial_metadata.count = send_metadata.count; + out->data.send_initial_metadata.metadata = send_metadata.metadata; return true; } - bool IsFinalOp() { - return false; - } + bool IsFinalOp() { return false; } + protected: - std::string GetTypeString() const { - return "send_metadata"; - } + std::string GetTypeString() const { return "send_metadata"; } + + private: + grpc_metadata_array send_metadata; }; class SendMessageOp : public Op { public: - SendMessageOp() { - send_message = NULL; - } + SendMessageOp() { send_message = NULL; } ~SendMessageOp() { if (send_message != NULL) { grpc_byte_buffer_destroy(send_message); @@ -228,8 +234,8 @@ class SendMessageOp : public Op { return false; } Local<Object> object_value = Nan::To<Object>(value).ToLocalChecked(); - MaybeLocal<Value> maybe_flag_value = Nan::Get( - object_value, Nan::New("grpcWriteFlags").ToLocalChecked()); + MaybeLocal<Value> maybe_flag_value = + Nan::Get(object_value, Nan::New("grpcWriteFlags").ToLocalChecked()); if (!maybe_flag_value.IsEmpty()) { Local<Value> flag_value = maybe_flag_value.ToLocalChecked(); if (flag_value->IsUint32()) { @@ -241,13 +247,11 @@ class SendMessageOp : public Op { out->data.send_message.send_message = send_message; return true; } - bool IsFinalOp() { - return false; - } + bool IsFinalOp() { return false; } + protected: - std::string GetTypeString() const { - return "send_message"; - } + std::string GetTypeString() const { return "send_message"; } + private: grpc_byte_buffer *send_message; }; @@ -258,22 +262,19 @@ class SendClientCloseOp : public Op { EscapableHandleScope scope; return scope.Escape(Nan::True()); } - bool ParseOp(Local<Value> value, grpc_op *out) { - return true; - } - bool IsFinalOp() { - return false; - } + bool ParseOp(Local<Value> value, grpc_op *out) { return true; } + bool IsFinalOp() { return false; } + protected: - std::string GetTypeString() const { - return "client_close"; - } + std::string GetTypeString() const { return "client_close"; } }; class SendServerStatusOp : public Op { public: + SendServerStatusOp() { grpc_metadata_array_init(&status_metadata); } ~SendServerStatusOp() { grpc_slice_unref(details); + DestroyMetadataArray(&status_metadata); } Local<Value> GetNodeValue() const { EscapableHandleScope scope; @@ -284,18 +285,18 @@ class SendServerStatusOp : public Op { return false; } Local<Object> server_status = Nan::To<Object>(value).ToLocalChecked(); - MaybeLocal<Value> maybe_metadata = Nan::Get( - server_status, Nan::New("metadata").ToLocalChecked()); + MaybeLocal<Value> maybe_metadata = + Nan::Get(server_status, Nan::New("metadata").ToLocalChecked()); if (maybe_metadata.IsEmpty()) { return false; } if (!maybe_metadata.ToLocalChecked()->IsObject()) { return false; } - Local<Object> metadata = Nan::To<Object>( - maybe_metadata.ToLocalChecked()).ToLocalChecked(); - MaybeLocal<Value> maybe_code = Nan::Get(server_status, - Nan::New("code").ToLocalChecked()); + Local<Object> metadata = + Nan::To<Object>(maybe_metadata.ToLocalChecked()).ToLocalChecked(); + MaybeLocal<Value> maybe_code = + Nan::Get(server_status, Nan::New("code").ToLocalChecked()); if (maybe_code.IsEmpty()) { return false; } @@ -303,49 +304,44 @@ class SendServerStatusOp : public Op { return false; } uint32_t code = Nan::To<uint32_t>(maybe_code.ToLocalChecked()).FromJust(); - MaybeLocal<Value> maybe_details = Nan::Get( - server_status, Nan::New("details").ToLocalChecked()); + MaybeLocal<Value> maybe_details = + Nan::Get(server_status, Nan::New("details").ToLocalChecked()); if (maybe_details.IsEmpty()) { return false; } if (!maybe_details.ToLocalChecked()->IsString()) { return false; } - Local<String> details = Nan::To<String>( - maybe_details.ToLocalChecked()).ToLocalChecked(); - grpc_metadata_array array; - if (!CreateMetadataArray(metadata, &array)) { + Local<String> details = + Nan::To<String>(maybe_details.ToLocalChecked()).ToLocalChecked(); + if (!CreateMetadataArray(metadata, &status_metadata)) { return false; } - out->data.send_status_from_server.trailing_metadata_count = array.count; - out->data.send_status_from_server.trailing_metadata = array.metadata; + out->data.send_status_from_server.trailing_metadata_count = + status_metadata.count; + out->data.send_status_from_server.trailing_metadata = + status_metadata.metadata; out->data.send_status_from_server.status = static_cast<grpc_status_code>(code); this->details = CreateSliceFromString(details); out->data.send_status_from_server.status_details = &this->details; return true; } - bool IsFinalOp() { - return true; - } + bool IsFinalOp() { return true; } + protected: - std::string GetTypeString() const { - return "send_status"; - } + std::string GetTypeString() const { return "send_status"; } private: grpc_slice details; + grpc_metadata_array status_metadata; }; class GetMetadataOp : public Op { public: - GetMetadataOp() { - grpc_metadata_array_init(&recv_metadata); - } + GetMetadataOp() { grpc_metadata_array_init(&recv_metadata); } - ~GetMetadataOp() { - grpc_metadata_array_destroy(&recv_metadata); - } + ~GetMetadataOp() { grpc_metadata_array_destroy(&recv_metadata); } Local<Value> GetNodeValue() const { EscapableHandleScope scope; @@ -356,14 +352,10 @@ class GetMetadataOp : public Op { out->data.recv_initial_metadata.recv_initial_metadata = &recv_metadata; return true; } - bool IsFinalOp() { - return false; - } + bool IsFinalOp() { return false; } protected: - std::string GetTypeString() const { - return "metadata"; - } + std::string GetTypeString() const { return "metadata"; } private: grpc_metadata_array recv_metadata; @@ -371,9 +363,7 @@ class GetMetadataOp : public Op { class ReadMessageOp : public Op { public: - ReadMessageOp() { - recv_message = NULL; - } + ReadMessageOp() { recv_message = NULL; } ~ReadMessageOp() { if (recv_message != NULL) { grpc_byte_buffer_destroy(recv_message); @@ -388,14 +378,10 @@ class ReadMessageOp : public Op { out->data.recv_message.recv_message = &recv_message; return true; } - bool IsFinalOp() { - return false; - } + bool IsFinalOp() { return false; } protected: - std::string GetTypeString() const { - return "read"; - } + std::string GetTypeString() const { return "read"; } private: grpc_byte_buffer *recv_message; @@ -403,13 +389,9 @@ class ReadMessageOp : public Op { class ClientStatusOp : public Op { public: - ClientStatusOp() { - grpc_metadata_array_init(&metadata_array); - } + ClientStatusOp() { grpc_metadata_array_init(&metadata_array); } - ~ClientStatusOp() { - grpc_metadata_array_destroy(&metadata_array); - } + ~ClientStatusOp() { grpc_metadata_array_destroy(&metadata_array); } bool ParseOp(Local<Value> value, grpc_op *out) { out->data.recv_status_on_client.trailing_metadata = &metadata_array; @@ -422,20 +404,18 @@ class ClientStatusOp : public Op { EscapableHandleScope scope; Local<Object> status_obj = Nan::New<Object>(); Nan::Set(status_obj, Nan::New("code").ToLocalChecked(), - Nan::New<Number>(status)); + Nan::New<Number>(status)); Nan::Set(status_obj, Nan::New("details").ToLocalChecked(), - CopyStringFromSlice(status_details)); + CopyStringFromSlice(status_details)); Nan::Set(status_obj, Nan::New("metadata").ToLocalChecked(), ParseMetadata(&metadata_array)); return scope.Escape(status_obj); } - bool IsFinalOp() { - return true; - } + bool IsFinalOp() { return true; } + protected: - std::string GetTypeString() const { - return "status"; - } + std::string GetTypeString() const { return "status"; } + private: grpc_metadata_array metadata_array; grpc_status_code status; @@ -453,21 +433,19 @@ class ServerCloseResponseOp : public Op { out->data.recv_close_on_server.cancelled = &cancelled; return true; } - bool IsFinalOp() { - return false; - } + bool IsFinalOp() { return false; } protected: - std::string GetTypeString() const { - return "cancelled"; - } + std::string GetTypeString() const { return "cancelled"; } private: int cancelled; }; -tag::tag(Callback *callback, OpVec *ops, Call *call) : - callback(callback), ops(ops), call(call){ +tag::tag(Callback *callback, OpVec *ops, Call *call, Local<Value> call_value) + : callback(callback), ops(ops), call(call) { + HandleScope scope; + call_persist.Reset(call_value); } tag::~tag() { @@ -513,17 +491,18 @@ void DestroyTag(void *tag) { delete tag_struct; } -Call::Call(grpc_call *call) : wrapped_call(call), - pending_batches(0), - has_final_op_completed(false) { -} - -Call::~Call() { - if (wrapped_call != NULL) { - grpc_call_unref(wrapped_call); +void Call::DestroyCall() { + if (this->wrapped_call != NULL) { + grpc_call_unref(this->wrapped_call); + this->wrapped_call = NULL; } } +Call::Call(grpc_call *call) + : wrapped_call(call), pending_batches(0), has_final_op_completed(false) {} + +Call::~Call() { DestroyCall(); } + void Call::Init(Local<Object> exports) { HandleScope scope; Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); @@ -551,10 +530,10 @@ Local<Value> Call::WrapStruct(grpc_call *call) { return scope.Escape(Nan::Null()); } const int argc = 1; - Local<Value> argv[argc] = {Nan::New<External>( - reinterpret_cast<void *>(call))}; - MaybeLocal<Object> maybe_instance = Nan::NewInstance( - constructor->GetFunction(), argc, argv); + Local<Value> argv[argc] = { + Nan::New<External>(reinterpret_cast<void *>(call))}; + MaybeLocal<Object> maybe_instance = + Nan::NewInstance(constructor->GetFunction(), argc, argv); if (maybe_instance.IsEmpty()) { return scope.Escape(Nan::Null()); } else { @@ -568,19 +547,25 @@ void Call::CompleteBatch(bool is_final_op) { } this->pending_batches--; if (this->has_final_op_completed && this->pending_batches == 0) { - grpc_call_unref(this->wrapped_call); - this->wrapped_call = NULL; + this->DestroyCall(); } } NAN_METHOD(Call::New) { + /* Arguments: + * 0: Channel to make the call on + * 1: Method + * 2: Deadline + * 3: host + * 4: parent Call + * 5: propagation flags + */ if (info.IsConstructCall()) { Call *call; if (info[0]->IsExternal()) { Local<External> ext = info[0].As<External>(); // This option is used for wrapping an existing call - grpc_call *call_value = - reinterpret_cast<grpc_call *>(ext->Value()); + grpc_call *call_value = reinterpret_cast<grpc_call *>(ext->Value()); call = new Call(call_value); } else { if (!Channel::HasInstance(info[0])) { @@ -596,8 +581,8 @@ NAN_METHOD(Call::New) { // These arguments are at the end because they are optional grpc_call *parent_call = NULL; if (Call::HasInstance(info[4])) { - Call *parent_obj = ObjectWrap::Unwrap<Call>( - Nan::To<Object>(info[4]).ToLocalChecked()); + Call *parent_obj = + ObjectWrap::Unwrap<Call>(Nan::To<Object>(info[4]).ToLocalChecked()); parent_call = parent_obj->wrapped_call; } else if (!(info[4]->IsUndefined() || info[4]->IsNull())) { return Nan::ThrowTypeError( @@ -618,25 +603,24 @@ NAN_METHOD(Call::New) { double deadline = Nan::To<double>(info[2]).FromJust(); grpc_channel *wrapped_channel = channel->GetWrappedChannel(); grpc_call *wrapped_call; + grpc_slice method = + CreateSliceFromString(Nan::To<String>(info[1]).ToLocalChecked()); if (info[3]->IsString()) { grpc_slice *host = new grpc_slice; - *host = CreateSliceFromString( - Nan::To<String>(info[3]).ToLocalChecked()); + *host = + CreateSliceFromString(Nan::To<String>(info[3]).ToLocalChecked()); wrapped_call = grpc_channel_create_call( - wrapped_channel, parent_call, propagate_flags, - GetCompletionQueue(), CreateSliceFromString( - Nan::To<String>(info[1]).ToLocalChecked()), - host, MillisecondsToTimespec(deadline), NULL); + wrapped_channel, parent_call, propagate_flags, GetCompletionQueue(), + method, host, MillisecondsToTimespec(deadline), NULL); delete host; } else if (info[3]->IsUndefined() || info[3]->IsNull()) { wrapped_call = grpc_channel_create_call( - wrapped_channel, parent_call, propagate_flags, - GetCompletionQueue(), CreateSliceFromString( - Nan::To<String>(info[1]).ToLocalChecked()), - NULL, MillisecondsToTimespec(deadline), NULL); + wrapped_channel, parent_call, propagate_flags, GetCompletionQueue(), + method, NULL, MillisecondsToTimespec(deadline), NULL); } else { return Nan::ThrowTypeError("Call's fourth argument must be a string"); } + grpc_slice_unref(method); call = new Call(wrapped_call); Nan::Set(info.This(), Nan::New("channel_").ToLocalChecked(), channel_object); @@ -646,8 +630,8 @@ NAN_METHOD(Call::New) { } else { const int argc = 4; Local<Value> argv[argc] = {info[0], info[1], info[2], info[3]}; - MaybeLocal<Object> maybe_instance = Nan::NewInstance( - constructor->GetFunction(), argc, argv); + MaybeLocal<Object> maybe_instance = + Nan::NewInstance(constructor->GetFunction(), argc, argv); if (maybe_instance.IsEmpty()) { // There's probably a pending exception return; @@ -720,8 +704,8 @@ NAN_METHOD(Call::StartBatch) { } Callback *callback = new Callback(callback_func); grpc_call_error error = grpc_call_start_batch( - call->wrapped_call, &ops[0], nops, new struct tag( - callback, op_vector.release(), call), NULL); + call->wrapped_call, &ops[0], nops, + new struct tag(callback, op_vector.release(), call, info.This()), NULL); if (error != GRPC_CALL_OK) { return Nan::ThrowError(nanErrorWithCode("startBatch failed", error)); } @@ -754,8 +738,8 @@ NAN_METHOD(Call::CancelWithStatus) { "cancelWithStatus's second argument must be a string"); } Call *call = ObjectWrap::Unwrap<Call>(info.This()); - grpc_status_code code = static_cast<grpc_status_code>( - Nan::To<uint32_t>(info[0]).FromJust()); + grpc_status_code code = + static_cast<grpc_status_code>(Nan::To<uint32_t>(info[0]).FromJust()); if (code == GRPC_STATUS_OK) { return Nan::ThrowRangeError( "cancelWithStatus cannot be called with OK status"); |