diff options
author | Yuchen Zeng <zyc@google.com> | 2017-02-27 13:35:41 -0800 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2017-03-17 15:57:23 -0700 |
commit | 2c977084c8fec4bc395976338683d1d480bab91e (patch) | |
tree | cf696a8403371b308a45d02445864159ba527e08 /src/node/ext/completion_queue_threadpool.cc | |
parent | dc6b569d97aa54e987681f869f13acdd9c89834f (diff) | |
parent | dc720ca6bf27181c040cefcdb298d9dee8bf3058 (diff) |
Merge remote-tracking branch 'upstream/master' into cares_bazel_rule
Diffstat (limited to 'src/node/ext/completion_queue_threadpool.cc')
-rw-r--r-- | src/node/ext/completion_queue_threadpool.cc | 43 |
1 files changed, 14 insertions, 29 deletions
diff --git a/src/node/ext/completion_queue_threadpool.cc b/src/node/ext/completion_queue_threadpool.cc index 6302e7a103..1917074dc2 100644 --- a/src/node/ext/completion_queue_threadpool.cc +++ b/src/node/ext/completion_queue_threadpool.cc @@ -78,6 +78,8 @@ class CompletionQueueAsyncWorker : public Nan::AsyncWorker { void HandleErrorCallback(); private: + static void TryAddWorker(); + grpc_event result; static grpc_completion_queue *queue; @@ -118,20 +120,21 @@ void CompletionQueueAsyncWorker::Execute() { grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } -void CompletionQueueAsyncWorker::Next() { -#ifndef GRPC_UV - Nan::HandleScope scope; - if (current_threads < max_queue_threads) { +void CompletionQueueAsyncWorker::TryAddWorker() { + if (current_threads < max_queue_threads && waiting_next_calls > 0) { current_threads += 1; + waiting_next_calls -= 1; CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); Nan::AsyncQueueWorker(worker); - } else { - waiting_next_calls += 1; } GPR_ASSERT(current_threads <= max_queue_threads); GPR_ASSERT((current_threads == max_queue_threads) || (waiting_next_calls == 0)); -#endif +} + +void CompletionQueueAsyncWorker::Next() { + waiting_next_calls += 1; + TryAddWorker(); } void CompletionQueueAsyncWorker::Init(Local<Object> exports) { @@ -143,17 +146,8 @@ void CompletionQueueAsyncWorker::Init(Local<Object> exports) { void CompletionQueueAsyncWorker::HandleOKCallback() { Nan::HandleScope scope; - if (waiting_next_calls > 0) { - waiting_next_calls -= 1; - // Old worker removed, new worker added. current_threads += 0 - CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); - Nan::AsyncQueueWorker(worker); - } else { - current_threads -= 1; - } - GPR_ASSERT(current_threads <= max_queue_threads); - GPR_ASSERT((current_threads == max_queue_threads) || - (waiting_next_calls == 0)); + current_threads -= 1; + TryAddWorker(); Nan::Callback *callback = GetTagCallback(result.tag); Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)}; callback->Call(2, argv); @@ -162,18 +156,9 @@ void CompletionQueueAsyncWorker::HandleOKCallback() { } void CompletionQueueAsyncWorker::HandleErrorCallback() { - if (waiting_next_calls > 0) { - waiting_next_calls -= 1; - // Old worker removed, new worker added. current_threads += 0 - CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); - Nan::AsyncQueueWorker(worker); - } else { - current_threads -= 1; - } - GPR_ASSERT(current_threads <= max_queue_threads); - GPR_ASSERT((current_threads == max_queue_threads) || - (waiting_next_calls == 0)); Nan::HandleScope scope; + current_threads -= 1; + TryAddWorker(); Nan::Callback *callback = GetTagCallback(result.tag); Local<Value> argv[] = {Nan::Error(ErrorMessage())}; |