diff options
author | murgatroid99 <mlumish@google.com> | 2015-07-28 15:18:57 -0700 |
---|---|---|
committer | murgatroid99 <mlumish@google.com> | 2015-07-28 15:18:57 -0700 |
commit | c7f4d4fb84566992bc89d2ffceefdec1a2e83040 (patch) | |
tree | 64e813e8127ec9efb3cce41a1d8ccda7bdfd1037 /src | |
parent | 8e06c2e62e0be2606598dd1f2a6582b585364520 (diff) |
Wrap connectivity API, expose it to client as waitForReady
Diffstat (limited to 'src')
-rw-r--r-- | src/node/ext/channel.cc | 80 | ||||
-rw-r--r-- | src/node/ext/channel.h | 2 | ||||
-rw-r--r-- | src/node/ext/completion_queue_async_worker.cc | 2 | ||||
-rw-r--r-- | src/node/ext/node_grpc.cc | 19 | ||||
-rw-r--r-- | src/node/src/client.js | 41 | ||||
-rw-r--r-- | src/node/test/channel_test.js | 88 | ||||
-rw-r--r-- | src/node/test/constant_test.js | 19 | ||||
-rw-r--r-- | src/node/test/surface_test.js | 76 |
8 files changed, 323 insertions, 4 deletions
diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc index c43b55f115..fa99c10d22 100644 --- a/src/node/ext/channel.cc +++ b/src/node/ext/channel.cc @@ -33,12 +33,17 @@ #include <vector> +#include "grpc/support/log.h" + #include <node.h> #include <nan.h> #include "grpc/grpc.h" #include "grpc/grpc_security.h" +#include "call.h" #include "channel.h" +#include "completion_queue_async_worker.h" #include "credentials.h" +#include "timeval.h" namespace grpc { namespace node { @@ -51,11 +56,31 @@ using v8::Handle; using v8::HandleScope; using v8::Integer; using v8::Local; +using v8::Number; using v8::Object; using v8::Persistent; using v8::String; using v8::Value; +class ConnectivityStateOp : public Op { + public: + Handle<Value> GetNodeValue() const { + return NanNew<Number>(new_state); + } + + bool ParseOp(Handle<Value> value, grpc_op *out, + shared_ptr<Resources> resources) { + return true; + } + + grpc_connectivity_state new_state; + + protected: + std::string GetTypeString() const { + return "new_state"; + } +}; + NanCallback *Channel::constructor; Persistent<FunctionTemplate> Channel::fun_tpl; @@ -78,6 +103,12 @@ void Channel::Init(Handle<Object> exports) { NanNew<FunctionTemplate>(Close)->GetFunction()); NanSetPrototypeTemplate(tpl, "getTarget", NanNew<FunctionTemplate>(GetTarget)->GetFunction()); + NanSetPrototypeTemplate( + tpl, "getConnectivityState", + NanNew<FunctionTemplate>(GetConnectivityState)->GetFunction()); + NanSetPrototypeTemplate( + tpl, "watchConnectivityState", + NanNew<FunctionTemplate>(WatchConnectivityState)->GetFunction()); NanAssignPersistent(fun_tpl, tpl); Handle<Function> ctr = tpl->GetFunction(); constructor = new NanCallback(ctr); @@ -196,5 +227,54 @@ NAN_METHOD(Channel::GetTarget) { NanReturnValue(NanNew(grpc_channel_get_target(channel->wrapped_channel))); } +NAN_METHOD(Channel::GetConnectivityState) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError( + "getConnectivityState can only be called on Channel objects"); + } + Channel *channel = ObjectWrap::Unwrap<Channel>(args.This()); + int try_to_connect = (int)args[0]->Equals(NanTrue()); + NanReturnValue(grpc_channel_check_connectivity_state(channel->wrapped_channel, + try_to_connect)); +} + +NAN_METHOD(Channel::WatchConnectivityState) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError( + "watchConnectivityState can only be called on Channel objects"); + } + if (!args[0]->IsUint32()) { + return NanThrowTypeError( + "watchConnectivityState's first argument must be a channel state"); + } + if (!(args[1]->IsNumber() || args[1]->IsDate())) { + return NanThrowTypeError( + "watchConnectivityState's second argument must be a date or a number"); + } + if (!args[2]->IsFunction()) { + return NanThrowTypeError( + "watchConnectivityState's third argument must be a callback"); + } + grpc_connectivity_state last_state = + static_cast<grpc_connectivity_state>(args[0]->Uint32Value()); + double deadline = args[1]->NumberValue(); + Handle<Function> callback_func = args[2].As<Function>(); + NanCallback *callback = new NanCallback(callback_func); + Channel *channel = ObjectWrap::Unwrap<Channel>(args.This()); + ConnectivityStateOp *op = new ConnectivityStateOp(); + unique_ptr<OpVec> ops(new OpVec()); + ops->push_back(unique_ptr<Op>(op)); + grpc_channel_watch_connectivity_state( + channel->wrapped_channel, last_state, &op->new_state, + MillisecondsToTimespec(deadline), CompletionQueueAsyncWorker::GetQueue(), + new struct tag(callback, + ops.release(), + shared_ptr<Resources>(nullptr))); + CompletionQueueAsyncWorker::Next(); + NanReturnUndefined(); +} + } // namespace node } // namespace grpc diff --git a/src/node/ext/channel.h b/src/node/ext/channel.h index 6725ebb03f..c4e83a32e2 100644 --- a/src/node/ext/channel.h +++ b/src/node/ext/channel.h @@ -67,6 +67,8 @@ class Channel : public ::node::ObjectWrap { static NAN_METHOD(New); static NAN_METHOD(Close); static NAN_METHOD(GetTarget); + static NAN_METHOD(GetConnectivityState); + static NAN_METHOD(WatchConnectivityState); static NanCallback *constructor; static v8::Persistent<v8::FunctionTemplate> fun_tpl; diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index 1215c97e19..c45e303f60 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -65,7 +65,7 @@ void CompletionQueueAsyncWorker::Execute() { result = grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME)); if (!result.success) { - SetErrorMessage("The batch encountered an error"); + SetErrorMessage("The asnyc function encountered an error"); } } diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc index 4e31cbaa27..331ccb60d6 100644 --- a/src/node/ext/node_grpc.cc +++ b/src/node/ext/node_grpc.cc @@ -159,12 +159,31 @@ void InitOpTypeConstants(Handle<Object> exports) { op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER); } +void InitConnectivityStateConstants(Handle<Object> exports) { + NanScope(); + Handle<Object> channel_state = NanNew<Object>(); + exports->Set(NanNew("connectivityState"), channel_state); + Handle<Value> IDLE(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_IDLE)); + channel_state->Set(NanNew("IDLE"), IDLE); + Handle<Value> CONNECTING(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_CONNECTING)); + channel_state->Set(NanNew("CONNECTING"), CONNECTING); + Handle<Value> READY(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_READY)); + channel_state->Set(NanNew("READY"), READY); + Handle<Value> TRANSIENT_FAILURE( + NanNew<Uint32, uint32_t>(GRPC_CHANNEL_TRANSIENT_FAILURE)); + channel_state->Set(NanNew("TRANSIENT_FAILURE"), TRANSIENT_FAILURE); + Handle<Value> FATAL_FAILURE( + NanNew<Uint32, uint32_t>(GRPC_CHANNEL_FATAL_FAILURE)); + channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE); +} + void init(Handle<Object> exports) { NanScope(); grpc_init(); InitStatusConstants(exports); InitCallErrorConstants(exports); InitOpTypeConstants(exports); + InitConnectivityStateConstants(exports); grpc::node::Call::Init(exports); grpc::node::Channel::Init(exports); diff --git a/src/node/src/client.js b/src/node/src/client.js index f843669bd0..39392dffe2 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -551,6 +551,47 @@ exports.makeClientConstructor = function(methods, serviceName) { this.updateMetadata = updateMetadata; } + /** + * Wait for the client to be ready. The callback will be called when the + * client has successfully connected to the server, and it will be called + * with an error if the attempt to connect to the server has unrecoverablly + * failed or if the deadline expires. This function does not automatically + * attempt to initiate the connection, so the callback will not be called + * unless you also start a method call or call $tryConnect. + * @param {(Date|Number)} deadline When to stop waiting for a connection. Pass + * Infinity to wait forever. + * @param {function(Error)} callback The callback to call when done attempting + * to connect. + */ + Client.prototype.$waitForReady = function(deadline, callback) { + var self = this; + var checkState = function(err, result) { + if (err) { + callback(new Error('Failed to connect before the deadline')); + } + var new_state = result.new_state; + console.log(result); + if (new_state === grpc.connectivityState.READY) { + callback(); + } else if (new_state === grpc.connectivityState.FATAL_FAILURE) { + callback(new Error('Failed to connect to server')); + } else { + self.channel.watchConnectivityState(new_state, deadline, checkState); + } + }; + checkState(null, {new_state: this.channel.getConnectivityState()}); + }; + + /** + * Attempt to connect to the server. That will happen automatically if + * you try to make a method call with this client, so this function should + * only be used if you want to know that you have a connection before making + * any calls. + */ + Client.prototype.$tryConnect = function() { + this.channel.getConnectivityState(true); + }; + _.each(methods, function(attrs, name) { var method_type; if (attrs.requestStream) { diff --git a/src/node/test/channel_test.js b/src/node/test/channel_test.js index 3e61d3bbc6..feb3e67227 100644 --- a/src/node/test/channel_test.js +++ b/src/node/test/channel_test.js @@ -36,6 +36,27 @@ var assert = require('assert'); var grpc = require('bindings')('grpc.node'); +/** + * This is used for testing functions with multiple asynchronous calls that + * can happen in different orders. This should be passed the number of async + * function invocations that can occur last, and each of those should call this + * function's return value + * @param {function()} done The function that should be called when a test is + * complete. + * @param {number} count The number of calls to the resulting function if the + * test passes. + * @return {function()} The function that should be called at the end of each + * sequence of asynchronous functions. + */ +function multiDone(done, count) { + return function() { + count -= 1; + if (count <= 0) { + done(); + } + }; +} + describe('channel', function() { describe('constructor', function() { it('should require a string for the first argument', function() { @@ -73,14 +94,16 @@ describe('channel', function() { }); }); describe('close', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('hostname', {}); + }); it('should succeed silently', function() { - var channel = new grpc.Channel('hostname', {}); assert.doesNotThrow(function() { channel.close(); }); }); it('should be idempotent', function() { - var channel = new grpc.Channel('hostname', {}); assert.doesNotThrow(function() { channel.close(); channel.close(); @@ -88,9 +111,68 @@ describe('channel', function() { }); }); describe('getTarget', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('hostname', {}); + }); it('should return a string', function() { - var channel = new grpc.Channel('localhost', {}); assert.strictEqual(typeof channel.getTarget(), 'string'); }); }); + describe('getConnectivityState', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('hostname', {}); + }); + it('should return IDLE for a new channel', function() { + assert.strictEqual(channel.getConnectivityState(), + grpc.connectivityState.IDLE); + }); + }); + describe('watchConnectivityState', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('localhost', {}); + }); + afterEach(function() { + channel.close(); + }); + it('should time out if called alone', function(done) { + var old_state = channel.getConnectivityState(); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert(err); + done(); + }); + }); + it('should complete if a connection attempt is forced', function(done) { + var old_state = channel.getConnectivityState(); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert.ifError(err); + assert.notEqual(value.new_state, old_state); + done(); + }); + channel.getConnectivityState(true); + }); + it('should complete twice if called twice', function(done) { + done = multiDone(done, 2); + var old_state = channel.getConnectivityState(); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert.ifError(err); + assert.notEqual(value.new_state, old_state); + done(); + }); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert.ifError(err); + assert.notEqual(value.new_state, old_state); + done(); + }); + channel.getConnectivityState(true); + }); + }); }); diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js index ecc98ec443..93bf0c8ada 100644 --- a/src/node/test/constant_test.js +++ b/src/node/test/constant_test.js @@ -78,6 +78,19 @@ var callErrorNames = [ 'INVALID_FLAGS' ]; +/** + * List of all connectivity state names + * @const + * @type {Array.<string>} + */ +var connectivityStateNames = [ + 'IDLE', + 'CONNECTING', + 'READY', + 'TRANSIENT_FAILURE', + 'FATAL_FAILURE' +]; + describe('constants', function() { it('should have all of the status constants', function() { for (var i = 0; i < statusNames.length; i++) { @@ -91,4 +104,10 @@ describe('constants', function() { 'call error missing: ' + callErrorNames[i]); } }); + it('should have all of the connectivity states', function() { + for (var i = 0; i < connectivityStateNames.length; i++) { + assert(grpc.connectivityState.hasOwnProperty(connectivityStateNames[i]), + 'connectivity status missing: ' + connectivityStateNames[i]); + } + }); }); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 98f9b15bfc..4711658e2f 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -47,6 +47,27 @@ var mathService = math_proto.lookup('math.Math'); var _ = require('lodash'); +/** + * This is used for testing functions with multiple asynchronous calls that + * can happen in different orders. This should be passed the number of async + * function invocations that can occur last, and each of those should call this + * function's return value + * @param {function()} done The function that should be called when a test is + * complete. + * @param {number} count The number of calls to the resulting function if the + * test passes. + * @return {function()} The function that should be called at the end of each + * sequence of asynchronous functions. + */ +function multiDone(done, count) { + return function() { + count -= 1; + if (count <= 0) { + done(); + } + }; +} + describe('File loader', function() { it('Should load a proto file by default', function() { assert.doesNotThrow(function() { @@ -110,6 +131,61 @@ describe('Server.prototype.addProtoService', function() { }); }); }); +describe('Client#$waitForReady', function() { + var server; + var port; + var Client; + var client; + before(function() { + server = new grpc.Server(); + port = server.bind('localhost:0'); + server.start(); + Client = surface_client.makeProtobufClientConstructor(mathService); + }); + beforeEach(function() { + client = new Client('localhost:' + port); + }); + after(function() { + server.shutdown(); + }); + it('should complete when a call is initiated', function(done) { + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + var call = client.div({}, function(err, response) {}); + call.cancel(); + }); + it('should complete if $tryConnect is called', function(done) { + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + client.$tryConnect(); + }); + it('should complete if called more than once', function(done) { + done = multiDone(done, 2); + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + client.$tryConnect(); + }); + it('should complete if called when already ready', function(done) { + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + }); + client.$tryConnect(); + }); +}); describe('Echo service', function() { var server; var client; |