diff options
Diffstat (limited to 'src/node/ext/call.cc')
-rw-r--r-- | src/node/ext/call.cc | 130 |
1 files changed, 81 insertions, 49 deletions
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 9213d5e87d..191e763e0e 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -48,7 +48,6 @@ #include "completion_queue.h" #include "completion_queue_async_worker.h" #include "call_credentials.h" -#include "slice.h" #include "timeval.h" using std::unique_ptr; @@ -97,7 +96,8 @@ Local<Value> nanErrorWithCode(const char *msg, grpc_call_error code) { return scope.Escape(err); } -bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array) { +bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array, + shared_ptr<Resources> resources) { HandleScope scope; grpc_metadata_array_init(array); Local<Array> keys = Nan::GetOwnPropertyNames(metadata).ToLocalChecked(); @@ -113,25 +113,32 @@ bool CreateMetadataArray(Local<Object> metadata, grpc_metadata_array *array) { array->metadata = reinterpret_cast<grpc_metadata*>( gpr_malloc(array->capacity * sizeof(grpc_metadata))); for (unsigned int i = 0; i < keys->Length(); i++) { - Local<String> current_key(Nan::To<String>(keys->Get(i)).ToLocalChecked()); + Local<String> current_key(keys->Get(i)->ToString()); + Utf8String *utf8_key = new Utf8String(current_key); + resources->strings.push_back(unique_ptr<Utf8String>(utf8_key)); Local<Array> values = Local<Array>::Cast( Nan::Get(metadata, current_key).ToLocalChecked()); - grpc_slice key_slice = grpc_slice_intern(CreateSliceFromString(current_key)); for (unsigned int j = 0; j < values->Length(); j++) { Local<Value> value = Nan::Get(values, j).ToLocalChecked(); grpc_metadata *current = &array->metadata[array->count]; - current->key = key_slice; + current->key = **utf8_key; // Only allow binary headers for "-bin" keys - if (grpc_is_binary_header(key_slice)) { + if (grpc_is_binary_header(current->key, strlen(current->key))) { if (::node::Buffer::HasInstance(value)) { - current->value = CreateSliceFromBuffer(value); + current->value = ::node::Buffer::Data(value); + current->value_length = ::node::Buffer::Length(value); + PersistentValue *handle = new PersistentValue(value); + resources->handles.push_back(unique_ptr<PersistentValue>(handle)); } else { return false; } } else { if (value->IsString()) { Local<String> string_value = Nan::To<String>(value).ToLocalChecked(); - current->value = CreateSliceFromString(string_value); + Utf8String *utf8_value = new Utf8String(string_value); + resources->strings.push_back(unique_ptr<Utf8String>(utf8_value)); + current->value = **utf8_value; + current->value_length = string_value->Length(); } else { return false; } @@ -146,25 +153,40 @@ Local<Value> ParseMetadata(const grpc_metadata_array *metadata_array) { EscapableHandleScope scope; grpc_metadata *metadata_elements = metadata_array->metadata; size_t length = metadata_array->count; + std::map<const char*, size_t> size_map; + std::map<const char*, size_t> index_map; + + for (unsigned int i = 0; i < length; i++) { + const char *key = metadata_elements[i].key; + if (size_map.count(key)) { + size_map[key] += 1; + } else { + size_map[key] = 1; + } + index_map[key] = 0; + } Local<Object> metadata_object = Nan::New<Object>(); for (unsigned int i = 0; i < length; i++) { grpc_metadata* elem = &metadata_elements[i]; - // TODO(murgatroid99): Use zero-copy string construction instead - Local<String> key_string = CopyStringFromSlice(elem->key); + Local<String> key_string = Nan::New(elem->key).ToLocalChecked(); Local<Array> array; MaybeLocal<Value> maybe_array = Nan::Get(metadata_object, key_string); if (maybe_array.IsEmpty() || !maybe_array.ToLocalChecked()->IsArray()) { - array = Nan::New<Array>(0); + array = Nan::New<Array>(size_map[elem->key]); Nan::Set(metadata_object, key_string, array); } else { array = Local<Array>::Cast(maybe_array.ToLocalChecked()); } - if (grpc_is_binary_header(elem->key)) { - Nan::Set(array, array->Length(), CreateBufferFromSlice(elem->value)); + if (grpc_is_binary_header(elem->key, strlen(elem->key))) { + Nan::Set(array, index_map[elem->key], + MakeFastBuffer( + Nan::CopyBuffer(elem->value, + elem->value_length).ToLocalChecked())); } else { - // TODO(murgatroid99): Use zero-copy string construction instead - Nan::Set(array, array->Length(), CopyStringFromSlice(elem->value)); + Nan::Set(array, index_map[elem->key], + Nan::New(elem->value).ToLocalChecked()); } + index_map[elem->key] += 1; } return scope.Escape(metadata_object); } @@ -183,7 +205,8 @@ class SendMetadataOp : public Op { EscapableHandleScope scope; return scope.Escape(Nan::True()); } - bool ParseOp(Local<Value> value, grpc_op *out) { + bool ParseOp(Local<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { if (!value->IsObject()) { return false; } @@ -193,7 +216,7 @@ class SendMetadataOp : public Op { return false; } if (!CreateMetadataArray(maybe_metadata.ToLocalChecked(), - &array)) { + &array, resources)) { return false; } out->data.send_initial_metadata.count = array.count; @@ -223,7 +246,8 @@ class SendMessageOp : public Op { EscapableHandleScope scope; return scope.Escape(Nan::True()); } - bool ParseOp(Local<Value> value, grpc_op *out) { + bool ParseOp(Local<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { if (!::node::Buffer::HasInstance(value)) { return false; } @@ -239,6 +263,8 @@ class SendMessageOp : public Op { } send_message = BufferToByteBuffer(value); out->data.send_message = send_message; + PersistentValue *handle = new PersistentValue(value); + resources->handles.push_back(unique_ptr<PersistentValue>(handle)); return true; } bool IsFinalOp() { @@ -258,7 +284,8 @@ class SendClientCloseOp : public Op { EscapableHandleScope scope; return scope.Escape(Nan::True()); } - bool ParseOp(Local<Value> value, grpc_op *out) { + bool ParseOp(Local<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { return true; } bool IsFinalOp() { @@ -272,14 +299,12 @@ class SendClientCloseOp : public Op { class SendServerStatusOp : public Op { public: - ~SendServerStatusOp() { - grpc_slice_unref(details); - } Local<Value> GetNodeValue() const { EscapableHandleScope scope; return scope.Escape(Nan::True()); } - bool ParseOp(Local<Value> value, grpc_op *out) { + bool ParseOp(Local<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { if (!value->IsObject()) { return false; } @@ -314,15 +339,16 @@ class SendServerStatusOp : public Op { Local<String> details = Nan::To<String>( maybe_details.ToLocalChecked()).ToLocalChecked(); grpc_metadata_array array; - if (!CreateMetadataArray(metadata, &array)) { + if (!CreateMetadataArray(metadata, &array, resources)) { return false; } out->data.send_status_from_server.trailing_metadata_count = array.count; out->data.send_status_from_server.trailing_metadata = array.metadata; out->data.send_status_from_server.status = static_cast<grpc_status_code>(code); - this->details = CreateSliceFromString(details); - out->data.send_status_from_server.status_details = &this->details; + Utf8String *str = new Utf8String(details); + resources->strings.push_back(unique_ptr<Utf8String>(str)); + out->data.send_status_from_server.status_details = **str; return true; } bool IsFinalOp() { @@ -332,9 +358,6 @@ class SendServerStatusOp : public Op { std::string GetTypeString() const { return "send_status"; } - - private: - grpc_slice details; }; class GetMetadataOp : public Op { @@ -352,7 +375,8 @@ class GetMetadataOp : public Op { return scope.Escape(ParseMetadata(&recv_metadata)); } - bool ParseOp(Local<Value> value, grpc_op *out) { + bool ParseOp(Local<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { out->data.recv_initial_metadata = &recv_metadata; return true; } @@ -384,7 +408,8 @@ class ReadMessageOp : public Op { return scope.Escape(ByteBufferToBuffer(recv_message)); } - bool ParseOp(Local<Value> value, grpc_op *out) { + bool ParseOp(Local<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { out->data.recv_message = &recv_message; return true; } @@ -405,16 +430,21 @@ class ClientStatusOp : public Op { public: ClientStatusOp() { grpc_metadata_array_init(&metadata_array); + status_details = NULL; + details_capacity = 0; } ~ClientStatusOp() { grpc_metadata_array_destroy(&metadata_array); + gpr_free(status_details); } - bool ParseOp(Local<Value> value, grpc_op *out) { + bool ParseOp(Local<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { out->data.recv_status_on_client.trailing_metadata = &metadata_array; out->data.recv_status_on_client.status = &status; out->data.recv_status_on_client.status_details = &status_details; + out->data.recv_status_on_client.status_details_capacity = &details_capacity; return true; } @@ -423,8 +453,10 @@ class ClientStatusOp : public Op { Local<Object> status_obj = Nan::New<Object>(); Nan::Set(status_obj, Nan::New("code").ToLocalChecked(), Nan::New<Number>(status)); - Nan::Set(status_obj, Nan::New("details").ToLocalChecked(), - CopyStringFromSlice(status_details)); + if (status_details != NULL) { + Nan::Set(status_obj, Nan::New("details").ToLocalChecked(), + Nan::New(status_details).ToLocalChecked()); + } Nan::Set(status_obj, Nan::New("metadata").ToLocalChecked(), ParseMetadata(&metadata_array)); return scope.Escape(status_obj); @@ -439,7 +471,8 @@ class ClientStatusOp : public Op { private: grpc_metadata_array metadata_array; grpc_status_code status; - grpc_slice status_details; + char *status_details; + size_t details_capacity; }; class ServerCloseResponseOp : public Op { @@ -449,7 +482,8 @@ class ServerCloseResponseOp : public Op { return scope.Escape(Nan::New<Boolean>(cancelled)); } - bool ParseOp(Local<Value> value, grpc_op *out) { + bool ParseOp(Local<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { out->data.recv_close_on_server.cancelled = &cancelled; return true; } @@ -466,8 +500,9 @@ class ServerCloseResponseOp : public Op { int cancelled; }; -tag::tag(Callback *callback, OpVec *ops, Call *call) : - callback(callback), ops(ops), call(call){ +tag::tag(Callback *callback, OpVec *ops, + shared_ptr<Resources> resources, Call *call) : + callback(callback), ops(ops), resources(resources), call(call){ } tag::~tag() { @@ -615,24 +650,20 @@ NAN_METHOD(Call::New) { if (channel->GetWrappedChannel() == NULL) { return Nan::ThrowError("Call cannot be created from a closed channel"); } + Utf8String method(info[1]); double deadline = Nan::To<double>(info[2]).FromJust(); grpc_channel *wrapped_channel = channel->GetWrappedChannel(); grpc_call *wrapped_call; if (info[3]->IsString()) { - grpc_slice *host = new grpc_slice; - *host = CreateSliceFromString( - Nan::To<String>(info[3]).ToLocalChecked()); + Utf8String host_override(info[3]); wrapped_call = grpc_channel_create_call( wrapped_channel, parent_call, propagate_flags, - GetCompletionQueue(), CreateSliceFromString( - Nan::To<String>(info[1]).ToLocalChecked()), - host, MillisecondsToTimespec(deadline), NULL); - delete host; + GetCompletionQueue(), *method, + *host_override, MillisecondsToTimespec(deadline), NULL); } else if (info[3]->IsUndefined() || info[3]->IsNull()) { wrapped_call = grpc_channel_create_call( wrapped_channel, parent_call, propagate_flags, - GetCompletionQueue(), CreateSliceFromString( - Nan::To<String>(info[1]).ToLocalChecked()), + GetCompletionQueue(), *method, NULL, MillisecondsToTimespec(deadline), NULL); } else { return Nan::ThrowTypeError("Call's fourth argument must be a string"); @@ -669,6 +700,7 @@ NAN_METHOD(Call::StartBatch) { } Local<Function> callback_func = info[1].As<Function>(); Call *call = ObjectWrap::Unwrap<Call>(info.This()); + shared_ptr<Resources> resources(new Resources); Local<Object> obj = Nan::To<Object>(info[0]).ToLocalChecked(); Local<Array> keys = Nan::GetOwnPropertyNames(obj).ToLocalChecked(); size_t nops = keys->Length(); @@ -713,7 +745,7 @@ NAN_METHOD(Call::StartBatch) { default: return Nan::ThrowError("Argument object had an unrecognized key"); } - if (!op->ParseOp(obj->Get(type), &ops[i])) { + if (!op->ParseOp(obj->Get(type), &ops[i], resources)) { return Nan::ThrowTypeError("Incorrectly typed arguments to startBatch"); } op_vector->push_back(std::move(op)); @@ -721,7 +753,7 @@ NAN_METHOD(Call::StartBatch) { Callback *callback = new Callback(callback_func); grpc_call_error error = grpc_call_start_batch( call->wrapped_call, &ops[0], nops, new struct tag( - callback, op_vector.release(), call), NULL); + callback, op_vector.release(), resources, call), NULL); if (error != GRPC_CALL_OK) { return Nan::ThrowError(nanErrorWithCode("startBatch failed", error)); } |