aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node/ext/call.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/node/ext/call.cc')
-rw-r--r--src/node/ext/call.cc81
1 files changed, 69 insertions, 12 deletions
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index 9f023b5883..191e763e0e 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -45,6 +45,7 @@
#include "byte_buffer.h"
#include "call.h"
#include "channel.h"
+#include "completion_queue.h"
#include "completion_queue_async_worker.h"
#include "call_credentials.h"
#include "timeval.h"
@@ -222,6 +223,9 @@ class SendMetadataOp : public Op {
out->data.send_initial_metadata.metadata = array.metadata;
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
return "send_metadata";
@@ -263,6 +267,9 @@ class SendMessageOp : public Op {
resources->handles.push_back(unique_ptr<PersistentValue>(handle));
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
return "send_message";
@@ -281,6 +288,9 @@ class SendClientCloseOp : public Op {
shared_ptr<Resources> resources) {
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
return "client_close";
@@ -341,6 +351,9 @@ class SendServerStatusOp : public Op {
out->data.send_status_from_server.status_details = **str;
return true;
}
+ bool IsFinalOp() {
+ return true;
+ }
protected:
std::string GetTypeString() const {
return "send_status";
@@ -367,6 +380,9 @@ class GetMetadataOp : public Op {
out->data.recv_initial_metadata = &recv_metadata;
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
@@ -397,6 +413,9 @@ class ReadMessageOp : public Op {
out->data.recv_message = &recv_message;
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
@@ -442,6 +461,9 @@ class ClientStatusOp : public Op {
ParseMetadata(&metadata_array));
return scope.Escape(status_obj);
}
+ bool IsFinalOp() {
+ return true;
+ }
protected:
std::string GetTypeString() const {
return "status";
@@ -465,6 +487,9 @@ class ServerCloseResponseOp : public Op {
out->data.recv_close_on_server.cancelled = &cancelled;
return true;
}
+ bool IsFinalOp() {
+ return false;
+ }
protected:
std::string GetTypeString() const {
@@ -476,8 +501,8 @@ class ServerCloseResponseOp : public Op {
};
tag::tag(Callback *callback, OpVec *ops,
- shared_ptr<Resources> resources) :
- callback(callback), ops(ops), resources(resources){
+ shared_ptr<Resources> resources, Call *call) :
+ callback(callback), ops(ops), resources(resources), call(call){
}
tag::~tag() {
@@ -502,16 +527,36 @@ Callback *GetTagCallback(void *tag) {
return tag_struct->callback;
}
+void CompleteTag(void *tag) {
+ struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
+ bool is_final_op = false;
+ if (tag_struct->call == NULL) {
+ return;
+ }
+ for (vector<unique_ptr<Op> >::iterator it = tag_struct->ops->begin();
+ it != tag_struct->ops->end(); ++it) {
+ Op *op_ptr = it->get();
+ if (op_ptr->IsFinalOp()) {
+ is_final_op = true;
+ }
+ }
+ tag_struct->call->CompleteBatch(is_final_op);
+}
+
void DestroyTag(void *tag) {
struct tag *tag_struct = reinterpret_cast<struct tag *>(tag);
delete tag_struct;
}
-Call::Call(grpc_call *call) : wrapped_call(call) {
+Call::Call(grpc_call *call) : wrapped_call(call),
+ pending_batches(0),
+ has_final_op_completed(false) {
}
Call::~Call() {
- grpc_call_destroy(wrapped_call);
+ if (wrapped_call != NULL) {
+ grpc_call_destroy(wrapped_call);
+ }
}
void Call::Init(Local<Object> exports) {
@@ -552,6 +597,17 @@ Local<Value> Call::WrapStruct(grpc_call *call) {
}
}
+void Call::CompleteBatch(bool is_final_op) {
+ if (is_final_op) {
+ this->has_final_op_completed = true;
+ }
+ this->pending_batches--;
+ if (this->has_final_op_completed && this->pending_batches == 0) {
+ grpc_call_destroy(this->wrapped_call);
+ this->wrapped_call = NULL;
+ }
+}
+
NAN_METHOD(Call::New) {
if (info.IsConstructCall()) {
Call *call;
@@ -602,27 +658,27 @@ NAN_METHOD(Call::New) {
Utf8String host_override(info[3]);
wrapped_call = grpc_channel_create_call(
wrapped_channel, parent_call, propagate_flags,
- CompletionQueueAsyncWorker::GetQueue(), *method,
+ GetCompletionQueue(), *method,
*host_override, MillisecondsToTimespec(deadline), NULL);
} else if (info[3]->IsUndefined() || info[3]->IsNull()) {
wrapped_call = grpc_channel_create_call(
wrapped_channel, parent_call, propagate_flags,
- CompletionQueueAsyncWorker::GetQueue(), *method,
+ GetCompletionQueue(), *method,
NULL, MillisecondsToTimespec(deadline), NULL);
} else {
return Nan::ThrowTypeError("Call's fourth argument must be a string");
}
call = new Call(wrapped_call);
- info.This()->SetHiddenValue(Nan::New("channel_").ToLocalChecked(),
- channel_object);
+ Nan::Set(info.This(), Nan::New("channel_").ToLocalChecked(),
+ channel_object);
}
call->Wrap(info.This());
info.GetReturnValue().Set(info.This());
} else {
const int argc = 4;
Local<Value> argv[argc] = {info[0], info[1], info[2], info[3]};
- MaybeLocal<Object> maybe_instance = constructor->GetFunction()->NewInstance(
- argc, argv);
+ MaybeLocal<Object> maybe_instance = Nan::NewInstance(
+ constructor->GetFunction(), argc, argv);
if (maybe_instance.IsEmpty()) {
// There's probably a pending exception
return;
@@ -697,11 +753,12 @@ NAN_METHOD(Call::StartBatch) {
Callback *callback = new Callback(callback_func);
grpc_call_error error = grpc_call_start_batch(
call->wrapped_call, &ops[0], nops, new struct tag(
- callback, op_vector.release(), resources), NULL);
+ callback, op_vector.release(), resources, call), NULL);
if (error != GRPC_CALL_OK) {
return Nan::ThrowError(nanErrorWithCode("startBatch failed", error));
}
- CompletionQueueAsyncWorker::Next();
+ call->pending_batches++;
+ CompletionQueueNext();
}
NAN_METHOD(Call::Cancel) {