diff options
author | murgatroid99 <mlumish@google.com> | 2015-10-06 11:16:49 -0700 |
---|---|---|
committer | murgatroid99 <mlumish@google.com> | 2015-10-06 11:16:49 -0700 |
commit | cc545461c0bcc819a40370d0259146b9e0a35b6c (patch) | |
tree | 508c691c09fa41df12f260735501c44eb681ee27 /src/node/ext | |
parent | 86ef17ada9a11966ea307c720bfa06e63ce09184 (diff) | |
parent | a4aba6e66876d9f3babda8644949fd3cb4bb9745 (diff) |
Resolved merge conflicts with master
Diffstat (limited to 'src/node/ext')
-rw-r--r-- | src/node/ext/completion_queue_async_worker.cc | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index 3a79f7c45d..619ea41515 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -52,6 +52,9 @@ using v8::Value; grpc_completion_queue *CompletionQueueAsyncWorker::queue; +// Invariants: current_threads <= max_queue_threads +// (current_threads == max_queue_threads) || (waiting_next_calls == 0) + int CompletionQueueAsyncWorker::current_threads; int CompletionQueueAsyncWorker::waiting_next_calls; @@ -73,11 +76,15 @@ grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } void CompletionQueueAsyncWorker::Next() { Nan::HandleScope scope; if (current_threads < max_queue_threads) { + current_threads += 1; CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); Nan::AsyncQueueWorker(worker); } else { waiting_next_calls += 1; } + GPR_ASSERT(current_threads <= max_queue_threads); + GPR_ASSERT((current_threads == max_queue_threads) || + (waiting_next_calls == 0)); } void CompletionQueueAsyncWorker::Init(Local<Object> exports) { @@ -91,11 +98,15 @@ void CompletionQueueAsyncWorker::HandleOKCallback() { Nan::HandleScope scope; if (waiting_next_calls > 0) { waiting_next_calls -= 1; + // Old worker removed, new worker added. current_threads += 0 CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); Nan::AsyncQueueWorker(worker); } else { current_threads -= 1; } + GPR_ASSERT(current_threads <= max_queue_threads); + GPR_ASSERT((current_threads == max_queue_threads) || + (waiting_next_calls == 0)); Nan::Callback *callback = GetTagCallback(result.tag); Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)}; callback->Call(2, argv); @@ -104,6 +115,17 @@ void CompletionQueueAsyncWorker::HandleOKCallback() { } void CompletionQueueAsyncWorker::HandleErrorCallback() { + if (waiting_next_calls > 0) { + waiting_next_calls -= 1; + // Old worker removed, new worker added. current_threads += 0 + CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); + Nan::AsyncQueueWorker(worker); + } else { + current_threads -= 1; + } + GPR_ASSERT(current_threads <= max_queue_threads); + GPR_ASSERT((current_threads == max_queue_threads) || + (waiting_next_calls == 0)); Nan::HandleScope scope; Nan::Callback *callback = GetTagCallback(result.tag); Local<Value> argv[] = {Nan::Error(ErrorMessage())}; |