diff options
Diffstat (limited to 'src/node/ext/completion_queue_async_worker.cc')
-rw-r--r-- | src/node/ext/completion_queue_async_worker.cc | 49 |
1 files changed, 35 insertions, 14 deletions
diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index bf2cd946a5..619ea41515 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -46,18 +46,20 @@ namespace node { const int max_queue_threads = 2; using v8::Function; -using v8::Handle; +using v8::Local; using v8::Object; -using v8::Persistent; using v8::Value; grpc_completion_queue *CompletionQueueAsyncWorker::queue; +// Invariants: current_threads <= max_queue_threads +// (current_threads == max_queue_threads) || (waiting_next_calls == 0) + int CompletionQueueAsyncWorker::current_threads; int CompletionQueueAsyncWorker::waiting_next_calls; CompletionQueueAsyncWorker::CompletionQueueAsyncWorker() - : NanAsyncWorker(NULL) {} + : Nan::AsyncWorker(NULL) {} CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {} @@ -72,42 +74,61 @@ void CompletionQueueAsyncWorker::Execute() { grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } void CompletionQueueAsyncWorker::Next() { - NanScope(); + Nan::HandleScope scope; if (current_threads < max_queue_threads) { + current_threads += 1; CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); - NanAsyncQueueWorker(worker); + Nan::AsyncQueueWorker(worker); } else { waiting_next_calls += 1; } + GPR_ASSERT(current_threads <= max_queue_threads); + GPR_ASSERT((current_threads == max_queue_threads) || + (waiting_next_calls == 0)); } -void CompletionQueueAsyncWorker::Init(Handle<Object> exports) { - NanScope(); +void CompletionQueueAsyncWorker::Init(Local<Object> exports) { + Nan::HandleScope scope; current_threads = 0; waiting_next_calls = 0; queue = grpc_completion_queue_create(NULL); } void CompletionQueueAsyncWorker::HandleOKCallback() { - NanScope(); + Nan::HandleScope scope; if (waiting_next_calls > 0) { waiting_next_calls -= 1; + // Old worker removed, new worker added. current_threads += 0 CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); - NanAsyncQueueWorker(worker); + Nan::AsyncQueueWorker(worker); } else { current_threads -= 1; } - NanCallback *callback = GetTagCallback(result.tag); - Handle<Value> argv[] = {NanNull(), GetTagNodeValue(result.tag)}; + GPR_ASSERT(current_threads <= max_queue_threads); + GPR_ASSERT((current_threads == max_queue_threads) || + (waiting_next_calls == 0)); + Nan::Callback *callback = GetTagCallback(result.tag); + Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)}; callback->Call(2, argv); DestroyTag(result.tag); } void CompletionQueueAsyncWorker::HandleErrorCallback() { - NanScope(); - NanCallback *callback = GetTagCallback(result.tag); - Handle<Value> argv[] = {NanError(ErrorMessage())}; + if (waiting_next_calls > 0) { + waiting_next_calls -= 1; + // Old worker removed, new worker added. current_threads += 0 + CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); + Nan::AsyncQueueWorker(worker); + } else { + current_threads -= 1; + } + GPR_ASSERT(current_threads <= max_queue_threads); + GPR_ASSERT((current_threads == max_queue_threads) || + (waiting_next_calls == 0)); + Nan::HandleScope scope; + Nan::Callback *callback = GetTagCallback(result.tag); + Local<Value> argv[] = {Nan::Error(ErrorMessage())}; callback->Call(1, argv); |