diff options
Diffstat (limited to 'src/node/ext/completion_queue_threadpool.cc')
-rw-r--r-- | src/node/ext/completion_queue_threadpool.cc | 44 |
1 files changed, 15 insertions, 29 deletions
diff --git a/src/node/ext/completion_queue_threadpool.cc b/src/node/ext/completion_queue_threadpool.cc index 6302e7a103..4881542f2d 100644 --- a/src/node/ext/completion_queue_threadpool.cc +++ b/src/node/ext/completion_queue_threadpool.cc @@ -78,6 +78,8 @@ class CompletionQueueAsyncWorker : public Nan::AsyncWorker { void HandleErrorCallback(); private: + static void TryAddWorker(); + grpc_event result; static grpc_completion_queue *queue; @@ -118,20 +120,21 @@ void CompletionQueueAsyncWorker::Execute() { grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } -void CompletionQueueAsyncWorker::Next() { -#ifndef GRPC_UV - Nan::HandleScope scope; - if (current_threads < max_queue_threads) { +void CompletionQueueAsyncWorker::TryAddWorker() { + if (current_threads < max_queue_threads && waiting_next_calls > 0) { current_threads += 1; + waiting_next_calls -= 1; CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); Nan::AsyncQueueWorker(worker); - } else { - waiting_next_calls += 1; } GPR_ASSERT(current_threads <= max_queue_threads); GPR_ASSERT((current_threads == max_queue_threads) || (waiting_next_calls == 0)); -#endif +} + +void CompletionQueueAsyncWorker::Next() { + waiting_next_calls += 1; + TryAddWorker(); } void CompletionQueueAsyncWorker::Init(Local<Object> exports) { @@ -143,17 +146,8 @@ void CompletionQueueAsyncWorker::Init(Local<Object> exports) { void CompletionQueueAsyncWorker::HandleOKCallback() { Nan::HandleScope scope; - if (waiting_next_calls > 0) { - waiting_next_calls -= 1; - // Old worker removed, new worker added. current_threads += 0 - CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); - Nan::AsyncQueueWorker(worker); - } else { - current_threads -= 1; - } - GPR_ASSERT(current_threads <= max_queue_threads); - GPR_ASSERT((current_threads == max_queue_threads) || - (waiting_next_calls == 0)); + current_threads -= 1; + TryAddWorker(); Nan::Callback *callback = GetTagCallback(result.tag); Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)}; callback->Call(2, argv); @@ -162,18 +156,9 @@ void CompletionQueueAsyncWorker::HandleOKCallback() { } void CompletionQueueAsyncWorker::HandleErrorCallback() { - if (waiting_next_calls > 0) { - waiting_next_calls -= 1; - // Old worker removed, new worker added. current_threads += 0 - CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); - Nan::AsyncQueueWorker(worker); - } else { - current_threads -= 1; - } - GPR_ASSERT(current_threads <= max_queue_threads); - GPR_ASSERT((current_threads == max_queue_threads) || - (waiting_next_calls == 0)); Nan::HandleScope scope; + current_threads -= 1; + TryAddWorker(); Nan::Callback *callback = GetTagCallback(result.tag); Local<Value> argv[] = {Nan::Error(ErrorMessage())}; @@ -189,6 +174,7 @@ grpc_completion_queue *GetCompletionQueue() { } void CompletionQueueNext() { + gpr_log(GPR_DEBUG, "Called CompletionQueueNext"); CompletionQueueAsyncWorker::Next(); } |