aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node/ext
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2015-10-06 11:16:49 -0700
committerGravatar murgatroid99 <mlumish@google.com>2015-10-06 11:16:49 -0700
commitcc545461c0bcc819a40370d0259146b9e0a35b6c (patch)
tree508c691c09fa41df12f260735501c44eb681ee27 /src/node/ext
parent86ef17ada9a11966ea307c720bfa06e63ce09184 (diff)
parenta4aba6e66876d9f3babda8644949fd3cb4bb9745 (diff)
Resolved merge conflicts with master
Diffstat (limited to 'src/node/ext')
-rw-r--r--src/node/ext/completion_queue_async_worker.cc22
1 files changed, 22 insertions, 0 deletions
diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc
index 3a79f7c45d..619ea41515 100644
--- a/src/node/ext/completion_queue_async_worker.cc
+++ b/src/node/ext/completion_queue_async_worker.cc
@@ -52,6 +52,9 @@ using v8::Value;
grpc_completion_queue *CompletionQueueAsyncWorker::queue;
+// Invariants: current_threads <= max_queue_threads
+// (current_threads == max_queue_threads) || (waiting_next_calls == 0)
+
int CompletionQueueAsyncWorker::current_threads;
int CompletionQueueAsyncWorker::waiting_next_calls;
@@ -73,11 +76,15 @@ grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; }
void CompletionQueueAsyncWorker::Next() {
Nan::HandleScope scope;
if (current_threads < max_queue_threads) {
+ current_threads += 1;
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
Nan::AsyncQueueWorker(worker);
} else {
waiting_next_calls += 1;
}
+ GPR_ASSERT(current_threads <= max_queue_threads);
+ GPR_ASSERT((current_threads == max_queue_threads) ||
+ (waiting_next_calls == 0));
}
void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
@@ -91,11 +98,15 @@ void CompletionQueueAsyncWorker::HandleOKCallback() {
Nan::HandleScope scope;
if (waiting_next_calls > 0) {
waiting_next_calls -= 1;
+ // Old worker removed, new worker added. current_threads += 0
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
Nan::AsyncQueueWorker(worker);
} else {
current_threads -= 1;
}
+ GPR_ASSERT(current_threads <= max_queue_threads);
+ GPR_ASSERT((current_threads == max_queue_threads) ||
+ (waiting_next_calls == 0));
Nan::Callback *callback = GetTagCallback(result.tag);
Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)};
callback->Call(2, argv);
@@ -104,6 +115,17 @@ void CompletionQueueAsyncWorker::HandleOKCallback() {
}
void CompletionQueueAsyncWorker::HandleErrorCallback() {
+ if (waiting_next_calls > 0) {
+ waiting_next_calls -= 1;
+ // Old worker removed, new worker added. current_threads += 0
+ CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
+ Nan::AsyncQueueWorker(worker);
+ } else {
+ current_threads -= 1;
+ }
+ GPR_ASSERT(current_threads <= max_queue_threads);
+ GPR_ASSERT((current_threads == max_queue_threads) ||
+ (waiting_next_calls == 0));
Nan::HandleScope scope;
Nan::Callback *callback = GetTagCallback(result.tag);
Local<Value> argv[] = {Nan::Error(ErrorMessage())};