aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Michael Lumish <mlumish@google.com>2017-02-07 11:20:16 -0800
committerGravatar Michael Lumish <mlumish@google.com>2017-02-07 11:20:16 -0800
commitfe6b328a1b93df552a4f2b42746c25092cc7bcc2 (patch)
treea49e19652ffe2c72a0682259ae89b51add6a81a1
parent2b3e12ceeecd2ff1c0b7364764e583212cd02901 (diff)
Node: refactor non-uv completion queue wrapping code
-rw-r--r--src/node/ext/completion_queue_threadpool.cc44
1 files changed, 15 insertions, 29 deletions
diff --git a/src/node/ext/completion_queue_threadpool.cc b/src/node/ext/completion_queue_threadpool.cc
index 6302e7a103..4881542f2d 100644
--- a/src/node/ext/completion_queue_threadpool.cc
+++ b/src/node/ext/completion_queue_threadpool.cc
@@ -78,6 +78,8 @@ class CompletionQueueAsyncWorker : public Nan::AsyncWorker {
void HandleErrorCallback();
private:
+ static void TryAddWorker();
+
grpc_event result;
static grpc_completion_queue *queue;
@@ -118,20 +120,21 @@ void CompletionQueueAsyncWorker::Execute() {
grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; }
-void CompletionQueueAsyncWorker::Next() {
-#ifndef GRPC_UV
- Nan::HandleScope scope;
- if (current_threads < max_queue_threads) {
+void CompletionQueueAsyncWorker::TryAddWorker() {
+ if (current_threads < max_queue_threads && waiting_next_calls > 0) {
current_threads += 1;
+ waiting_next_calls -= 1;
CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
Nan::AsyncQueueWorker(worker);
- } else {
- waiting_next_calls += 1;
}
GPR_ASSERT(current_threads <= max_queue_threads);
GPR_ASSERT((current_threads == max_queue_threads) ||
(waiting_next_calls == 0));
-#endif
+}
+
+void CompletionQueueAsyncWorker::Next() {
+ waiting_next_calls += 1;
+ TryAddWorker();
}
void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
@@ -143,17 +146,8 @@ void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
void CompletionQueueAsyncWorker::HandleOKCallback() {
Nan::HandleScope scope;
- if (waiting_next_calls > 0) {
- waiting_next_calls -= 1;
- // Old worker removed, new worker added. current_threads += 0
- CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
- Nan::AsyncQueueWorker(worker);
- } else {
- current_threads -= 1;
- }
- GPR_ASSERT(current_threads <= max_queue_threads);
- GPR_ASSERT((current_threads == max_queue_threads) ||
- (waiting_next_calls == 0));
+ current_threads -= 1;
+ TryAddWorker();
Nan::Callback *callback = GetTagCallback(result.tag);
Local<Value> argv[] = {Nan::Null(), GetTagNodeValue(result.tag)};
callback->Call(2, argv);
@@ -162,18 +156,9 @@ void CompletionQueueAsyncWorker::HandleOKCallback() {
}
void CompletionQueueAsyncWorker::HandleErrorCallback() {
- if (waiting_next_calls > 0) {
- waiting_next_calls -= 1;
- // Old worker removed, new worker added. current_threads += 0
- CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker();
- Nan::AsyncQueueWorker(worker);
- } else {
- current_threads -= 1;
- }
- GPR_ASSERT(current_threads <= max_queue_threads);
- GPR_ASSERT((current_threads == max_queue_threads) ||
- (waiting_next_calls == 0));
Nan::HandleScope scope;
+ current_threads -= 1;
+ TryAddWorker();
Nan::Callback *callback = GetTagCallback(result.tag);
Local<Value> argv[] = {Nan::Error(ErrorMessage())};
@@ -189,6 +174,7 @@ grpc_completion_queue *GetCompletionQueue() {
}
void CompletionQueueNext() {
+ gpr_log(GPR_DEBUG, "Called CompletionQueueNext");
CompletionQueueAsyncWorker::Next();
}