diff options
author | 2015-08-14 09:19:38 -0700 | |
---|---|---|
committer | 2015-08-14 09:19:38 -0700 | |
commit | 0b15b44fe12679742733022664d5709274a74178 (patch) | |
tree | 9309a97ab6f6148b9acad522eea5de3ee9f120c6 /src/node | |
parent | 76714346086e05975492327f05a9f3a546f02782 (diff) | |
parent | 296a369915d126dc53f7fbdd9b4e9d83229e11fa (diff) |
Merge pull request #2696 from murgatroid99/node_client_connectivity
Wrap connectivity API, expose it to client as waitForReady
Diffstat (limited to 'src/node')
-rw-r--r-- | src/node/ext/channel.cc | 59 | ||||
-rw-r--r-- | src/node/ext/channel.h | 2 | ||||
-rw-r--r-- | src/node/ext/completion_queue_async_worker.cc | 2 | ||||
-rw-r--r-- | src/node/ext/node_grpc.cc | 19 | ||||
-rw-r--r-- | src/node/src/client.js | 29 | ||||
-rw-r--r-- | src/node/test/channel_test.js | 87 | ||||
-rw-r--r-- | src/node/test/constant_test.js | 19 | ||||
-rw-r--r-- | src/node/test/surface_test.js | 72 |
8 files changed, 285 insertions, 4 deletions
diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc index 45d0d09e22..a61c830099 100644 --- a/src/node/ext/channel.cc +++ b/src/node/ext/channel.cc @@ -33,12 +33,17 @@ #include <vector> +#include "grpc/support/log.h" + #include <node.h> #include <nan.h> #include "grpc/grpc.h" #include "grpc/grpc_security.h" +#include "call.h" #include "channel.h" +#include "completion_queue_async_worker.h" #include "credentials.h" +#include "timeval.h" namespace grpc { namespace node { @@ -51,6 +56,7 @@ using v8::Handle; using v8::HandleScope; using v8::Integer; using v8::Local; +using v8::Number; using v8::Object; using v8::Persistent; using v8::String; @@ -76,6 +82,12 @@ void Channel::Init(Handle<Object> exports) { NanNew<FunctionTemplate>(Close)->GetFunction()); NanSetPrototypeTemplate(tpl, "getTarget", NanNew<FunctionTemplate>(GetTarget)->GetFunction()); + NanSetPrototypeTemplate( + tpl, "getConnectivityState", + NanNew<FunctionTemplate>(GetConnectivityState)->GetFunction()); + NanSetPrototypeTemplate( + tpl, "watchConnectivityState", + NanNew<FunctionTemplate>(WatchConnectivityState)->GetFunction()); NanAssignPersistent(fun_tpl, tpl); Handle<Function> ctr = tpl->GetFunction(); constructor = new NanCallback(ctr); @@ -186,5 +198,52 @@ NAN_METHOD(Channel::GetTarget) { NanReturnValue(NanNew(grpc_channel_get_target(channel->wrapped_channel))); } +NAN_METHOD(Channel::GetConnectivityState) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError( + "getConnectivityState can only be called on Channel objects"); + } + Channel *channel = ObjectWrap::Unwrap<Channel>(args.This()); + int try_to_connect = (int)args[0]->Equals(NanTrue()); + NanReturnValue(grpc_channel_check_connectivity_state(channel->wrapped_channel, + try_to_connect)); +} + +NAN_METHOD(Channel::WatchConnectivityState) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError( + "watchConnectivityState can only be called on Channel objects"); + } + if (!args[0]->IsUint32()) { + return NanThrowTypeError( + "watchConnectivityState's first argument must be a channel state"); + } + if (!(args[1]->IsNumber() || args[1]->IsDate())) { + return NanThrowTypeError( + "watchConnectivityState's second argument must be a date or a number"); + } + if (!args[2]->IsFunction()) { + return NanThrowTypeError( + "watchConnectivityState's third argument must be a callback"); + } + grpc_connectivity_state last_state = + static_cast<grpc_connectivity_state>(args[0]->Uint32Value()); + double deadline = args[1]->NumberValue(); + Handle<Function> callback_func = args[2].As<Function>(); + NanCallback *callback = new NanCallback(callback_func); + Channel *channel = ObjectWrap::Unwrap<Channel>(args.This()); + unique_ptr<OpVec> ops(new OpVec()); + grpc_channel_watch_connectivity_state( + channel->wrapped_channel, last_state, MillisecondsToTimespec(deadline), + CompletionQueueAsyncWorker::GetQueue(), + new struct tag(callback, + ops.release(), + shared_ptr<Resources>(nullptr))); + CompletionQueueAsyncWorker::Next(); + NanReturnUndefined(); +} + } // namespace node } // namespace grpc diff --git a/src/node/ext/channel.h b/src/node/ext/channel.h index e2182cb45c..458f71d093 100644 --- a/src/node/ext/channel.h +++ b/src/node/ext/channel.h @@ -64,6 +64,8 @@ class Channel : public ::node::ObjectWrap { static NAN_METHOD(New); static NAN_METHOD(Close); static NAN_METHOD(GetTarget); + static NAN_METHOD(GetConnectivityState); + static NAN_METHOD(WatchConnectivityState); static NanCallback *constructor; static v8::Persistent<v8::FunctionTemplate> fun_tpl; diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index 4501e848ae..bf2cd946a5 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -65,7 +65,7 @@ void CompletionQueueAsyncWorker::Execute() { result = grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); if (!result.success) { - SetErrorMessage("The batch encountered an error"); + SetErrorMessage("The async function encountered an error"); } } diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc index 4e31cbaa27..331ccb60d6 100644 --- a/src/node/ext/node_grpc.cc +++ b/src/node/ext/node_grpc.cc @@ -159,12 +159,31 @@ void InitOpTypeConstants(Handle<Object> exports) { op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER); } +void InitConnectivityStateConstants(Handle<Object> exports) { + NanScope(); + Handle<Object> channel_state = NanNew<Object>(); + exports->Set(NanNew("connectivityState"), channel_state); + Handle<Value> IDLE(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_IDLE)); + channel_state->Set(NanNew("IDLE"), IDLE); + Handle<Value> CONNECTING(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_CONNECTING)); + channel_state->Set(NanNew("CONNECTING"), CONNECTING); + Handle<Value> READY(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_READY)); + channel_state->Set(NanNew("READY"), READY); + Handle<Value> TRANSIENT_FAILURE( + NanNew<Uint32, uint32_t>(GRPC_CHANNEL_TRANSIENT_FAILURE)); + channel_state->Set(NanNew("TRANSIENT_FAILURE"), TRANSIENT_FAILURE); + Handle<Value> FATAL_FAILURE( + NanNew<Uint32, uint32_t>(GRPC_CHANNEL_FATAL_FAILURE)); + channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE); +} + void init(Handle<Object> exports) { NanScope(); grpc_init(); InitStatusConstants(exports); InitCallErrorConstants(exports); InitOpTypeConstants(exports); + InitConnectivityStateConstants(exports); grpc::node::Call::Init(exports); grpc::node::Channel::Init(exports); diff --git a/src/node/src/client.js b/src/node/src/client.js index 5cde438572..d14713f393 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -558,6 +558,35 @@ exports.makeClientConstructor = function(methods, serviceName) { this.updateMetadata = updateMetadata; } + /** + * Wait for the client to be ready. The callback will be called when the + * client has successfully connected to the server, and it will be called + * with an error if the attempt to connect to the server has unrecoverablly + * failed or if the deadline expires. This function will make the channel + * start connecting if it has not already done so. + * @param {(Date|Number)} deadline When to stop waiting for a connection. Pass + * Infinity to wait forever. + * @param {function(Error)} callback The callback to call when done attempting + * to connect. + */ + Client.prototype.$waitForReady = function(deadline, callback) { + var self = this; + var checkState = function(err) { + if (err) { + callback(new Error('Failed to connect before the deadline')); + } + var new_state = self.channel.getConnectivityState(true); + if (new_state === grpc.connectivityState.READY) { + callback(); + } else if (new_state === grpc.connectivityState.FATAL_FAILURE) { + callback(new Error('Failed to connect to server')); + } else { + self.channel.watchConnectivityState(new_state, deadline, checkState); + } + }; + checkState(); + }; + _.each(methods, function(attrs, name) { var method_type; if (attrs.requestStream) { diff --git a/src/node/test/channel_test.js b/src/node/test/channel_test.js index c991d7b25b..d81df2a36d 100644 --- a/src/node/test/channel_test.js +++ b/src/node/test/channel_test.js @@ -36,6 +36,26 @@ var assert = require('assert'); var grpc = require('bindings')('grpc.node'); +/** + * This is used for testing functions with multiple asynchronous calls that + * can happen in different orders. This should be passed the number of async + * function invocations that can occur last, and each of those should call this + * function's return value + * @param {function()} done The function that should be called when a test is + * complete. + * @param {number} count The number of calls to the resulting function if the + * test passes. + * @return {function()} The function that should be called at the end of each + * sequence of asynchronous functions. + */ +function multiDone(done, count) { + return function() { + count -= 1; + if (count <= 0) { + done(); + } + }; +} var insecureCreds = grpc.Credentials.createInsecure(); describe('channel', function() { @@ -86,14 +106,16 @@ describe('channel', function() { }); }); describe('close', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('hostname', insecureCreds, {}); + }); it('should succeed silently', function() { - var channel = new grpc.Channel('hostname', insecureCreds, {}); assert.doesNotThrow(function() { channel.close(); }); }); it('should be idempotent', function() { - var channel = new grpc.Channel('hostname', insecureCreds, {}); assert.doesNotThrow(function() { channel.close(); channel.close(); @@ -101,9 +123,68 @@ describe('channel', function() { }); }); describe('getTarget', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('hostname', insecureCreds, {}); + }); it('should return a string', function() { - var channel = new grpc.Channel('localhost', insecureCreds, {}); assert.strictEqual(typeof channel.getTarget(), 'string'); }); }); + describe('getConnectivityState', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('hostname', insecureCreds, {}); + }); + it('should return IDLE for a new channel', function() { + assert.strictEqual(channel.getConnectivityState(), + grpc.connectivityState.IDLE); + }); + }); + describe('watchConnectivityState', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('localhost', insecureCreds, {}); + }); + afterEach(function() { + channel.close(); + }); + it('should time out if called alone', function(done) { + var old_state = channel.getConnectivityState(); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert(err); + done(); + }); + }); + it('should complete if a connection attempt is forced', function(done) { + var old_state = channel.getConnectivityState(); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert.ifError(err); + assert.notEqual(value.new_state, old_state); + done(); + }); + channel.getConnectivityState(true); + }); + it('should complete twice if called twice', function(done) { + done = multiDone(done, 2); + var old_state = channel.getConnectivityState(); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert.ifError(err); + assert.notEqual(value.new_state, old_state); + done(); + }); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert.ifError(err); + assert.notEqual(value.new_state, old_state); + done(); + }); + channel.getConnectivityState(true); + }); + }); }); diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js index ecc98ec443..93bf0c8ada 100644 --- a/src/node/test/constant_test.js +++ b/src/node/test/constant_test.js @@ -78,6 +78,19 @@ var callErrorNames = [ 'INVALID_FLAGS' ]; +/** + * List of all connectivity state names + * @const + * @type {Array.<string>} + */ +var connectivityStateNames = [ + 'IDLE', + 'CONNECTING', + 'READY', + 'TRANSIENT_FAILURE', + 'FATAL_FAILURE' +]; + describe('constants', function() { it('should have all of the status constants', function() { for (var i = 0; i < statusNames.length; i++) { @@ -91,4 +104,10 @@ describe('constants', function() { 'call error missing: ' + callErrorNames[i]); } }); + it('should have all of the connectivity states', function() { + for (var i = 0; i < connectivityStateNames.length; i++) { + assert(grpc.connectivityState.hasOwnProperty(connectivityStateNames[i]), + 'connectivity status missing: ' + connectivityStateNames[i]); + } + }); }); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index dda2f8d127..098905e74b 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -47,6 +47,26 @@ var mathService = math_proto.lookup('math.Math'); var _ = require('lodash'); +/** + * This is used for testing functions with multiple asynchronous calls that + * can happen in different orders. This should be passed the number of async + * function invocations that can occur last, and each of those should call this + * function's return value + * @param {function()} done The function that should be called when a test is + * complete. + * @param {number} count The number of calls to the resulting function if the + * test passes. + * @return {function()} The function that should be called at the end of each + * sequence of asynchronous functions. + */ +function multiDone(done, count) { + return function() { + count -= 1; + if (count <= 0) { + done(); + } + }; +} var server_insecure_creds = grpc.ServerCredentials.createInsecure(); describe('File loader', function() { @@ -112,6 +132,58 @@ describe('Server.prototype.addProtoService', function() { }); }); }); +describe('Client#$waitForReady', function() { + var server; + var port; + var Client; + var client; + before(function() { + server = new grpc.Server(); + port = server.bind('localhost:0', grpc.ServerCredentials.createInsecure()); + server.start(); + Client = surface_client.makeProtobufClientConstructor(mathService); + }); + beforeEach(function() { + client = new Client('localhost:' + port, grpc.Credentials.createInsecure()); + }); + after(function() { + server.shutdown(); + }); + it('should complete when called alone', function(done) { + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + }); + it('should complete when a call is initiated', function(done) { + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + var call = client.div({}, function(err, response) {}); + call.cancel(); + }); + it('should complete if called more than once', function(done) { + done = multiDone(done, 2); + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + }); + it('should complete if called when already ready', function(done) { + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + }); + }); +}); describe('Echo service', function() { var server; var client; |