aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/node/ext/completion_queue.h1
-rw-r--r--src/node/ext/completion_queue_threadpool.cc (renamed from src/node/ext/completion_queue_async_worker.cc)63
-rw-r--r--src/node/ext/completion_queue_uv.cc (renamed from src/node/ext/completion_queue.cc)17
-rw-r--r--src/node/test/surface_test.js2
4 files changed, 69 insertions, 14 deletions
diff --git a/src/node/ext/completion_queue.h b/src/node/ext/completion_queue.h
index bf280f768b..9b01028ef1 100644
--- a/src/node/ext/completion_queue.h
+++ b/src/node/ext/completion_queue.h
@@ -32,6 +32,7 @@
*/
#include <v8.h>
+#include <grpc/grpc.h>
namespace grpc {
namespace node {
diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_threadpool.cc
index f5e03b277b..6302e7a103 100644
--- a/src/node/ext/completion_queue_async_worker.cc
+++ b/src/node/ext/completion_queue_threadpool.cc
@@ -31,18 +31,63 @@
*
*/
+/* I don't like using #ifndef, but I don't see a better way to do this */
+#ifndef GRPC_UV
+
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
#include "grpc/support/log.h"
#include "grpc/support/time.h"
-#include "completion_queue_async_worker.h"
+#include "completion_queue.h"
#include "call.h"
namespace grpc {
namespace node {
+namespace {
+
+/* A worker that asynchronously calls completion_queue_next, and queues onto the
+ node event loop a call to the function stored in the event's tag. */
+class CompletionQueueAsyncWorker : public Nan::AsyncWorker {
+ public:
+ CompletionQueueAsyncWorker();
+
+ ~CompletionQueueAsyncWorker();
+ /* Calls completion_queue_next with the provided deadline, and stores the
+ event if there was one or sets an error message if there was not */
+ void Execute();
+
+ /* Returns the completion queue attached to this class */
+ static grpc_completion_queue *GetQueue();
+
+ /* Convenience function to create a worker with the given arguments and queue
+ it to run asynchronously */
+ static void Next();
+
+ /* Initialize the CompletionQueueAsyncWorker class */
+ static void Init(v8::Local<v8::Object> exports);
+
+ protected:
+ /* Called when Execute has succeeded (completed without setting an error
+ message). Calls the saved callback with the event that came from
+ completion_queue_next */
+ void HandleOKCallback();
+
+ void HandleErrorCallback();
+
+ private:
+ grpc_event result;
+
+ static grpc_completion_queue *queue;
+
+ // Number of grpc_completion_queue_next calls in the thread pool
+ static int current_threads;
+ // Number of grpc_completion_queue_next calls waiting to enter the thread pool
+ static int waiting_next_calls;
+};
+
const int max_queue_threads = 2;
using v8::Function;
@@ -137,5 +182,21 @@ void CompletionQueueAsyncWorker::HandleErrorCallback() {
DestroyTag(result.tag);
}
+} // namespace
+
+grpc_completion_queue *GetCompletionQueue() {
+ return CompletionQueueAsyncWorker::GetQueue();
+}
+
+void CompletionQueueNext() {
+ CompletionQueueAsyncWorker::Next();
+}
+
+void CompletionQueueInit(Local<Object> exports) {
+ CompletionQueueAsyncWorker::Init(exports);
+}
+
} // namespace node
} // namespace grpc
+
+#endif /* GRPC_UV */
diff --git a/src/node/ext/completion_queue.cc b/src/node/ext/completion_queue_uv.cc
index fcfa77b39c..615973a6c9 100644
--- a/src/node/ext/completion_queue.cc
+++ b/src/node/ext/completion_queue_uv.cc
@@ -31,6 +31,8 @@
*
*/
+#ifdef GRPC_UV
+
#include <uv.h>
#include <node.h>
#include <v8.h>
@@ -38,7 +40,6 @@
#include "call.h"
#include "completion_queue.h"
-#include "completion_queue_async_worker.h"
namespace grpc {
namespace node {
@@ -81,34 +82,24 @@ void drain_completion_queue(uv_prepare_t *handle) {
}
grpc_completion_queue *GetCompletionQueue() {
-#ifdef GRPC_UV
return queue;
-#else
- return CompletionQueueAsyncWorker::GetQueue();
-#endif
}
void CompletionQueueNext() {
-#ifdef GRPC_UV
if (pending_batches == 0) {
GPR_ASSERT(!uv_is_active((uv_handle_t *)&prepare));
uv_prepare_start(&prepare, drain_completion_queue);
}
pending_batches++;
-#else
- CompletionQueueAsyncWorker::Next();
-#endif
}
void CompletionQueueInit(Local<Object> exports) {
-#ifdef GRPC_UV
queue = grpc_completion_queue_create(NULL);
uv_prepare_init(uv_default_loop(), &prepare);
pending_batches = 0;
-#else
- CompletionQueueAsyncWorker::Init(exports);
-#endif
}
} // namespace node
} // namespace grpc
+
+#endif /* GRPC_UV */
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index d8b36dc55c..17c62d5635 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -183,6 +183,7 @@ describe('Server.prototype.addProtoService', function() {
assert.strictEqual(status.code, grpc.status.UNIMPLEMENTED);
done();
});
+ call.on('error', function(status) { /* Do nothing */ });
});
it('should respond to a bidi call with UNIMPLEMENTED', function(done) {
var call = client.divMany();
@@ -193,6 +194,7 @@ describe('Server.prototype.addProtoService', function() {
assert.strictEqual(status.code, grpc.status.UNIMPLEMENTED);
done();
});
+ call.on('error', function(status) { /* Do nothing */ });
call.end();
});
});