diff options
Diffstat (limited to 'src/node')
-rw-r--r-- | src/node/ext/completion_queue_async_worker.cc | 22 | ||||
-rw-r--r-- | src/node/ext/completion_queue_async_worker.h | 5 |
2 files changed, 25 insertions, 2 deletions
diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index cd7acd1d1b..4e57121a85 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -43,6 +43,8 @@ namespace grpc { namespace node { +const int max_queue_threads = 2; + using v8::Function; using v8::Handle; using v8::Object; @@ -51,6 +53,9 @@ using v8::Value; grpc_completion_queue *CompletionQueueAsyncWorker::queue; +int CompletionQueueAsyncWorker::current_threads; +int CompletionQueueAsyncWorker::waiting_next_calls; + CompletionQueueAsyncWorker::CompletionQueueAsyncWorker() : NanAsyncWorker(NULL) {} @@ -67,17 +72,30 @@ grpc_completion_queue *CompletionQueueAsyncWorker::GetQueue() { return queue; } void CompletionQueueAsyncWorker::Next() { NanScope(); - CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); - NanAsyncQueueWorker(worker); + if (current_threads < max_queue_threads) { + CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); + NanAsyncQueueWorker(worker); + } else { + waiting_next_calls += 1; + } } void CompletionQueueAsyncWorker::Init(Handle<Object> exports) { NanScope(); + current_threads = 0; + waiting_next_calls = 0; queue = grpc_completion_queue_create(); } void CompletionQueueAsyncWorker::HandleOKCallback() { NanScope(); + if (waiting_next_calls > 0) { + waiting_next_calls -= 1; + CompletionQueueAsyncWorker *worker = new CompletionQueueAsyncWorker(); + NanAsyncQueueWorker(worker); + } else { + current_threads -= 1; + } NanCallback *callback = GetTagCallback(result->tag); Handle<Value> argv[] = {NanNull(), GetTagNodeValue(result->tag)}; callback->Call(2, argv); diff --git a/src/node/ext/completion_queue_async_worker.h b/src/node/ext/completion_queue_async_worker.h index 0ddb5b4cfd..5d52bbb1fb 100644 --- a/src/node/ext/completion_queue_async_worker.h +++ b/src/node/ext/completion_queue_async_worker.h @@ -73,6 +73,11 @@ class CompletionQueueAsyncWorker : public NanAsyncWorker { grpc_event *result; static grpc_completion_queue *queue; + + // Number of grpc_completion_queue_next calls in the thread pool + static int current_threads; + // Number of grpc_completion_queue_next calls waiting to enter the thread pool + static int waiting_next_calls; }; } // namespace node |