aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2015-07-28 15:18:57 -0700
committerGravatar murgatroid99 <mlumish@google.com>2015-07-28 15:18:57 -0700
commitc7f4d4fb84566992bc89d2ffceefdec1a2e83040 (patch)
tree64e813e8127ec9efb3cce41a1d8ccda7bdfd1037 /src
parent8e06c2e62e0be2606598dd1f2a6582b585364520 (diff)
Wrap connectivity API, expose it to client as waitForReady
Diffstat (limited to 'src')
-rw-r--r--src/node/ext/channel.cc80
-rw-r--r--src/node/ext/channel.h2
-rw-r--r--src/node/ext/completion_queue_async_worker.cc2
-rw-r--r--src/node/ext/node_grpc.cc19
-rw-r--r--src/node/src/client.js41
-rw-r--r--src/node/test/channel_test.js88
-rw-r--r--src/node/test/constant_test.js19
-rw-r--r--src/node/test/surface_test.js76
8 files changed, 323 insertions, 4 deletions
diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc
index c43b55f115..fa99c10d22 100644
--- a/src/node/ext/channel.cc
+++ b/src/node/ext/channel.cc
@@ -33,12 +33,17 @@
#include <vector>
+#include "grpc/support/log.h"
+
#include <node.h>
#include <nan.h>
#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
+#include "call.h"
#include "channel.h"
+#include "completion_queue_async_worker.h"
#include "credentials.h"
+#include "timeval.h"
namespace grpc {
namespace node {
@@ -51,11 +56,31 @@ using v8::Handle;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
+using v8::Number;
using v8::Object;
using v8::Persistent;
using v8::String;
using v8::Value;
+class ConnectivityStateOp : public Op {
+ public:
+ Handle<Value> GetNodeValue() const {
+ return NanNew<Number>(new_state);
+ }
+
+ bool ParseOp(Handle<Value> value, grpc_op *out,
+ shared_ptr<Resources> resources) {
+ return true;
+ }
+
+ grpc_connectivity_state new_state;
+
+ protected:
+ std::string GetTypeString() const {
+ return "new_state";
+ }
+};
+
NanCallback *Channel::constructor;
Persistent<FunctionTemplate> Channel::fun_tpl;
@@ -78,6 +103,12 @@ void Channel::Init(Handle<Object> exports) {
NanNew<FunctionTemplate>(Close)->GetFunction());
NanSetPrototypeTemplate(tpl, "getTarget",
NanNew<FunctionTemplate>(GetTarget)->GetFunction());
+ NanSetPrototypeTemplate(
+ tpl, "getConnectivityState",
+ NanNew<FunctionTemplate>(GetConnectivityState)->GetFunction());
+ NanSetPrototypeTemplate(
+ tpl, "watchConnectivityState",
+ NanNew<FunctionTemplate>(WatchConnectivityState)->GetFunction());
NanAssignPersistent(fun_tpl, tpl);
Handle<Function> ctr = tpl->GetFunction();
constructor = new NanCallback(ctr);
@@ -196,5 +227,54 @@ NAN_METHOD(Channel::GetTarget) {
NanReturnValue(NanNew(grpc_channel_get_target(channel->wrapped_channel)));
}
+NAN_METHOD(Channel::GetConnectivityState) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError(
+ "getConnectivityState can only be called on Channel objects");
+ }
+ Channel *channel = ObjectWrap::Unwrap<Channel>(args.This());
+ int try_to_connect = (int)args[0]->Equals(NanTrue());
+ NanReturnValue(grpc_channel_check_connectivity_state(channel->wrapped_channel,
+ try_to_connect));
+}
+
+NAN_METHOD(Channel::WatchConnectivityState) {
+ NanScope();
+ if (!HasInstance(args.This())) {
+ return NanThrowTypeError(
+ "watchConnectivityState can only be called on Channel objects");
+ }
+ if (!args[0]->IsUint32()) {
+ return NanThrowTypeError(
+ "watchConnectivityState's first argument must be a channel state");
+ }
+ if (!(args[1]->IsNumber() || args[1]->IsDate())) {
+ return NanThrowTypeError(
+ "watchConnectivityState's second argument must be a date or a number");
+ }
+ if (!args[2]->IsFunction()) {
+ return NanThrowTypeError(
+ "watchConnectivityState's third argument must be a callback");
+ }
+ grpc_connectivity_state last_state =
+ static_cast<grpc_connectivity_state>(args[0]->Uint32Value());
+ double deadline = args[1]->NumberValue();
+ Handle<Function> callback_func = args[2].As<Function>();
+ NanCallback *callback = new NanCallback(callback_func);
+ Channel *channel = ObjectWrap::Unwrap<Channel>(args.This());
+ ConnectivityStateOp *op = new ConnectivityStateOp();
+ unique_ptr<OpVec> ops(new OpVec());
+ ops->push_back(unique_ptr<Op>(op));
+ grpc_channel_watch_connectivity_state(
+ channel->wrapped_channel, last_state, &op->new_state,
+ MillisecondsToTimespec(deadline), CompletionQueueAsyncWorker::GetQueue(),
+ new struct tag(callback,
+ ops.release(),
+ shared_ptr<Resources>(nullptr)));
+ CompletionQueueAsyncWorker::Next();
+ NanReturnUndefined();
+}
+
} // namespace node
} // namespace grpc
diff --git a/src/node/ext/channel.h b/src/node/ext/channel.h
index 6725ebb03f..c4e83a32e2 100644
--- a/src/node/ext/channel.h
+++ b/src/node/ext/channel.h
@@ -67,6 +67,8 @@ class Channel : public ::node::ObjectWrap {
static NAN_METHOD(New);
static NAN_METHOD(Close);
static NAN_METHOD(GetTarget);
+ static NAN_METHOD(GetConnectivityState);
+ static NAN_METHOD(WatchConnectivityState);
static NanCallback *constructor;
static v8::Persistent<v8::FunctionTemplate> fun_tpl;
diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc
index 1215c97e19..c45e303f60 100644
--- a/src/node/ext/completion_queue_async_worker.cc
+++ b/src/node/ext/completion_queue_async_worker.cc
@@ -65,7 +65,7 @@ void CompletionQueueAsyncWorker::Execute() {
result =
grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME));
if (!result.success) {
- SetErrorMessage("The batch encountered an error");
+ SetErrorMessage("The asnyc function encountered an error");
}
}
diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc
index 4e31cbaa27..331ccb60d6 100644
--- a/src/node/ext/node_grpc.cc
+++ b/src/node/ext/node_grpc.cc
@@ -159,12 +159,31 @@ void InitOpTypeConstants(Handle<Object> exports) {
op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER);
}
+void InitConnectivityStateConstants(Handle<Object> exports) {
+ NanScope();
+ Handle<Object> channel_state = NanNew<Object>();
+ exports->Set(NanNew("connectivityState"), channel_state);
+ Handle<Value> IDLE(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_IDLE));
+ channel_state->Set(NanNew("IDLE"), IDLE);
+ Handle<Value> CONNECTING(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_CONNECTING));
+ channel_state->Set(NanNew("CONNECTING"), CONNECTING);
+ Handle<Value> READY(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_READY));
+ channel_state->Set(NanNew("READY"), READY);
+ Handle<Value> TRANSIENT_FAILURE(
+ NanNew<Uint32, uint32_t>(GRPC_CHANNEL_TRANSIENT_FAILURE));
+ channel_state->Set(NanNew("TRANSIENT_FAILURE"), TRANSIENT_FAILURE);
+ Handle<Value> FATAL_FAILURE(
+ NanNew<Uint32, uint32_t>(GRPC_CHANNEL_FATAL_FAILURE));
+ channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE);
+}
+
void init(Handle<Object> exports) {
NanScope();
grpc_init();
InitStatusConstants(exports);
InitCallErrorConstants(exports);
InitOpTypeConstants(exports);
+ InitConnectivityStateConstants(exports);
grpc::node::Call::Init(exports);
grpc::node::Channel::Init(exports);
diff --git a/src/node/src/client.js b/src/node/src/client.js
index f843669bd0..39392dffe2 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -551,6 +551,47 @@ exports.makeClientConstructor = function(methods, serviceName) {
this.updateMetadata = updateMetadata;
}
+ /**
+ * Wait for the client to be ready. The callback will be called when the
+ * client has successfully connected to the server, and it will be called
+ * with an error if the attempt to connect to the server has unrecoverablly
+ * failed or if the deadline expires. This function does not automatically
+ * attempt to initiate the connection, so the callback will not be called
+ * unless you also start a method call or call $tryConnect.
+ * @param {(Date|Number)} deadline When to stop waiting for a connection. Pass
+ * Infinity to wait forever.
+ * @param {function(Error)} callback The callback to call when done attempting
+ * to connect.
+ */
+ Client.prototype.$waitForReady = function(deadline, callback) {
+ var self = this;
+ var checkState = function(err, result) {
+ if (err) {
+ callback(new Error('Failed to connect before the deadline'));
+ }
+ var new_state = result.new_state;
+ console.log(result);
+ if (new_state === grpc.connectivityState.READY) {
+ callback();
+ } else if (new_state === grpc.connectivityState.FATAL_FAILURE) {
+ callback(new Error('Failed to connect to server'));
+ } else {
+ self.channel.watchConnectivityState(new_state, deadline, checkState);
+ }
+ };
+ checkState(null, {new_state: this.channel.getConnectivityState()});
+ };
+
+ /**
+ * Attempt to connect to the server. That will happen automatically if
+ * you try to make a method call with this client, so this function should
+ * only be used if you want to know that you have a connection before making
+ * any calls.
+ */
+ Client.prototype.$tryConnect = function() {
+ this.channel.getConnectivityState(true);
+ };
+
_.each(methods, function(attrs, name) {
var method_type;
if (attrs.requestStream) {
diff --git a/src/node/test/channel_test.js b/src/node/test/channel_test.js
index 3e61d3bbc6..feb3e67227 100644
--- a/src/node/test/channel_test.js
+++ b/src/node/test/channel_test.js
@@ -36,6 +36,27 @@
var assert = require('assert');
var grpc = require('bindings')('grpc.node');
+/**
+ * This is used for testing functions with multiple asynchronous calls that
+ * can happen in different orders. This should be passed the number of async
+ * function invocations that can occur last, and each of those should call this
+ * function's return value
+ * @param {function()} done The function that should be called when a test is
+ * complete.
+ * @param {number} count The number of calls to the resulting function if the
+ * test passes.
+ * @return {function()} The function that should be called at the end of each
+ * sequence of asynchronous functions.
+ */
+function multiDone(done, count) {
+ return function() {
+ count -= 1;
+ if (count <= 0) {
+ done();
+ }
+ };
+}
+
describe('channel', function() {
describe('constructor', function() {
it('should require a string for the first argument', function() {
@@ -73,14 +94,16 @@ describe('channel', function() {
});
});
describe('close', function() {
+ var channel;
+ beforeEach(function() {
+ channel = new grpc.Channel('hostname', {});
+ });
it('should succeed silently', function() {
- var channel = new grpc.Channel('hostname', {});
assert.doesNotThrow(function() {
channel.close();
});
});
it('should be idempotent', function() {
- var channel = new grpc.Channel('hostname', {});
assert.doesNotThrow(function() {
channel.close();
channel.close();
@@ -88,9 +111,68 @@ describe('channel', function() {
});
});
describe('getTarget', function() {
+ var channel;
+ beforeEach(function() {
+ channel = new grpc.Channel('hostname', {});
+ });
it('should return a string', function() {
- var channel = new grpc.Channel('localhost', {});
assert.strictEqual(typeof channel.getTarget(), 'string');
});
});
+ describe('getConnectivityState', function() {
+ var channel;
+ beforeEach(function() {
+ channel = new grpc.Channel('hostname', {});
+ });
+ it('should return IDLE for a new channel', function() {
+ assert.strictEqual(channel.getConnectivityState(),
+ grpc.connectivityState.IDLE);
+ });
+ });
+ describe('watchConnectivityState', function() {
+ var channel;
+ beforeEach(function() {
+ channel = new grpc.Channel('localhost', {});
+ });
+ afterEach(function() {
+ channel.close();
+ });
+ it('should time out if called alone', function(done) {
+ var old_state = channel.getConnectivityState();
+ var deadline = new Date();
+ deadline.setSeconds(deadline.getSeconds() + 1);
+ channel.watchConnectivityState(old_state, deadline, function(err, value) {
+ assert(err);
+ done();
+ });
+ });
+ it('should complete if a connection attempt is forced', function(done) {
+ var old_state = channel.getConnectivityState();
+ var deadline = new Date();
+ deadline.setSeconds(deadline.getSeconds() + 1);
+ channel.watchConnectivityState(old_state, deadline, function(err, value) {
+ assert.ifError(err);
+ assert.notEqual(value.new_state, old_state);
+ done();
+ });
+ channel.getConnectivityState(true);
+ });
+ it('should complete twice if called twice', function(done) {
+ done = multiDone(done, 2);
+ var old_state = channel.getConnectivityState();
+ var deadline = new Date();
+ deadline.setSeconds(deadline.getSeconds() + 1);
+ channel.watchConnectivityState(old_state, deadline, function(err, value) {
+ assert.ifError(err);
+ assert.notEqual(value.new_state, old_state);
+ done();
+ });
+ channel.watchConnectivityState(old_state, deadline, function(err, value) {
+ assert.ifError(err);
+ assert.notEqual(value.new_state, old_state);
+ done();
+ });
+ channel.getConnectivityState(true);
+ });
+ });
});
diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js
index ecc98ec443..93bf0c8ada 100644
--- a/src/node/test/constant_test.js
+++ b/src/node/test/constant_test.js
@@ -78,6 +78,19 @@ var callErrorNames = [
'INVALID_FLAGS'
];
+/**
+ * List of all connectivity state names
+ * @const
+ * @type {Array.<string>}
+ */
+var connectivityStateNames = [
+ 'IDLE',
+ 'CONNECTING',
+ 'READY',
+ 'TRANSIENT_FAILURE',
+ 'FATAL_FAILURE'
+];
+
describe('constants', function() {
it('should have all of the status constants', function() {
for (var i = 0; i < statusNames.length; i++) {
@@ -91,4 +104,10 @@ describe('constants', function() {
'call error missing: ' + callErrorNames[i]);
}
});
+ it('should have all of the connectivity states', function() {
+ for (var i = 0; i < connectivityStateNames.length; i++) {
+ assert(grpc.connectivityState.hasOwnProperty(connectivityStateNames[i]),
+ 'connectivity status missing: ' + connectivityStateNames[i]);
+ }
+ });
});
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 98f9b15bfc..4711658e2f 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -47,6 +47,27 @@ var mathService = math_proto.lookup('math.Math');
var _ = require('lodash');
+/**
+ * This is used for testing functions with multiple asynchronous calls that
+ * can happen in different orders. This should be passed the number of async
+ * function invocations that can occur last, and each of those should call this
+ * function's return value
+ * @param {function()} done The function that should be called when a test is
+ * complete.
+ * @param {number} count The number of calls to the resulting function if the
+ * test passes.
+ * @return {function()} The function that should be called at the end of each
+ * sequence of asynchronous functions.
+ */
+function multiDone(done, count) {
+ return function() {
+ count -= 1;
+ if (count <= 0) {
+ done();
+ }
+ };
+}
+
describe('File loader', function() {
it('Should load a proto file by default', function() {
assert.doesNotThrow(function() {
@@ -110,6 +131,61 @@ describe('Server.prototype.addProtoService', function() {
});
});
});
+describe('Client#$waitForReady', function() {
+ var server;
+ var port;
+ var Client;
+ var client;
+ before(function() {
+ server = new grpc.Server();
+ port = server.bind('localhost:0');
+ server.start();
+ Client = surface_client.makeProtobufClientConstructor(mathService);
+ });
+ beforeEach(function() {
+ client = new Client('localhost:' + port);
+ });
+ after(function() {
+ server.shutdown();
+ });
+ it('should complete when a call is initiated', function(done) {
+ client.$waitForReady(Infinity, function(error) {
+ assert.ifError(error);
+ done();
+ });
+ var call = client.div({}, function(err, response) {});
+ call.cancel();
+ });
+ it('should complete if $tryConnect is called', function(done) {
+ client.$waitForReady(Infinity, function(error) {
+ assert.ifError(error);
+ done();
+ });
+ client.$tryConnect();
+ });
+ it('should complete if called more than once', function(done) {
+ done = multiDone(done, 2);
+ client.$waitForReady(Infinity, function(error) {
+ assert.ifError(error);
+ done();
+ });
+ client.$waitForReady(Infinity, function(error) {
+ assert.ifError(error);
+ done();
+ });
+ client.$tryConnect();
+ });
+ it('should complete if called when already ready', function(done) {
+ client.$waitForReady(Infinity, function(error) {
+ assert.ifError(error);
+ client.$waitForReady(Infinity, function(error) {
+ assert.ifError(error);
+ done();
+ });
+ });
+ client.$tryConnect();
+ });
+});
describe('Echo service', function() {
var server;
var client;