diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/node/ext/completion_queue.h | 1 | ||||
-rw-r--r-- | src/node/ext/completion_queue_threadpool.cc (renamed from src/node/ext/completion_queue_async_worker.cc) | 63 | ||||
-rw-r--r-- | src/node/ext/completion_queue_uv.cc (renamed from src/node/ext/completion_queue.cc) | 17 | ||||
-rw-r--r-- | src/node/test/surface_test.js | 2 |
4 files changed, 69 insertions, 14 deletions
diff --git a/src/node/ext/completion_queue.h b/src/node/ext/completion_queue.h index bf280f768b..9b01028ef1 100644 --- a/src/node/ext/completion_queue.h +++ b/src/node/ext/completion_queue.h @@ -32,6 +32,7 @@ */ #include <v8.h> +#include <grpc/grpc.h> namespace grpc { namespace node { diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_threadpool.cc index f5e03b277b..6302e7a103 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_threadpool.cc @@ -31,18 +31,63 @@ * */ +/* I don't like using #ifndef, but I don't see a better way to do this */ +#ifndef GRPC_UV + #include <node.h> #include <nan.h> #include "grpc/grpc.h" #include "grpc/support/log.h" #include "grpc/support/time.h" -#include "completion_queue_async_worker.h" +#include "completion_queue.h" #include "call.h" namespace grpc { namespace node { +namespace { + +/* A worker that asynchronously calls completion_queue_next, and queues onto the + node event loop a call to the function stored in the event's tag. */ +class CompletionQueueAsyncWorker : public Nan::AsyncWorker { + public: + CompletionQueueAsyncWorker(); + + ~CompletionQueueAsyncWorker(); + /* Calls completion_queue_next with the provided deadline, and stores the + event if there was one or sets an error message if there was not */ + void Execute(); + + /* Returns the completion queue attached to this class */ + static grpc_completion_queue *GetQueue(); + + /* Convenience function to create a worker with the given arguments and queue + it to run asynchronously */ + static void Next(); + + /* Initialize the CompletionQueueAsyncWorker class */ + static void Init(v8::Local<v8::Object> exports); + + protected: + /* Called when Execute has succeeded (completed without setting an error + message). Calls the saved callback with the event that came from + completion_queue_next */ + void HandleOKCallback(); + + void HandleErrorCallback(); + + private: + grpc_event result; + + static grpc_completion_queue *queue; + + // Number of grpc_completion_queue_next calls in the thread pool + static int current_threads; + // Number of grpc_completion_queue_next calls waiting to enter the thread pool + static int waiting_next_calls; +}; + const int max_queue_threads = 2; using v8::Function; @@ -137,5 +182,21 @@ void CompletionQueueAsyncWorker::HandleErrorCallback() { DestroyTag(result.tag); } +} // namespace + +grpc_completion_queue *GetCompletionQueue() { + return CompletionQueueAsyncWorker::GetQueue(); +} + +void CompletionQueueNext() { + CompletionQueueAsyncWorker::Next(); +} + +void CompletionQueueInit(Local<Object> exports) { + CompletionQueueAsyncWorker::Init(exports); +} + } // namespace node } // namespace grpc + +#endif /* GRPC_UV */ diff --git a/src/node/ext/completion_queue.cc b/src/node/ext/completion_queue_uv.cc index fcfa77b39c..615973a6c9 100644 --- a/src/node/ext/completion_queue.cc +++ b/src/node/ext/completion_queue_uv.cc @@ -31,6 +31,8 @@ * */ +#ifdef GRPC_UV + #include <uv.h> #include <node.h> #include <v8.h> @@ -38,7 +40,6 @@ #include "call.h" #include "completion_queue.h" -#include "completion_queue_async_worker.h" namespace grpc { namespace node { @@ -81,34 +82,24 @@ void drain_completion_queue(uv_prepare_t *handle) { } grpc_completion_queue *GetCompletionQueue() { -#ifdef GRPC_UV return queue; -#else - return CompletionQueueAsyncWorker::GetQueue(); -#endif } void CompletionQueueNext() { -#ifdef GRPC_UV if (pending_batches == 0) { GPR_ASSERT(!uv_is_active((uv_handle_t *)&prepare)); uv_prepare_start(&prepare, drain_completion_queue); } pending_batches++; -#else - CompletionQueueAsyncWorker::Next(); -#endif } void CompletionQueueInit(Local<Object> exports) { -#ifdef GRPC_UV queue = grpc_completion_queue_create(NULL); uv_prepare_init(uv_default_loop(), &prepare); pending_batches = 0; -#else - CompletionQueueAsyncWorker::Init(exports); -#endif } } // namespace node } // namespace grpc + +#endif /* GRPC_UV */ diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index d8b36dc55c..17c62d5635 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -183,6 +183,7 @@ describe('Server.prototype.addProtoService', function() { assert.strictEqual(status.code, grpc.status.UNIMPLEMENTED); done(); }); + call.on('error', function(status) { /* Do nothing */ }); }); it('should respond to a bidi call with UNIMPLEMENTED', function(done) { var call = client.divMany(); @@ -193,6 +194,7 @@ describe('Server.prototype.addProtoService', function() { assert.strictEqual(status.code, grpc.status.UNIMPLEMENTED); done(); }); + call.on('error', function(status) { /* Do nothing */ }); call.end(); }); }); |